In [81]:
import pyspark
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.evaluation import MulticlassMetrics
from nltk.stem import PorterStemmer
import string, timeit
import matplotlib.pyplot as pyplot

In [16]:
exclude = set(string.punctuation)
stem_porter = PorterStemmer()

In [17]:
%matplotlib inline

In [18]:
stop_words_set = ['out', '', 'we', 'was', 'how', 'myself', 'for', 'they', 'about', 'then', 'both', 'so', 
                  'don', 'as', 'any', 'after', 'you', 'why', 'been', 'where', 'by', 'yourself', 'a', 'did', 
                  'their', 'doing', 'be', 
                  'further', 'ours', 'now', 'am', 'her', 'yourselves', 'that', 'my', 'what', 'to', 'not', 
                  'own', 'there', 
                  'this', 'each', 'all', 'more', 'me', 'which', 'himself', 'nor', 'other', 'who', 'same', 
                  'at', 'such', 
                  't', 'up', 'than', 'can', 'too', 'these', 'while', 'ourselves', 'before', 'i', 'he', 'our', 
                  'its', 
                  'but', 'with', 'those', 'because', 'the', 'it', 'hers', 'just', 'between', 'over', 'had', 'does', 
                  'have', 'and', 'or', 'some', 'only', 'when', 'below', 'in', 'if', 'theirs', 'again', 'his', 
                  'whom', 
                  'above', 'should', 'itself', 'themselves', 'until', 'are', 'she', 'no', 'from', 'into', 
                  'will', 
                  'your', 'few', 'herself', 'of', 'has', 'down', 'were', 'once', 'having', 'them', 'under', 'him', 
                  'do', 'on', 'an', 'yours', 'being', 'off', 'very', 'through', 'most', 'against', 'here', 
                  'is', 's', 'during']

In [53]:
train_csv_file = sc.textFile("../data/train.csv")

# Cleaning Functions

In [20]:
def strip_punctuations(words_array):
    cleaned_words = []
    for word in words_array:
        new_word = ''.join(ch for ch in word if ch not in exclude)
        # new_word = stem_porter.stem(new_word)
        # new_word = word.strip('.,"')
        if len(new_word) != 0:
           cleaned_words.append(new_word)
    return cleaned_words


def replace_URL(words_array):
    new_words_array = []
    for single_word in words_array:
        new_word = single_word
        if single_word.startswith("www.") or single_word.startswith("http://") or single_word.startswith("https://"):
            new_word = "URL"
        
        new_words_array.append(new_word)
    return new_words_array


def replace_AT_USER(words_array):
    new_words_array = []
    for single_word in words_array:
        new_word = single_word
        if single_word.startswith("@"):
            new_word = "ATUSER"
        
        new_words_array.append(new_word)
    return new_words_array


def clean_numeric_words(words_array):
    new_words_array = []
    for single_word in words_array:
        if len(single_word) >= 2 and single_word[0].isalpha() and single_word[1].isalpha():
            new_words_array.append(single_word)
    return new_words_array


def remove_stop_words(words_array):
    new_words_array = []
    for single_word in words_array:
        if single_word not in stop_words_set:
            new_words_array.append(single_word)
    return new_words_array

In [21]:
def clean_bag_of_words(bag_of_words):
    # clean the given bag of words.
    new_words = bag_of_words.lower().split()
    new_words = replace_AT_USER(new_words)
    new_words = replace_URL(new_words)
    new_words = strip_punctuations(new_words)
    new_words = clean_numeric_words(new_words)
    # new_words = remove_stop_words(new_words)
    # print(type(cleaned_words))
    return new_words

In [54]:
def create_bag_of_words(tweet_text):
    # create bag of words in this module. 
    bag_of_cleaned_words = clean_bag_of_words(tweet_text)
    return bag_of_cleaned_words

# Extracting Tweets and Labels 

In [61]:
def extract_clean_tweet_words(single_line):
    filtered_line = single_line.split(",", 5)
    labeled_point_label = float(filtered_line[0][1])
    tweet_text = filtered_line[-1]
    bag_of_cleaned_words = create_bag_of_words(tweet_text)
    return bag_of_cleaned_words 


def extract_polarity(single_line):
    filtered_line = single_line.split(",", 5)
    labeled_point_label = float(filtered_line[0][1])
    return labeled_point_label

In [62]:
train_clean_words = train_csv_file.map(extract_clean_tweet_words)
train_polarity_rdd = train_csv_file.map(extract_polarity)

In [63]:
def apply_tf_idf(documents):
    hashingTF = HashingTF()
    tf = hashingTF.transform(documents)
    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
    return tfidf

In [64]:
train_data_idf = apply_tf_idf(train_clean_words)

In [65]:
train_clean_words_list = train_data_idf.collect()
train_polarity_list = train_polarity_rdd.collect()

