                                  CS491:Intro to Data Science Homework4
                            Twitter Sentiment Classification Using Apache Spark
                                  Group : Janani Neelakantan 670805407
                                          Yogeeta Monica Kuttabadkar 661868770

Installation instructions: 
1) Requires the nltk package. Install it with `!pip install nltk` when installing directly from the Jupyter notebook.
2) Requires the complete NLTK data download for the other resources this program uses, such as the stop-word list and the tokenizer models.
    - Run nltk.download()
    - When prompted, choose option d) and press Enter
    - Then type 'all' and press Enter
3) Create a SparkContext. Ensure train.csv and test.csv are in the /home/jovyan/work folder. Both files must first be saved with UTF-8 encoding; the program does not convert other encodings to UTF-8.

In [None]:
#!pip install nltk

In [None]:
import nltk
import pyspark
import re
import string
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem.snowball import SnowballStemmer
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel 
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Input CSV names, opened relative to the notebook's working directory.
trainFileName, testFileName = "train.csv", "test.csv"
# A single local Spark context; "local[*]" runs with as many worker threads as cores.
sc = pyspark.SparkContext("local[*]")

In [None]:
#nltk.download()

TWEET PROCESSING
Tweet processing for the entire training and test data is performed with a PySpark map. The cleaning steps are:
- Each row of the CSV file is split on ',' to separate its columns.
- Because the tweet text itself may contain ',', this also splits the tweet into several parts; the complete tweet text is then rebuilt from all of those parts (joined with spaces).
- All characters are converted to lower case
- All digits are removed.
- @username is replaced with AT_USER
- URLs starting with www, http or https are replaced with URL
- Expand contractions: 'm → am, 'd → would, 're → are, n't → not, 'll → will, plus apostrophe-less forms such as dont, cant and didnt → do not, can not, did not; & becomes and
- Replace 's with a space, which handles possessives such as women's and bag's
- Collapse any run of a repeated letter to at most two occurrences. For instance, exxxcitteddd becomes exxcittedd
- All punctuation characters are removed
- All English stop words from NLTK's stop-word list are removed
- The Snowball stemmer reduces each word to its root form. We preferred Snowball over Porter because it is generally considered more accurate (Snowball's English stemmer refines the original Porter algorithm)

Possible improvement: add spell checking (e.g. with the enchant library) for better cleaning.


In [None]:
# Regex rewrites applied, in order, to the lower-cased tweet text. Order matters:
# URLs are replaced first so digit/hyphen removal cannot break them apart, the
# specific "can't"/"won't" forms run before the generic "n't" rule, and the
# repeated-letter squeeze runs last so it sees the fully expanded text.
_TWEET_SUBSTITUTIONS = [
    (re.compile(r'(?:https?://|www\.)\S+'), ' URL '),  # replace http(s):// and www. urls with URL
    (re.compile(r'\d'), ''),                           # remove digits
    (re.compile(r'-'), ' '),                           # replace - with white space
    (re.compile(r'@\w*'), 'AT_USER'),                  # replace @username with AT_USER
    (re.compile(r"'m"), ' am'),                        # 'm  -> am
    (re.compile(r"'d"), ' would'),                     # 'd  -> would
    (re.compile(r"'re"), ' are'),                      # 're -> are
    (re.compile(r"\bcan't"), 'can not'),               # can't -> can not (not "ca not")
    (re.compile(r"\bwon't"), 'will not'),              # won't -> will not (not "wo not")
    (re.compile(r"n't"), ' not'),                      # n't -> not
    (re.compile(r"'ll"), ' will'),                     # 'll -> will
    (re.compile(r'&'), ' and '),                       # & -> and, padded so words do not fuse
    # Apostrophe-less negations. \b keeps them from matching inside longer
    # words (e.g. "significant", "vacant" contain "cant").
    (re.compile(r'\bdidnt\b'), 'did not'),
    (re.compile(r'\bdont\b'), 'do not'),
    (re.compile(r'\bwont\b'), 'will not'),
    (re.compile(r'\bcant\b'), 'can not'),
    (re.compile(r'\bwouldnt\b'), 'would not'),
    (re.compile(r'\bcouldnt\b'), 'could not'),
    (re.compile(r'\bisnt\b'), 'is not'),
    (re.compile(r'\bwasnt\b'), 'was not'),
    (re.compile(r"'s"), ' '),                          # drop possessive 's
    (re.compile(r'([a-z])\1+'), r'\1\1'),              # squeeze runs of a letter to two
]

# Lazily built NLTK resources. Reading the stop-word corpus touches disk, so the
# result is cached here instead of being rebuilt for every tweet.
_NLP_CACHE = {}


def _nlp_resources():
    """Return ``(stop_words, punctuation, stemmer)``, building them on first use."""
    resources = _NLP_CACHE.get('resources')
    if resources is None:
        resources = (
            set(stopwords.words('english')),
            set(string.punctuation),
            SnowballStemmer('english'),
        )
        _NLP_CACHE['resources'] = resources
    return resources


def parseTweet(line):
    """Parse one CSV line into ``(label, cleaned_text)``.

    The label is the raw (string) first column. The tweet text is the 6th
    column plus every later column: commas inside the tweet split it into
    extra columns, so those pieces are re-joined with spaces. The text is
    lower-cased, rewritten with ``_TWEET_SUBSTITUTIONS``, tokenised with NLTK,
    stripped of English stop words and punctuation, and Snowball-stemmed.
    The ``AT_USER`` and ``URL`` placeholders are kept verbatim.

    Raises:
        ValueError: if the line has fewer than 6 comma-separated fields.
    """
    parts = line.split(',')
    if len(parts) < 6:
        raise ValueError("expected at least 6 comma-separated fields: %r" % (line,))
    label = parts[0]
    tweetText = ' '.join(parts[5:]).strip().lower()  # strip leading/trailing whitespace
    for pattern, replacement in _TWEET_SUBSTITUTIONS:
        tweetText = pattern.sub(replacement, tweetText)

    stop_words, punctuation, stemmer = _nlp_resources()
    words = []
    for token in word_tokenize(tweetText):
        if token in stop_words:
            continue
        if token in ('AT_USER', 'URL'):
            # '_' is in string.punctuation, so stripping would mangle AT_USER.
            words.append(token)
            continue
        stripped = ''.join(ch for ch in token if ch not in punctuation)
        if not stripped:
            continue  # pure-punctuation token: emit nothing rather than ''
        words.append(str(stemmer.stem(stripped)))
    return (label, ' '.join(words))

NAIVE BAYES MODEL AND PREDICTION

In [None]:
# Naive Bayes training
def NaiveBayesClassifier(trainRDD):
    """Fit an MLlib Naive Bayes model on an RDD of LabeledPoints.

    All training options (including smoothing) keep MLlib's defaults.
    Returns the trained NaiveBayesModel.
    """
    return NaiveBayes.train(trainRDD)

In [None]:
#Testing the model on the test data
def NaiveBayesPrediction(NV_model, testRDD):
    """Evaluate a trained Naive Bayes model on an RDD of LabeledPoints.

    Prints the accuracy, then the multiclass and binary summary metrics
    (via ``calculateMetrics`` and ``calculateBinaryMetrics``).

    Returns:
        The fraction of test points whose predicted label equals the true label.

    Raises:
        ValueError: if ``testRDD`` is empty (accuracy is undefined).
    """
    total = testRDD.count()
    if total == 0:
        raise ValueError("testRDD is empty; cannot compute accuracy")

    # (prediction, label) pairs -- the order MulticlassMetrics expects.
    prediction_and_labels = testRDD.map(lambda point: (float(NV_model.predict(point.features)), point.label))
    correct = prediction_and_labels.filter(lambda pair: pair[0] == pair[1]).count()
    accuracy = correct / float(total)
    print("NB_accuracy = " + str(accuracy))

    labels = testRDD.map(lambda lp: lp.label).distinct().collect()
    calculateMetrics(prediction_and_labels, labels)
    calculateBinaryMetrics(prediction_and_labels)
    return accuracy

LOGISTIC REGRESSION MODEL - LBFGS ; SGD ; PREDICTION

In [None]:
#Logistic Regression with LBFGS
def LogisticRegressionClassifierWithL2(trainRDD):
    """Fit a logistic regression model with L-BFGS and L2 regularisation.

    Only ``regType`` is set explicitly; every other training option keeps
    MLlib's default. Returns the trained LogisticRegressionModel.
    """
    return LogisticRegressionWithLBFGS.train(trainRDD, regType="l2")

In [None]:
#Logistic Regression with SGD
def LogisticRegressionClassifierSGD(trainRDD):
    """Fit a logistic regression model with stochastic gradient descent.

    Every training option keeps MLlib's default. Returns the trained
    LogisticRegressionModel.

    NOTE(review): MLlib marks LogisticRegressionWithSGD as deprecated in
    Spark 2.x -- confirm against the Spark version in use.
    """
    return LogisticRegressionWithSGD.train(trainRDD)

In [None]:
# Evaluating the model on testing data
def LogisticRegressionPrediction(LR_model, testRDD):
    """Evaluate a trained logistic regression model on an RDD of LabeledPoints.

    Prints the accuracy, then the multiclass and binary summary metrics
    (via ``calculateMetrics`` and ``calculateBinaryMetrics``).

    Returns:
        The fraction of test points whose predicted label equals the true label.

    Raises:
        ValueError: if ``testRDD`` is empty (accuracy is undefined).
    """
    total = testRDD.count()
    if total == 0:
        raise ValueError("testRDD is empty; cannot compute accuracy")

    # (prediction, label) pairs -- the order MulticlassMetrics expects.
    labelsAndPreds = testRDD.map(lambda lp: (float(LR_model.predict(lp.features)), lp.label))
    correct = labelsAndPreds.filter(lambda pair: pair[0] == pair[1]).count()
    accuracy = correct / float(total)
    print("LR_accuracy = " + str(accuracy))

    labels = testRDD.map(lambda lp: lp.label).distinct().collect()
    calculateMetrics(labelsAndPreds, labels)
    calculateBinaryMetrics(labelsAndPreds)
    return accuracy


DECISION TREE MODEL AND PREDICTION

In [None]:
#Decision Tree classifier
def DecisionTreeClassifier(trainRDD, maxDepth=5, maxBins=32):
    """Train a binary (numClasses=2) Gini decision tree on LabeledPoints.

    All features are treated as continuous (empty categoricalFeaturesInfo).

    Args:
        trainRDD: RDD of LabeledPoint.
        maxDepth: maximum tree depth (default 5, the previously fixed value).
        maxBins: maximum number of bins per feature split (default 32).

    Returns:
        The trained DecisionTreeModel.
    """
    return DecisionTree.trainClassifier(trainRDD, numClasses=2, categoricalFeaturesInfo={},
                                        impurity='gini', maxDepth=maxDepth, maxBins=maxBins)
    

In [None]:
#Decision Tree regressor
def DecisionTreeRegressor(trainRDD, maxDepth=5, maxBins=32):
    """Train a regression tree (variance impurity) on LabeledPoints.

    All features are treated as continuous (empty categoricalFeaturesInfo).

    Args:
        trainRDD: RDD of LabeledPoint.
        maxDepth: maximum tree depth (default 5, the previously fixed value).
        maxBins: maximum number of bins per feature split (default 32).

    Returns:
        The trained DecisionTreeModel.
    """
    return DecisionTree.trainRegressor(trainRDD, categoricalFeaturesInfo={}, impurity='variance',
                                       maxDepth=maxDepth, maxBins=maxBins)
    

In [None]:
# Evaluating the model on test data
def DecisionTreePrediction(DT_model, testRDD):
    """Evaluate a trained decision tree on an RDD of LabeledPoints.

    Prints the accuracy, then the multiclass and binary summary metrics
    (via ``calculateMetrics`` and ``calculateBinaryMetrics``).

    Returns:
        The fraction of test points whose predicted label equals the true label.

    Raises:
        ValueError: if ``testRDD`` is empty (accuracy is undefined).
    """
    total = testRDD.count()
    if total == 0:
        raise ValueError("testRDD is empty; cannot compute accuracy")

    predictions = DT_model.predict(testRDD.map(lambda x: x.features))
    labelsAndPredictions = testRDD.map(lambda lp: lp.label).zip(predictions)
    # MulticlassMetrics / BinaryClassificationMetrics read each pair as
    # (prediction-or-score, label). Passing (label, prediction) transposed the
    # confusion matrix and swapped per-class precision and recall.
    predictionsAndLabels = labelsAndPredictions.map(lambda pair: (pair[1], pair[0]))

    DT_accuracy = predictionsAndLabels.filter(lambda t: t[0] == t[1]).count() / float(total)
    print("DT_accuracy = " + str(DT_accuracy))

    labels = testRDD.map(lambda lp: lp.label).distinct().collect()
    calculateMetrics(predictionsAndLabels, labels)
    calculateBinaryMetrics(predictionsAndLabels)
    return DT_accuracy
    

COMPUTING HASHING TF WITH N FEATURES
Create a LabeledPoint from each label and its TF feature vector

In [None]:
#Only HashingTF as the feature with n feature values
def computeTF(featureVal, data_cleaned):
    """Hash each tokenised document into a ``featureVal``-length TF vector.

    Args:
        featureVal: number of hash buckets, i.e. the feature-vector length.
        data_cleaned: RDD of ``(label, tokens)`` records.

    Returns:
        RDD of LabeledPoint(label, term-frequency vector).
    """
    hasher = HashingTF(featureVal)

    def toLabeledPoint(record):
        return LabeledPoint(record[0], hasher.transform(record[1]))

    return data_cleaned.map(toLabeledPoint)

Compute hashing TF with HashingTF's default number of features

In [None]:
#Only HashingTF as the feature with all feature values
def computeTFAll(data_cleaned):
    """Hash each tokenised document into a TF vector of HashingTF's default size.

    Args:
        data_cleaned: RDD of ``(label, tokens)`` records.

    Returns:
        RDD of LabeledPoint(label, term-frequency vector).
    """
    hasher = HashingTF()

    def toLabeledPoint(record):
        return LabeledPoint(record[0], hasher.transform(record[1]))

    return data_cleaned.map(toLabeledPoint)

In [None]:
#Computing TF-IDF Type 2
def computeTFIDF(data_cleaned, numFeatures=100):
    """Build TF-IDF LabeledPoints from ``(label, tokens)`` records.

    Args:
        data_cleaned: RDD of ``(label, tokens)`` records.
        numFeatures: number of HashingTF buckets (default 100, the value
            previously hard-coded).

    Returns:
        RDD of LabeledPoint(label, TF-IDF vector).
    """
    labels = data_cleaned.map(lambda doc: doc[0])

    tf = HashingTF(numFeatures).transform(data_cleaned.map(lambda doc: doc[1]))
    # IDF needs two passes over tf (fit, then transform); cache it so the
    # hashing is not recomputed for the second pass.
    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)

    # Both RDDs derive from data_cleaned by maps, so zip pairs them element-wise.
    return labels.zip(tfidf).map(lambda x: LabeledPoint(x[0], x[1]))

