In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import StopWordsRemover

from nltk.corpus import stopwords
import numpy as np
import pandas as pd

from pyspark.ml.classification import NaiveBayes, LogisticRegression, DecisionTreeClassifier
import string
from pyspark.ml.feature import HashingTF, IDF, Word2Vec
from pyspark.mllib.linalg import Vectors
import timeit
import sys

In [2]:
def naive_bayes_classifier(training_df, testing_df):
    print("\nNaive Bayes classifier\n")
    
    nb = NaiveBayes()
    start = timeit.default_timer()
    model = nb.fit(training_df)
    stop = timeit.default_timer()
    
    print('NB Training Time: ', stop - start)  
    return model.transform(testing_df).select(["label", "words", "prediction"])


def logistic_regression_classifier(training_df, testing_df):
    print("\nLogistic Regression classifier\n")
    
    lor = LogisticRegression(regParam=0.01)
    start = timeit.default_timer()
    model = lor.fit(training_df)
    stop = timeit.default_timer()
    
    print('LR Training Time: ', stop - start)  
    return model.transform(testing_df).select(["label", "words", "prediction"])


def calculate_accuracy(result_df):
    return 1.0 * result_df.filter(result_df.label == result_df.prediction).count() / result_df.count()


def generate_confusion_matrix(result_df):
    true_positives = result_df.filter((result_df.label == 1.0) & (result_df.prediction == 1.0)).count()
    true_negatives = result_df.filter((result_df.label == 0.0) & (result_df.prediction == 0.0)).count()
    false_positives = result_df.filter((result_df.label == 0.0) & (result_df.prediction == 1.0)).count()
    false_negatives = result_df.filter((result_df.label == 1.0) & (result_df.prediction == 0.0)).count()

    matrix = {"Positive": pd.Series([true_positives, false_positives], index=["Positive", "Negative"]),
              "Negative": pd.Series([false_negatives, true_negatives], index=["Positive", "Negative"])}

    df = pd.DataFrame(matrix)
    df.columns.name = "Actual / Predicted"
    return df


def remove_punctuation(sentence):
    punctuations = list(string.punctuation)
    extra_punctuations = ['.', '``', '...', '\'s', '--', '-', 'n\'t', '_', '–']
    punctuations += extra_punctuations
    filtered = [w for w in sentence.lower() if w not in punctuations]
    return ("".join(filtered)).split()


def tf_idf(data_rdd):
    data_rdd_df = data_rdd.toDF()
    hashing_tf = HashingTF(inputCol="words", outputCol="tf_features")
    tf_data = hashing_tf.transform(data_rdd_df)
    idf_data = IDF(inputCol="tf_features", outputCol="features").fit(tf_data)
    tf_idf_data = idf_data.transform(tf_data)
    return tf_idf_data.select(["label", "words", "features"])


def preprocessing(file):
    data = sc.textFile(file)
    col_rdd = data.map(lambda x: (x.split('\t')[0], x[-1]))
    punctuation_removed_rdd = col_rdd.map(lambda x: (remove_punctuation(x[0]), float(x[1])))
    data_df = sqlContext.createDataFrame(punctuation_removed_rdd, ["text", "label"])    
    remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=stopwords.words('english'))
    a = remover.transform(data_df).select(["label", "words"])
    return a


def show_evaluation_stats(predicted_df):
    accuracy = calculate_accuracy(predicted_df)
    print("Model Accuracy: {}".format(round(accuracy*100, 2)))
    
    confusion_table = generate_confusion_matrix(predicted_df)
    print("\nConfusion Matrix:")
    print(confusion_table)    

In [4]:
if __name__ == "__main__":
    conf = SparkConf()
    sc = SparkContext.getOrCreate(conf=conf)
    sqlContext = SQLContext(sc)
    
    filtered_data_df = preprocessing("grtweetsdataset/grTweets.csv")

    training, test = filtered_data_df.rdd.randomSplit([0.7, 0.3], seed=0)

    train_df = tf_idf(training)
    test_df = tf_idf(test)

In [5]:
nb_predicted_df = naive_bayes_classifier(train_df, test_df)
show_evaluation_stats(nb_predicted_df)


Naive Bayes classifier

NB Training Time:  2.3180709469997964
Model Accuracy: 80.6

Confusion Matrix:
Actual / Predicted  Positive  Negative
Positive                 955       325
Negative                 267      1504


In [6]:
lor_predicted_df = logistic_regression_classifier(train_df, test_df)
show_evaluation_stats(lor_predicted_df)


Logistic Regression classifier

LR Training Time:  6.076945770997554
Model Accuracy: 81.51

Confusion Matrix:
Actual / Predicted  Positive  Negative
Positive                 965       315
Negative                 249      1522