train_data_idf_list = []
for i in range(len(train_clean_words_list)):
    train_data_idf_list.append(LabeledPoint(train_polarity_list[i], train_clean_words_list[i]))

train_data = sc.parallelize(train_data_idf_list)

In [28]:
# for single_line in train_csv_file.take(100):
#     filtered_line = single_line.split(",", 5)
#     labeled_point_label = filtered_line[0][1]
#     tweet_text = filtered_line[-1]
#     bag_of_cleaned_words = create_bag_of_words(tweet_text)
#     hashingTF = HashingTF()
#     hashing_tf_features = hashingTF.transform(bag_of_cleaned_words)
#     print(bag_of_cleaned_words)
#     # print(hashing_tf_features)
#     # print(labeled_point_label)
#     # print(hashing_tf_features)

In [66]:
def extract_labeled_point(single_line):
    filtered_line = single_line.split(",", 5)
    labeled_point_label = float(filtered_line[0][1])
    tweet_text = filtered_line[-1]
    bag_of_cleaned_words = create_bag_of_words(tweet_text)
    hashingTF = HashingTF()
    hashing_tf_features = hashingTF.transform(bag_of_cleaned_words)
    return LabeledPoint(labeled_point_label, hashing_tf_features)


def calculate_accuracy_count(predictionAndLabel, total_count):
    accuracy_count = 0
    for x in predictionAndLabel.collect():
        if x[0] == x[1]: 
            accuracy_count += 1
    
    accuracy = 1.0 * accuracy_count / total_count
    return accuracy

# Testing on train data itself

## Naive Bayes

In [68]:
%time naive_bayes_model = NaiveBayes.train(train_data, 1.0)

CPU times: user 58.3 s, sys: 14.8 s, total: 1min 13s
Wall time: 1min 14s


In [69]:
predictionAndLabel = train_data.map(lambda p: (float(naive_bayes_model.predict(p.features)), p.label))
naive_bayes_training_data_accuracy = calculate_accuracy_count(predictionAndLabel, train_data.count())
print("Naive Bayes Accuracy on Training Data")
print(naive_bayes_training_data_accuracy)

Naive Bayes Accuracy on Training Data
0.9248375


## Logistic Regression

In [32]:
%time logistic_regression_model = LogisticRegressionWithLBFGS.train(train_data, iterations=100)

CPU times: user 28.4 s, sys: 7.25 s, total: 35.6 s
Wall time: 1min 15s


In [33]:
predictionAndLabel = train_data.map(lambda p: (float(logistic_regression_model.predict(p.features)), p.label))
logistic_regression_training_data_accuracy = calculate_accuracy_count(predictionAndLabel, train_data.count())
print("Logistic Regression Accuracy on Training Data")
print(logistic_regression_training_data_accuracy)

0.9695125


# Test on main testing data

## Naive Bayes

In [34]:
%time naive_bayes_model = NaiveBayes.train(train_data, 1.0)

CPU times: user 57 s, sys: 14.4 s, total: 1min 11s
Wall time: 1min 12s


In [70]:
test_csv_file = sc.textFile("../data/test.csv")

In [71]:
test_data = test_csv_file.map(extract_labeled_point)

In [74]:
predictionAndLabel = test_data.map(lambda p: (float(naive_bayes_model.predict(p.features)), p.label))
naive_bayes_testing_data_accuracy = calculate_accuracy_count(predictionAndLabel, test_data.count())
print("Naive Bayes Accuracy on Testing Data")
print(naive_bayes_testing_data_accuracy)

Naive Bayes Accuracy on Testing Data
0.7604456824512534


### Precision, Recall, F-1 Score and Confusion Matrix

In [76]:
metrics = MulticlassMetrics(predictionAndLabel)
naive_bayes_precision = metrics.precision()
print("Naive Bayes Precision")
print(naive_bayes_precision)
naive_bayes_recall = metrics.recall()
print("Naive Bayes Recall")
print(naive_bayes_recall)
naive_bayes_f1_score = metrics.fMeasure()
print("Naive Bayes F-1 Score")
print(naive_bayes_f1_score)
naive_bayes_confusion_matric = metrics.confusionMatrix()
print("Naive Bayes Confusion Matrix")
print(naive_bayes_confusion_matric)

Naive Bayes Precision
0.7604456824512534
Naive Bayes Recall
0.7604456824512534
Naive Bayes F-1 Score
0.7604456824512534
Naive Bayes Confusion Matrix
DenseMatrix([[ 136.,   41.],
             [  45.,  137.]])


## Logistic Regression

In [78]:
%time logistic_regression_model = LogisticRegressionWithLBFGS.train(train_data, iterations=100)

CPU times: user 28.1 s, sys: 7.15 s, total: 35.2 s
Wall time: 1min 13s


