In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer,StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
import numpy as np
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes
from pyspark.sql import SQLContext
import pyspark as ps
from pyspark.ml.classification import LinearSVC
import warnings
from nltk.stem.snowball import SnowballStemmer
import pandas
import matplotlib.pyplot as plt
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
import pandas as pd

In [2]:
def init_spark():
    print("initializing spark...")
    try:
        sc = ps.SparkContext('local[*]')
        sqlContext = SQLContext(sc)
        print("Just created a SparkContext")
    except ValueError:
        warnings.warn("SparkContext already exists in this scope")
    spark=SparkSession.builder.getOrCreate()

In [3]:
def read_file(fileName):
    print("reading csv file...")
    df=spark.read.csv(fileName,sep=",",inferSchema=True,header=False)
    return df

In [4]:
def pre_process(df):
    print("preprocessing...")
#     df.printSchema()
#     df.show(truncate=False)
    df.count()
    
    df1=df.withColumnRenamed('_c0',"id").withColumnRenamed('_c1','label').withColumnRenamed('_c2','tweet')
#     df1.printSchema()
#     df1.show(truncate=False)
    
    df2 = df1.withColumn('tweet', regexp_replace('tweet', '[^a-z0-9A-Z`~!@#$%&<>?., ]', ''))
#     df2.show(truncate=False)
    
    df3 = df2.withColumn('tweet', regexp_replace('tweet', '[0-9`~!@#$%&<>?,\']', ''))
#     df3.show(truncate=False)
    
    df4 = df3.withColumn('tweet', regexp_replace('tweet', 'http://*.*.com', ''))
#     df4.show(truncate=False)
    
    df5 = df4.withColumn('tweet', regexp_replace('tweet', 'www.*.com', ''))
#     df5.show(truncate=False)
    
    df6 = df5.withColumn('tweet', regexp_replace('tweet', '\.', ''))
#     df6.show(truncate=False)
    
    tokenizer=Tokenizer(inputCol="tweet",outputCol="words")
    wordData=tokenizer.transform(df6)
#     wordData.show()
    
    remover=StopWordsRemover(inputCol="words",outputCol="word_clean")
    word_clean_data=remover.transform(wordData)
#     word_clean_data.show()
    
    stemmer = SnowballStemmer(language='english')
    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens])
    count=CountVectorizer(inputCol="word_clean",outputCol="rawFeatures")
#     print(count)
    
    model=count.fit(word_clean_data)
#     print(model)
    
    featurizedData=model.transform(word_clean_data)
#     featurizedData.show()
    idf=IDF(inputCol="rawFeatures",outputCol="features")
    idfModel=idf.fit(featurizedData)
    rescaledData=idfModel.transform(featurizedData)
#     rescaledData.select("label","features").show()
    return rescaledData

In [5]:
def train_test_split(df):
    print("splitting dataset...")
    seed=0
    trainDf,testDf=df.randomSplit([0.7,0.3],seed)
    trainDf.count()
    testDf.count()
    return trainDf,testDf

In [6]:
def logistic_regression(train_data,test_data):
    print("Using logistic regression model with test_data...")
    d1 = {}
    d2 = {}
    
    lr = LogisticRegression(maxIter=15)
    paramGrid_lr = ParamGridBuilder().build()
    
    crossval_lr = CrossValidator(estimator=lr,estimatorParamMaps=paramGrid_lr,evaluator=BinaryClassificationEvaluator(),numFolds=8)
    cvModel_lr = crossval_lr.fit(train_data)
    best_model_lr = cvModel_lr.bestModel.summary
    
    report = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="label",metricName="areaUnderROC")
    p1 = report.evaluate(best_model_lr.predictions)
    
    pred_lr = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="label",metricName="f1")
    p2 = pred_lr.evaluate(best_model_lr.predictions)
    
    pred_lr = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="label",metricName="accuracy")
    p3 = pred_lr.evaluate(best_model_lr.predictions)
    
    train_fit_lr = best_model_lr.predictions.select('label','prediction')
    print(train_fit_lr.groupBy('label','prediction').count().show())
    
    d1['ROC'] = p1
    d1['F1'] = p2
    d1['Accuracy'] = p3
    
    predictions_lr = cvModel_lr.transform(test_data)
    print(predictions_lr.groupBy('label','prediction').count().show())
    
    my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
    p4=my_eval_lr.evaluate(predictions_lr)
    
    my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
    p5=my_mc_lr.evaluate(predictions_lr)
    
    my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
    p6=my_mc_lr.evaluate(predictions_lr)
    
    d2['ROC'] = p4
    d2['F1']= p5
    d2['Accuracy'] = p6
    return d1,d2