Calculation of Metrics

In [None]:
#Method to calculate metrics of predictions from trained model
def calculateMetrics(labels_and_predictions, labels):
    """Print weighted multiclass statistics and plot per-label (FPR, TPR) points.

    Args:
        labels_and_predictions: RDD of pairs passed unchanged to MLlib's
            ``MulticlassMetrics``, which reads each pair as
            ``(prediction, label)`` despite this parameter's name.
        labels: the distinct label values; one (FPR, TPR) point is computed
            per label, in ascending label order, and handed to plotROCcurve.

    NOTE(review): joining one (FPR, TPR) point per label is not a
    threshold-swept ROC curve.
    """
    metrics = MulticlassMetrics(labels_and_predictions)

    # Label-frequency-weighted summary statistics.
    prec = metrics.weightedPrecision
    rec = metrics.weightedRecall
    f1 = metrics.weightedFMeasure()
    confusion = metrics.confusionMatrix().toArray()
    print("Summary Stats")
    print("Average Precision = %s" % prec)
    print("Average Recall = %s" % rec)
    print("Average F1 Score = %s" % f1)
    print("Confusion Matrix = %s" % confusion)

    weighted_fpr = metrics.weightedFalsePositiveRate
    weighted_tpr = metrics.weightedTruePositiveRate
    print("Weighted false positive rate = %s" % weighted_fpr)
    print("Weighted true positive rate = %s" % weighted_tpr)

    per_label = [(metrics.falsePositiveRate(lbl), metrics.truePositiveRate(lbl))
                 for lbl in sorted(labels)]
    plotROCcurve([fpr for fpr, _ in per_label], [tpr for _, tpr in per_label])


