In [18]:
from src.spark import Spark
import src.tweet_volume as funcs
import matplotlib.pyplot as plt
from pyspark.sql import functions as fs
import os 
import pandas as pd
from src.plotting import double_plot
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import src.nlp.clean as clean
from pyspark.sql.types import StringType, IntegerType, StructField, StructType


In [4]:
spark = Spark('load', 'local')
sess = spark.session()

In [None]:
df = funcs.load_dataframe(sess, '/cs/home/ls99/PycharmProjects/Spark/data/*.json', funcs.schema)
df2 = funcs.parse_timestamp(df)
eth_df = df2.filter(fs.lower(df['text']).like("%ether%") | fs.lower(df['text']).like("%eth%") | fs.lower(df['text']).like("%ethereum%"))
eth_clean = clean.clean_tweets(eth_df, "text")



In [7]:
eth_clean.first().text

u' #ethereum #eth'

In [13]:
sent_schema = StructType([
    StructField("target", IntegerType(), True),
    StructField("text", StringType(), True)
])

train_df = sess.read.csv("/cs/unique/ls99-kf39-cs5052/train/train.csv", header=True, schema=sent_schema)


In [4]:

train_df.first()

Row(target=0, text=u'                     is so sad for my APL friend.............')

In [15]:
train_clean = clean.clean_tweets(train_df, "text")


In [16]:
eth_train, eth_eval = train_clean.randomSplit([0.99, 0.1], seed=42)
print("Number of tweets in train: {:,}".format(eth_train.count()))

Number of tweets in train: 1,434,200


In [8]:
from pyspark.ml.feature import NGram, VectorAssembler, CountVectorizer


def ngram_model(input_col=("text", "target"), n=3):
    tokeniser = [Tokenizer(inputCol=input_col[0], outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="%d_grams" % i) for i in range(1, n + 1)
    ]
    count_vectoriser = [
        CountVectorizer(vocabSize=5000, inputCol="%d_grams" % i, outputCol="%d_tf" % i) for i in range(1, n+1)
    ]
    inverse_doc_freq = [
        IDF(inputCol="%d_tf" % i, outputCol="%d_idf" %i, minDocFreq=5) for i in range(1, n + 1)
    ]
    
    vector_assembler = [
        VectorAssembler(inputCols=["%d_idf" % i for i in range(1, n+1)], outputCol="features")
    ]
    
    string_index = [
        StringIndexer(inputCol=input_col[1], outputCol="label", handleInvalid="keep")
    ]
    
    return Pipeline(stages=tokeniser + ngrams + count_vectoriser + inverse_doc_freq + vector_assembler + string_index)

In [8]:

def simple_tfidf_model(input_col=("text","target")):
    tokeniser = Tokenizer(inputCol=input_col[0], outputCol="words")
    hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol="tf")
    idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)
    
    string_index = StringIndexer(inputCol=input_col[1], outputCol="label", handleInvalid="keep")
    pipeline = Pipeline(stages=[tokeniser, hashtf, idf, string_index])
    
    return pipeline


RocAuc: 0.8661, Accuracy: 0.7906


In [9]:
pipeline = ngram_model()

# Sentiment feature selection pipeline
ngram_pipeline = pipeline.fit(eth_train)
eth_train_df = ngram_pipeline.transform(eth_train)
eth_eval_df = ngram_pipeline.transform(eth_eval)


RocAuc: 0.8661, Accuracy: 0.7906


In [10]:


# Fit a classifier to classify as either positive or negative sentiment
lr = LogisticRegression(maxIter=100, regParam=0.1)
classifier = lr.fit(eth_train_df)


RocAuc: 0.8661, Accuracy: 0.7906


In [11]:

predictions = classifier.transform(eth_eval_df)



RocAuc: 0.8661, Accuracy: 0.7906


In [12]:
report = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")


def classification_report(predictions):
    bin_rep = report.evaluate(predictions)
    accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(eth_eval_df.count())

    print("RocAuc: %.4f, Accuracy: %.4f" % (bin_rep, accuracy))


RocAuc: 0.8661, Accuracy: 0.7906


In [18]:
ngram_pipeline.save("/cs/unique/ls99-kf39-cs5052/models/pipeline")
classifier.save("/cs/unique/ls99-kf39-cs5052/models/linreg")

In [11]:
eth_model = PipelineModel.load("/cs/unique/ls99-kf39-cs5052/models/pipeline")
classifier = LogisticRegressionModel.load("/cs/unique/ls99-kf39-cs5052/models/linreg")

In [12]:
eth_transformed = ngram_pipeline.transform(eth_clean)
eth_pred = classifier.transform(eth_transformed)

positive = eth_pred.filter(eth_pred.prediction == 0.0).count()
negative = eth_pred.filter(eth_pred.prediction == 1.0).count()

print("Ether tweets positive: {:,}, negative: {:,}".format(positive, negative))

Ether tweets positive: 13369.000000, negative: 1022.000000
