In [None]:
#Additional Installation
!pip install --user nltk

In [None]:
# Initialize (or reuse) the Spark context.
# Constructing a second SparkContext in the same process raises an error, so
# getOrCreate is used to make this cell safe to re-run. If a context already
# exists it is returned as-is and `conf` is not re-applied.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("building a warehouse")
sc = SparkContext.getOrCreate(conf=conf)
sqlCtx = SQLContext(sc)

In [None]:
#Functions to preprocess data. Also the training dataset is preprocessed.
from nltk.corpus import stopwords
import csv
import re
import nltk
# Fetch the NLTK stop-word corpus used by removeStopWords.
# NOTE(review): this runs on the driver only; Spark executors on other hosts
# also need the corpus available locally — confirm for cluster deployments.
nltk.download(info_or_id='stopwords')
# Retrieve the fields the classifier needs from each raw CSV line.
def splitTrainingData(trainingSamples):
    """Parse one raw CSV line into a ``[label, text]`` pair.

    The line is parsed with the ``csv`` module (comma-delimited, double-quote
    quoting) and only columns 0 and 5 are kept; downstream code treats the
    first as the label and the second as the tweet text.

    Args:
        trainingSamples: one line of the CSV file, as a string.

    Returns:
        A list ``[column0, column5]`` of strings.

    Raises:
        IndexError: if the parsed line has fewer than six columns.
    """
    fields = []
    for row in csv.reader([trainingSamples], delimiter=',', quotechar='"'):
        fields.extend((row[0], row[5]))
    return fields
# Load the raw training CSV and parse every line into a [label, text] pair.
rawTrainLines = sc.textFile("train.csv")
data1 = rawTrainLines.map(splitTrainingData)
# Process the data retrieved.
# Patterns are compiled once; raw strings avoid invalid-escape warnings for
# \w, \-, \. and \S on modern Python.
_USER_MENTION_RE = re.compile(r'@[\w]*')
_URL_RE = re.compile(r"(http|https)://[\w\-]+(\.[\w\-]+)+\S*")
_DIGIT_TOKEN_RE = re.compile(r'[0-9](\w*)')
_REPEATED_CHAR_RE = re.compile(r'(.)\1{2,}')
# Each character below is mapped to a space ("\\" is a literal backslash).
_PUNCTUATION_TABLE = {ord(c): " " for c in "!@#$%^&*()[]{};:,./<>?\\|~-=_+"}

def clean(trainingSamples):
    """Normalize the tweet text stored at index 1 of a parsed record.

    Steps, in order:
      1. Replace ``@mention`` tokens with ``ATUSER`` (lower-cased by step 2).
      2. Lower-case the text.
      3. Replace http/https URLs with `` URL ``. This runs *before* digit
         stripping so URLs whose host contains digits (e.g. ``4sq.com``) are
         still recognised instead of being broken into junk tokens.
      4. Remove every run that starts with a digit, together with the word
         characters that follow it.
      5. Collapse any character repeated 3+ times down to two copies.
      6. Replace the punctuation characters in ``_PUNCTUATION_TABLE`` with
         spaces.

    Args:
        trainingSamples: a sequence whose element 1 is the tweet text.

    Returns:
        A new list equal to ``trainingSamples`` except that element 1 holds the
        cleaned text. The input is not modified.
    """
    tweet = trainingSamples[1]
    tweet = _USER_MENTION_RE.sub('ATUSER', tweet)
    tweet = tweet.lower()
    tweet = _URL_RE.sub(" URL ", tweet)
    tweet = _DIGIT_TOKEN_RE.sub('', tweet)
    tweet = _REPEATED_CHAR_RE.sub(r'\1\1', tweet)
    result = list(trainingSamples)
    result[1] = tweet.translate(_PUNCTUATION_TABLE)
    return result
# Cache for the English stop-word set, filled on first use by _englishStopWords.
_STOP_WORDS_CACHE = None

def _englishStopWords():
    """Return NLTK's English stop words as a frozenset, cached after the first call.

    Avoids calling ``stopwords.words('english')`` (and building a list for
    linear membership tests) once per record.
    NOTE(review): on Spark executors the cache lives in the deserialized
    function's globals, so it may be rebuilt per task rather than per worker.
    """
    global _STOP_WORDS_CACHE
    if _STOP_WORDS_CACHE is None:
        _STOP_WORDS_CACHE = frozenset(stopwords.words('english'))
    return _STOP_WORDS_CACHE