In [None]:
def calculateBinaryMetrics(labels_and_predictions):
    """Print the areas under the PR and ROC curves from BinaryClassificationMetrics.

    ``labels_and_predictions`` is passed straight through; MLlib reads each
    pair as ``(score, label)``. NOTE(review): the callers in this notebook pass
    predicted labels rather than probabilities as the score, so both areas
    reflect a single operating point.
    """
    binary = BinaryClassificationMetrics(labels_and_predictions)
    pr_area = binary.areaUnderPR
    print("Area under PR = %s" % pr_area)
    roc_area = binary.areaUnderROC
    print("Area under ROC = %s" % roc_area)

In [None]:
def plotROCcurve(falsepositive, truepositive):
    """Draw a red line through the given (FPR, TPR) points in a 4x4 figure and show it."""
    plt.figure(figsize=(4, 4), dpi=80)
    for set_label, text in ((plt.xlabel, "False Positive Rate"),
                            (plt.ylabel, "True Positive Rate"),
                            (plt.title, "ROC Curve")):
        set_label(text, fontsize=14)
    plt.plot(falsepositive, truepositive, color='red', linewidth=2)
    plt.show()

In [None]:
def plotAccuracy(classifierName, trainingAccuracy, KFoldAccuracy, testAccuracy):
    """Scatter-plot a classifier's training, K-fold and test accuracies.

    Each point is annotated with its name. Values are plotted as given; the
    y-axis is labelled "Accuracy Percentage".
    """
    myfig = plt.figure()
    myfig.suptitle(classifierName + ' - Accuracy Comparison', fontsize=14, fontweight='bold')

    myaxes = myfig.add_subplot(111)
    myfig.subplots_adjust(top=0.85)
    myaxes.set_title('Data')
    myaxes.set_ylabel('Accuracy Percentage')
    myaxes.xaxis.set_visible(False)

    names = ['Training Accuracy', 'Avg K-Fold Accuracy', 'Test Accuracy']
    x = [1, 2, 3]
    y = [trainingAccuracy, KFoldAccuracy, testAccuracy]
    for name, xi, yi in zip(names, x, y):
        myaxes.annotate(name, xy=(xi, yi), xytext=(xi, yi - 0.001))

    # Draw on the axes configured above rather than pyplot's "current" axes.
    # (The former unused np.trapz "auc" -- deprecated since NumPy 2.0 -- and
    # the unused N were dropped.)
    myaxes.scatter(x, y, s=150, c=['red', 'green', 'blue'], alpha=0.5)
    plt.show()