In [79]:
predictionAndLabel = test_data.map(lambda p: (float(logistic_regression_model.predict(p.features)), p.label))
logistic_regression_testing_data_accuracy = calculate_accuracy_count(predictionAndLabel, test_data.count())
print("Logistic Regression Accuracy on Testing Data")
print(logistic_regression_testing_data_accuracy)

Logistic Regression Accuracy on Testing Data
0.7381615598885793


### Precision, Recall, F-1 Score and Confusion Matrix

In [80]:
metrics = MulticlassMetrics(predictionAndLabel)
logistic_regression_precision = metrics.precision()
print("Logistic Regression Precision")
print(logistic_regression_precision)
logistic_regression_recall = metrics.recall()
print("Logistic Regression Recall")
print(logistic_regression_recall)
logistic_regression_f1_score = metrics.fMeasure()
print("Logistic Regression F-1 Score")
print(logistic_regression_f1_score)
logistic_regression_confusion_matric = metrics.confusionMatrix()
print("Logistic Regression Confusion Matrix")
print(logistic_regression_confusion_matric)

Logistic Regression Precision
0.7381615598885793
Logistic Regression Recall
0.7381615598885793
Logistic Regression F-1 Score
0.7381615598885793
Logistic Regression Confusion Matrix
DenseMatrix([[ 122.,   55.],
             [  39.,  143.]])


# Testing by doing random split

In [40]:
training, test = train_data.randomSplit([0.8, 0.2], seed=0)

## Applying NaiveBayes model

In [41]:
%time naive_bayes_model = NaiveBayes.train(training, 1.0)

CPU times: user 57.1 s, sys: 14.5 s, total: 1min 11s
Wall time: 1min 12s


In [42]:
predictionAndLabel = test.map(lambda p: (float(naive_bayes_model.predict(p.features)), p.label))
naive_bayes_split_data_accuracy = calculate_accuracy_count(predictionAndLabel, test.count())
print(naive_bayes_split_data_accuracy)

0.7128749767095212


## Applying Logistic Regression model

In [43]:
%time logistic_regression_model = LogisticRegressionWithLBFGS.train(training, iterations=100)

CPU times: user 28.3 s, sys: 7.3 s, total: 35.6 s
Wall time: 1min 14s


In [44]:
predictionAndLabel = test.map(lambda p: (float(logistic_regression_model.predict(p.features)), p.label))
logistic_regression_split_data_accuracy = calculate_accuracy_count(predictionAndLabel, test.count())
print(logistic_regression_split_data_accuracy)

0.7324389789454071


# Testing by K-Fold Cross Validation

## Apply Naive Bayes

In [49]:
split_rdd_list = train_data.randomSplit([0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1], seed=0)

In [50]:
def calculate_naive_bayes_average_accuracy():
    avg = 0
    length_rdd_list = len(split_rdd_list)
    for i in range(length_rdd_list):
        training_k_fold_data = None
        test_k_fold_data = None
        for j in range(length_rdd_list):
            if j != i:
                if training_k_fold_data is None:
                    training_k_fold_data = split_rdd_list[j]
                else:
                    training_k_fold_data = training_k_fold_data.union(split_rdd_list[j])
            else:
                test_k_fold_data = split_rdd_list[i]

        naive_bayes_model = NaiveBayes.train(training_k_fold_data, 1.0)
        predictionAndLabel = test_k_fold_data.map(lambda p: (float(naive_bayes_model.predict(p.features)), p.label))
        accuracy = calculate_accuracy_count(predictionAndLabel, test_k_fold_data.count())
        avg += accuracy
    return avg/length_rdd_list

    
naive_bayes_k_fold_accuracy = calculate_naive_bayes_average_accuracy()
print(naive_bayes_k_fold_accuracy)

0.7117267652341983


## Apply Logistic Regression

In [51]:
split_rdd_list = train_data.randomSplit([0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1], seed=0)

In [52]:
def calculate_logistic_regression_average_accuracy():
    avg = 0
    length_rdd_list = len(split_rdd_list)
    for i in range(length_rdd_list):
        training_k_fold_data = None
        test_k_fold_data = None
        for j in range(length_rdd_list):
            if j != i:
                if training_k_fold_data is None:
                    training_k_fold_data = split_rdd_list[j]
                else:
                    training_k_fold_data = training_k_fold_data.union(split_rdd_list[j])
            else:
                test_k_fold_data = split_rdd_list[i]

        logistic_regression_model = LogisticRegressionWithLBFGS.train(training_k_fold_data, iterations=100)
        predictionAndLabel = test_k_fold_data.map(lambda p: (float(logistic_regression_model.predict(p.features)), p.label))
        accuracy = calculate_accuracy_count(predictionAndLabel, test_k_fold_data.count())
        avg += accuracy
    return avg/length_rdd_list


logistic_regression_k_fold_accuracy = calculate_logistic_regression_average_accuracy()
print(logistic_regression_k_fold_accuracy)

0.7304395160784233


# Plotting

## Naive Bayes Plotting

## Logistic Regression Plotting