def removeStopWords(trainingSamples, stopWords=None):
    """Drop stop words from the tweet text at index 1 of a record.

    The text is split on whitespace. A word is dropped if it is in the stop
    set. Otherwise, if it contains no apostrophe it is kept unchanged. If it
    does contain an apostrophe, it is split on ``'`` and kept only when exactly
    one non-empty piece remains (e.g. ``'hello'`` -> ``hello``). Words that
    split into several pieces, such as contractions, are dropped.

    Args:
        trainingSamples: a sequence whose element 1 is the tweet text.
        stopWords: optional container (supporting ``in``) of words to drop;
            defaults to NLTK's English stop-word list.

    Returns:
        A new list equal to ``trainingSamples`` except that element 1 holds the
        kept words, each prefixed by a single space (so a non-empty result
        starts with a space, and an empty one is ``''``). The input is not
        modified.
    """
    stop = _englishStopWords() if stopWords is None else stopWords
    kept = []
    for word in trainingSamples[1].split():  # simple whitespace tokenization
        if word in stop:
            continue
        if "'" not in word:
            kept.append(word)
            continue
        pieces = [piece for piece in word.split("'") if piece != '']
        if len(pieces) == 1:
            kept.append(pieces[0])
    result = list(trainingSamples)
    result[1] = "".join(" " + word for word in kept)
    return result
# Normalize every tweet, then strip English stop words.
normalisedTrainData = data1.map(clean)
cleanData = normalisedTrainData.map(removeStopWords)

In [None]:
#Creating a LabeledPoint-(features, labels) to train the classifiers
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Hash each tweet's whitespace tokens into a sparse term-frequency vector and
# pair it with its label. Doing both in one map over cleanData (instead of
# zipping two RDDs derived separately from it) evaluates the preprocessing
# pipeline once instead of twice.
hashingTF = HashingTF()
training = cleanData.map(
    lambda x: LabeledPoint(x[0], hashingTF.transform(x[1].split()))
)
# `training` feeds NaiveBayes.train, the iterative L-BFGS optimiser and several
# counts below; cache it so the text pipeline is not recomputed for each pass.
training.cache()


In [None]:
# Train the Naive Bayes classifier on the hashed term-frequency features,
# with additive smoothing parameter lambda_ = 1.0.
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
model = NaiveBayes.train(training, lambda_=1.0)