In [None]:
# Load train.csv as an RDD of lines and clean every tweet with parseTweet,
# which yields (label, cleaned_text); then re-tokenise the cleaned text so
# each record becomes (label, [word, ...]) for the HashingTF helpers.
alltrainData = sc.textFile(trainFileName)
mapdata = alltrainData.map(parseTweet)
train_data_cleaned = mapdata.map(lambda record: (record[0], word_tokenize(record[1])))

In [None]:
# Split the TF-featurised training data 70/30 into training and test sets.
# Featurised here (50000 hash buckets, the same settings as trainedData1)
# because trainedData1 is only created in a later cell, so referencing it
# here raised NameError on a top-to-bottom run.
train_random_set, test_random_set = computeTF(50000, train_data_cleaned).randomSplit([0.7, 0.3])

In [None]:
# TF feature vectors (50000 hash buckets) for the processed training data.
# Alternative with HashingTF's default size: computeTFAll(train_data_cleaned)
trainedData1 = computeTF(featureVal=50000, data_cleaned=train_data_cleaned)


In [None]:
# Train Naive Bayes on all of the processed training data, then persist it.
Naive_model = NaiveBayesClassifier(trainRDD=trainedData1)
Naive_model.save(sc=sc, path="/home/jovyan/work/myNaiveBayesModel10")