In [7]:
def SVM(train_data,test_data):
    print("Using SVM model with test_data...")
    d1 = {}
    d2 = {}
    
    svm = LinearSVC(maxIter=5,regParam=0.01)
    model = svm.fit(train_data)
    
    train_pred = model.transform(train_data)
    print(train_pred.groupBy('label','prediction').count().show())
    
    my_eval_svm = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
    p1 = my_eval_svm.evaluate(train_pred)
    
    my_mc_svm = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
    p2 = my_mc_svm.evaluate(train_pred)
    
    my_mc_svm = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
    p3 = my_mc_svm.evaluate(train_pred)
    
    d1['ROC'] = p1
    d1['F1'] = p2
    d1['Accuracy'] = p3
    
    test_pred = model.transform(test_data)
    print(test_pred.groupBy('label','prediction').count().show())
    
    evaluator = BinaryClassificationEvaluator()
    evaluation = evaluator.evaluate(test_pred)
    
    my_eval_svm = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
    p4 = my_eval_svm.evaluate(test_pred)
    
    my_mc_svm = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
    p5 = my_mc_svm.evaluate(test_pred)
    
    my_mc_svm = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
    p6 = my_mc_svm.evaluate(test_pred)
    
    d2['ROC'] = p4
    d2['F1']= p5
    d2['Accuracy'] = p6
    
    return d1,d2

In [16]:
def naive_Bayes(train_data,test_data):
    print("Using Naive Bayes model with test_data...")
    d1 = {}
    d2 = {}
    nb = NaiveBayes()
    
    paramGrid_nb = ParamGridBuilder() \
        .addGrid(nb.smoothing, np.linspace(0.3, 10, 10)) \
        .build()
    crossval_nb = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid_nb, evaluator=BinaryClassificationEvaluator(), numFolds= 5)
    cvModel_nb = crossval_nb.fit(train_data)
    
    train_pred = cvModel_nb.transform(train_data)
    print(train_pred.groupBy('label','prediction').count().show())
    
    my_eval_nb = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
    p1 = my_eval_nb.evaluate(train_pred)
    
    my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
    p2 = my_mc_nb.evaluate(train_pred)
    
    my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
    p3 = my_mc_nb.evaluate(train_pred)
    
    d1['ROC'] = p1
    d1['F1'] = p2
    d1['Accuracy'] = p3
    
    predictions_nb = cvModel_nb.transform(test_data)
    print(predictions_nb.groupBy('label','prediction').count().show())
    
    my_eval_nb = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
    p4 = my_eval_nb.evaluate(predictions_nb)
    
    my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
    p5 = my_mc_nb.evaluate(predictions_nb)
    
    my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
    p6 = my_mc_nb.evaluate(predictions_nb)
    
    d2['ROC'] = p4
    d2['F1']= p5
    d2['Accuracy'] = p6
    
    return d1,d2

In [9]:
def random_Forest(train_data,test_data):
    print("Using Random Forest model with test_data...")
    d1 = {}
    d2 = {}
    rf = RandomForestClassifier()
    rf_model = rf.fit(train_data)
    
    train_pred = rf_model.transform(train_data)
    print(train_pred.groupBy('label','prediction').count().show())
    
    my_eval_rf = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
    p1 = my_eval_rf.evaluate(train_pred)
    
    my_mc_rf = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
    p2 = my_mc_rf.evaluate(train_pred)
    
    my_mc_rf = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
    p3 = my_mc_rf.evaluate(train_pred)
    
    d1['ROC'] = p1
    d1['F1'] = p2
    d1['Accuracy'] = p3
    
    test_pred = rf_model.transform(test_data)
    print(test_pred.groupBy('label','prediction').count().show())
    my_eval_rf = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
    p4 = my_eval_rf.evaluate(test_pred)
    
    my_mc_rf = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
    p5 = my_mc_rf.evaluate(test_pred)
    
    my_mc_rf = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
    p6 = my_mc_rf.evaluate(test_pred)
    
    d2['ROC'] = p4
    d2['F1']= p5
    d2['Accuracy'] = p6
    return d1,d2

In [21]:
init_spark()
df=read_file("twitter.csv")
df=pre_process(df)
train_data,test_data=train_test_split(df)

initializing spark...
reading csv file...


  


preprocessing...
splitting dataset...


In [22]:
train_summary,test_summary=SVM(train_data,test_data)
print(train_summary)
print(test_summary)

train_summary,test_summary=naive_Bayes(train_data,test_data)
print(train_summary)
print(test_summary)

train_summary,test_summary=logistic_regression(train_data,test_data)
print(train_summary)
print(test_summary)

train_summary,test_summary=random_Forest(train_data,test_data)
print(train_summary)
print(test_summary)

Using SVM model with test_data...
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|20733|
|    1|       0.0|  198|
|    1|       1.0| 1375|
|    0|       1.0|    4|
+-----+----------+-----+

None
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|  370|
|    0|       0.0| 8948|
|    1|       1.0|  306|
|    0|       1.0|   28|
+-----+----------+-----+

None
{'ROC': 0.9369664910967898, 'F1': 0.9906693279729695, 'Accuracy': 0.9909457642312864}
{'ROC': 0.7247716461517366, 'F1': 0.9521690921806005, 'Accuracy': 0.9587650227932035}
Using Naive Bayes model with test_data...
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|20316|
|    1|       1.0| 1546|
|    1|       0.0|   27|
|    0|       1.0|  421|
+-----+----------+-----+

None
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|  109|
|    0|       0.0| 7363|
|    0|  