In [None]:
# Testing the Naive Bayes model performance (training data)
predictionAndLabel = training.map(lambda p: (float(model.predict(p.features)), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda x: x[0] == x[1]).count() / training.count()

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

metrics = MulticlassMetrics(predictionAndLabel)
# confusionMatrix(): rows are ACTUAL labels, columns are PREDICTED labels, both
# in ascending label order. Index 0 (the lowest label) is treated as the
# positive class.
resultMatrix = metrics.confusionMatrix().toArray()
tp = resultMatrix[0][0]  # actual 0, predicted 0
fn = resultMatrix[0][1]  # actual 0, predicted the other class
fp = resultMatrix[1][0]  # actual other class, predicted 0
tn = resultMatrix[1][1]  # actual other class, predicted other class
precision = tp/(tp+fp)
recall = tp/(tp+fn)
# F1 is the harmonic mean 2PR / (P + R); defined as 0 when P + R == 0.
f1Score = (2 * precision * recall / (precision + recall)
           if (precision + recall) else 0.0)

print("Naive Bayesian Classifier using training data:")
print ("Accuracy = " + str(accuracy * 100))
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

print(" ")


In [None]:
# Retrieving and processing test data - input to both classifiers.
# The same parsing / cleaning functions as for the training data are applied.
cleanTestData = (
    sc.textFile("test.csv")
    .map(splitTrainingData)
    .map(clean)
    .map(removeStopWords)
)

# Features must be hashed exactly as for training: HashingTF() with default
# settings. Label and vector are built in one map (no zip) so cleanTestData
# is evaluated once.
testHashingTF = HashingTF()
testData = cleanTestData.map(
    lambda x: LabeledPoint(x[0], testHashingTF.transform(x[1].split()))
)
# testData is used by several actions below (two models x predict/count);
# cache it so the file is not re-read and re-cleaned for each of them.
testData.cache()

In [None]:
# Testing the Naive Bayesian model performance (test data)
predictTestData = testData.map(lambda p: (float(model.predict(p.features)), p.label))
accuracyTestData = 1.0 * predictTestData.filter(lambda x: x[0] == x[1]).count() / testData.count()

metricsTest = MulticlassMetrics(predictTestData)
# confusionMatrix(): rows are ACTUAL labels, columns are PREDICTED labels, both
# in ascending label order. Index 0 (the lowest label) is treated as the
# positive class.
resultMatrixTest = metricsTest.confusionMatrix().toArray()
tpTest = resultMatrixTest[0][0]  # actual 0, predicted 0
fnTest = resultMatrixTest[0][1]  # actual 0, predicted the other class
fpTest = resultMatrixTest[1][0]  # actual other class, predicted 0
tnTest = resultMatrixTest[1][1]  # actual other class, predicted other class
precisionTest = tpTest/(tpTest+fpTest)
recallTest = tpTest/(tpTest+fnTest)
# F1 is the harmonic mean 2PR / (P + R); defined as 0 when P + R == 0.
f1ScoreTest = (2 * precisionTest * recallTest / (precisionTest + recallTest)
               if (precisionTest + recallTest) else 0.0)
print("Naive Bayesian Classifier using test data:")
print ("Accuracy = " + str(accuracyTestData * 100))
print("Precision = %s" % precisionTest)
print("Recall = %s" % recallTest)
print("F1 Score = %s" % f1ScoreTest)
print(" ")

In [None]:
# Building classifier two: a logistic regression model trained with L-BFGS.
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
# Upper bound on L-BFGS iterations; all other settings are library defaults.
# NOTE(review): with the default numClasses=2 this trainer expects labels in
# {0, 1}; confirm the label column of train.csv satisfies that.
maxLbfgsIterations = 10000
modelTwo = LogisticRegressionWithLBFGS.train(data=training, iterations=maxLbfgsIterations)

In [None]:
#Testing performance of Logistic Classifier - using training data

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

predictionAndLabelTwo = training.map(lambda p: (float(modelTwo.predict(p.features)), p.label))
accuracyModelTwo = 1.0 * predictionAndLabelTwo.filter(lambda x: x[0] == x[1]).count() / training.count()

metricsTwo = MulticlassMetrics(predictionAndLabelTwo)

# confusionMatrix(): rows are ACTUAL labels, columns are PREDICTED labels, both
# in ascending label order. Index 0 (the lowest label) is treated as the
# positive class.
resultMatrixTwo = metricsTwo.confusionMatrix().toArray()
tpTwo = resultMatrixTwo[0][0]  # actual 0, predicted 0
fnTwo = resultMatrixTwo[0][1]  # actual 0, predicted the other class
fpTwo = resultMatrixTwo[1][0]  # actual other class, predicted 0
tnTwo = resultMatrixTwo[1][1]  # actual other class, predicted other class
precisionTwo = tpTwo/(tpTwo+fpTwo)
recallTwo = tpTwo/(tpTwo+fnTwo)
# F1 is the harmonic mean 2PR / (P + R); defined as 0 when P + R == 0.
f1ScoreTwo = (2 * precisionTwo * recallTwo / (precisionTwo + recallTwo)
              if (precisionTwo + recallTwo) else 0.0)
print("Logistic Regression Classifier - using training data:")
print("Accuracy = " + str(accuracyModelTwo * 100))
print("Precision = %s" % precisionTwo)
print("Recall = %s" % recallTwo)
print("F1 Score = %s" % f1ScoreTwo)

print(" ")

# Testing performance of the logistic classifier - using test data
predictTestDataModelTwo = testData.map(lambda p: (float(modelTwo.predict(p.features)), p.label))
accuracyTestDataModelTwo = 1.0 * predictTestDataModelTwo.filter(lambda x: x[0] == x[1]).count() / testData.count()

metricsTwoTest = MulticlassMetrics(predictTestDataModelTwo)

# confusionMatrix(): rows are ACTUAL labels, columns are PREDICTED labels, both
# in ascending label order. Index 0 (the lowest label) is treated as the
# positive class.
resultMatrixTwoTest = metricsTwoTest.confusionMatrix().toArray()
tpTwoTest = resultMatrixTwoTest[0][0]  # actual 0, predicted 0
fnTwoTest = resultMatrixTwoTest[0][1]  # actual 0, predicted the other class
fpTwoTest = resultMatrixTwoTest[1][0]  # actual other class, predicted 0
tnTwoTest = resultMatrixTwoTest[1][1]  # actual other class, predicted other class
precisionTwoTest = tpTwoTest/(tpTwoTest+fpTwoTest)
recallTwoTest = tpTwoTest/(tpTwoTest+fnTwoTest)
# F1 is the harmonic mean 2PR / (P + R); defined as 0 when P + R == 0.
f1ScoreTwoTest = (2 * precisionTwoTest * recallTwoTest / (precisionTwoTest + recallTwoTest)
                  if (precisionTwoTest + recallTwoTest) else 0.0)
print("Logistic Regression Classifier - using test data:")
print("Accuracy = " + str(accuracyTestDataModelTwo  * 100))
print("Precision = %s" % precisionTwoTest)
print("Recall = %s" % recallTwoTest)
print("F1 Score = %s" % f1ScoreTwoTest)