In [None]:
# Reload the saved Naive Bayes model and score it on its own training data
# (a training-accuracy check, not a held-out evaluation).
Naive_model = NaiveBayesModel.load(sc=sc, path="/home/jovyan/work/myNaiveBayesModel10")
Naive_accuracy = NaiveBayesPrediction(NV_model=Naive_model, testRDD=trainedData1)
print("Naive Bayes accuracy = " + str(Naive_accuracy))

In [None]:
# TF feature vectors (50000 hash buckets) for the logistic regression run.
# Alternative with HashingTF's default size: computeTFAll(train_data_cleaned)
trainedData2 = computeTF(featureVal=50000, data_cleaned=train_data_cleaned)


In [None]:
# Train L2-regularised (L-BFGS) logistic regression on all training data and save it.
# Alternative optimiser: LogisticRegressionClassifierSGD(trainedData2)
LR_model = LogisticRegressionClassifierWithL2(trainRDD=trainedData2)
LR_model.save(sc=sc, path="/home/jovyan/work/myLRModel5")

In [None]:
# Reload the saved logistic regression model and score it on its own
# training data (a training-accuracy check).
LR_model = LogisticRegressionModel.load(sc=sc, path="/home/jovyan/work/myLRModel5")
LR_accuracy = LogisticRegressionPrediction(LR_model=LR_model, testRDD=trainedData2)
print("Logistic Regression accuracy = " + str(LR_accuracy))

In [None]:
# TF feature vectors (5000 hash buckets) for the decision tree.
# Alternative with HashingTF's default size: computeTFAll(train_data_cleaned)
trainedData3 = computeTF(featureVal=5000, data_cleaned=train_data_cleaned)

In [None]:
# Train the decision tree classifier on the 5000-feature data and persist it.
DT_model = DecisionTreeClassifier(trainRDD=trainedData3)
DT_model.save(sc=sc, path="/home/jovyan/work/myDTModel4")

In [None]:
# NOTE: Inactive experiment. Everything below is one bare triple-quoted string,
# so it is never executed. It refers to `labeled_training_data`, which is not
# defined at top level anywhere in this notebook. Each successive split takes
# 10% of what the previous split left over (the third uses weights
# [0.9, 0.11]), so its "folds" shrink instead of being equal-sized.
# KFoldValidation implements the 10-fold validation actually used.
'''
#KFold method1
RDDlist = []
train1, test1 = labeled_training_data.randomSplit([0.9, 0.1])
RDDlist.append(test1)
train2, test2 = train1.randomSplit([0.9, 0.1])
RDDlist.append(test2)
train3, test3 = train2.randomSplit([0.9, 0.11])
RDDlist.append(test3)
train4, test4 = train3.randomSplit([0.9, 0.1])
RDDlist.append(test4)
train5, test5 = train4.randomSplit([0.9, 0.1])
RDDlist.append(test5)
train6, test6 = train5.randomSplit([0.9, 0.1])
RDDlist.append(test6)
train7, test7 = train6.randomSplit([0.9, 0.1])
RDDlist.append(test7)
train8, test8 = train7.randomSplit([0.9, 0.1])
RDDlist.append(test8)
train9, test9 = train8.randomSplit([0.9, 0.1])
RDDlist.append(test9)
train10, test10 = train9.randomSplit([0.9, 0.1])
RDDlist.append(test10)

for i in range(0,10):
    print(RDDlist[i].count())
'''


In [None]:
#10-Fold validation method
def KFoldValidation(modelName, labeled_training_data, numFolds=10):
    """Cross-validate one model type over random folds and print the accuracies.

    ``labeled_training_data`` is randomly split into ``numFolds`` roughly equal
    parts. Each part serves once as the test fold while a fresh model is
    trained on the union of the remaining parts. Each fold's accuracy is
    printed, followed by the mean.

    Args:
        modelName: 'NaiveBayes' or 'LogisticRegression'; any other value
            trains a decision tree.
        labeled_training_data: RDD of LabeledPoint.
        numFolds: number of folds (default 10, the previously fixed value).

    Returns:
        The mean per-fold accuracy (also printed).

    Raises:
        ValueError: if ``numFolds`` is less than 2.
    """
    if numFolds < 2:
        raise ValueError("numFolds must be at least 2, got %r" % (numFolds,))
    print(modelName)
    folds = labeled_training_data.randomSplit([1.0 / numFolds] * numFolds)
    accuracy_list = []

    for i in range(numFolds):
        test_KF_data = folds[i]
        # Train on every fold except the held-out one.
        training_KF_data = sc.union([fold for j, fold in enumerate(folds) if j != i])

        # Compare strings with ==. `is` tests object identity and only matched
        # because CPython happens to intern these literals.
        if modelName == 'NaiveBayes':
            NV_model = NaiveBayesClassifier(training_KF_data)
            accuracy = NaiveBayesPrediction(NV_model, test_KF_data)
        elif modelName == 'LogisticRegression':
            LR_model = LogisticRegressionClassifierWithL2(training_KF_data)
            accuracy = LogisticRegressionPrediction(LR_model, test_KF_data)
        else:
            DT_model = DecisionTreeClassifier(training_KF_data)
            accuracy = DecisionTreePrediction(DT_model, test_KF_data)
        print(accuracy)
        accuracy_list.append(accuracy)

    avg_accuracy = sum(accuracy_list) / float(numFolds)
    print(str(numFolds) + " fold validation accuracy for " + modelName + " = " + str(avg_accuracy))
    return avg_accuracy


In [None]:
# 10-fold cross-validation of Naive Bayes on default-size TF features.
trainedDataNew = computeTFAll(data_cleaned=train_data_cleaned)
KFoldValidation(modelName='NaiveBayes', labeled_training_data=trainedDataNew)

In [None]:
# 10-fold cross-validation of logistic regression on default-size TF features.
trainedDataNew = computeTFAll(data_cleaned=train_data_cleaned)
KFoldValidation(modelName='LogisticRegression', labeled_training_data=trainedDataNew)

In [None]:
# 10-fold cross-validation of the decision tree on 5000-bucket TF features.
trainedDataNew = computeTF(featureVal=5000, data_cleaned=train_data_cleaned)
KFoldValidation(modelName='DecisionTree', labeled_training_data=trainedDataNew)

In [None]:
# Load test.csv and run it through the same pipeline as the training data:
# clean each line with parseTweet, then re-tokenise the cleaned text into
# (label, [word, ...]) records.
alltestData = sc.textFile(testFileName)
mapdata = alltestData.map(parseTweet)
test_data_cleaned = mapdata.map(lambda pair: (pair[0], word_tokenize(pair[1])))

In [None]:
# TF feature vectors (50000 hash buckets) for the test data.
# Alternative with HashingTF's default size: computeTFAll(test_data_cleaned)
testData1 = computeTF(featureVal=50000, data_cleaned=test_data_cleaned)

In [None]:
#Testing/prediction of Naive Bayes on test data
# Load the model this notebook saves (myNaiveBayesModel10, trained on
# 50000-bucket TF vectors like testData1). The previously referenced
# myNaiveBayesModel2 is never produced by this notebook.
Naive_model = NaiveBayesModel.load(sc, "/home/jovyan/work/myNaiveBayesModel10")
Naive_accuracy = NaiveBayesPrediction(Naive_model, testData1)
print("Naive Bayes accuracy = " + str(Naive_accuracy))

In [None]:
# TF feature vectors (50000 hash buckets) for scoring logistic regression.
# Alternative with HashingTF's default size: computeTFAll(test_data_cleaned)
testData3 = computeTF(featureVal=50000, data_cleaned=test_data_cleaned)

In [None]:
# Evaluate the saved logistic regression model on the test features.
LR_model = LogisticRegressionModel.load(sc=sc, path="/home/jovyan/work/myLRModel5")
LR_accuracy = LogisticRegressionPrediction(LR_model=LR_model, testRDD=testData3)
print("Logistic Regression accuracy = " + str(LR_accuracy))

In [None]:
# 1000-bucket TF feature vectors for the test data.
testData3 = computeTF(featureVal=1000, data_cleaned=test_data_cleaned)

In [None]:
#Testing/prediction of Decision Tree on test data
# Use the model this notebook saves (myDTModel4, trained on trainedData3 =
# computeTF(5000, ...)). Hash the test data into the same 5000 buckets: hashed
# feature indices only line up when training and test data use the same bucket
# count. myDTModel3 is never produced by this notebook.
DT_model = DecisionTreeModel.load(sc, "/home/jovyan/work/myDTModel4")
DT_testData = computeTF(5000, test_data_cleaned)
DT_accuracy = DecisionTreePrediction(DT_model, DT_testData)
print("Decision Tree accuracy = " + str(DT_accuracy))

In [None]:
# Hard-coded accuracy figures (percent): training, average K-fold, test.
# NOTE(review): the decision tree's K-fold value is 0 here -- presumably not recorded.
for name, train_acc, kfold_acc, test_acc in (
    ('Naive Bayes', 81.81, 71.79, 78.27),
    ('Logistic Regression', 74.56, 73.89, 74.09),
    ('Decision Tree', 61.55, 0, 59.61),
):
    plotAccuracy(name, train_acc, kfold_acc, test_acc)
