In [0]:
pip install nltk

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
import math

In [0]:
# /FileStore/tables/SPAM_text_message_20170820___Data.csv
# read csv file from the location
spamData = spark.read.csv("/FileStore/tables/SPAM_text_message_20170820___Data.csv", header=True, inferSchema=True)

In [0]:
display(spamData)

Category,Message
ham,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat..."
ham,Ok lar... Joking wif u oni...
spam,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
ham,U dun say so early hor... U c already then say...
ham,"Nah I don't think he goes to usf, he lives around here though"
spam,"FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv"
ham,Even my brother is not like to speak with me. They treat me like aids patent.
ham,As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune
spam,WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.
spam,Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030


In [0]:
#### Data pre-processing

In [0]:
from pyspark.sql.functions import lower

# convert all the messages into lowercase
spamData = spamData.withColumn("Message", lower(spamData["Message"]))

In [0]:
spamData.show(5)

+--------+--------------------+
|Category|             Message|
+--------+--------------------+
|     ham|go until jurong p...|
|     ham|ok lar... joking ...|
|    spam|free entry in 2 a...|
|     ham|u dun say so earl...|
|     ham|nah i don't think...|
+--------+--------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import count, when, isnan, col

# check for missing values
spamData.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spamData.columns]).show()

+--------+-------+
|Category|Message|
+--------+-------+
|       0|      0|
+--------+-------+



In [0]:
from pyspark.ml.feature import Tokenizer

# Tokenize the message into words and store it in a different column
tokenizer = Tokenizer(inputCol="Message", outputCol="words")
spamDF = tokenizer.transform(spamData)

In [0]:
spamDF.show(5)

+--------+--------------------+--------------------+
|Category|             Message|               words|
+--------+--------------------+--------------------+
|     ham|go until jurong p...|[go, until, juron...|
|     ham|ok lar... joking ...|[ok, lar..., joki...|
|    spam|free entry in 2 a...|[free, entry, in,...|
|     ham|u dun say so earl...|[u, dun, say, so,...|
|     ham|nah i don't think...|[nah, i, don't, t...|
+--------+--------------------+--------------------+
only showing top 5 rows



In [0]:
from pyspark.ml.feature import StopWordsRemover

# remove stop words from the tokenized words
sw_remover = StopWordsRemover(inputCol="words", outputCol="words_without_stopwords")
spamDF = sw_remover.transform(spamDF)

In [0]:
spamDF.show(5)

+--------+--------------------+--------------------+-----------------------+
|Category|             Message|               words|words_without_stopwords|
+--------+--------------------+--------------------+-----------------------+
|     ham|go until jurong p...|[go, until, juron...|   [go, jurong, poin...|
|     ham|ok lar... joking ...|[ok, lar..., joki...|   [ok, lar..., joki...|
|    spam|free entry in 2 a...|[free, entry, in,...|   [free, entry, 2, ...|
|     ham|u dun say so earl...|[u, dun, say, so,...|   [u, dun, say, ear...|
|     ham|nah i don't think...|[nah, i, don't, t...|   [nah, think, goes...|
+--------+--------------------+--------------------+-----------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from nltk.stem import PorterStemmer

# use PorterStemmer to stem the words. (i.e) to obtain root word for each words
stemmer = PorterStemmer()
stemUdf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
spamDF = spamDF.withColumn("stemmed_words", stemUdf("words_without_stopwords"))

In [0]:
spamDF.show(10)

+--------+--------------------+--------------------+-----------------------+--------------------+
|Category|             Message|               words|words_without_stopwords|       stemmed_words|
+--------+--------------------+--------------------+-----------------------+--------------------+
|     ham|go until jurong p...|[go, until, juron...|   [go, jurong, poin...|[go, jurong, poin...|
|     ham|ok lar... joking ...|[ok, lar..., joki...|   [ok, lar..., joki...|[ok, lar..., joke...|
|    spam|free entry in 2 a...|[free, entry, in,...|   [free, entry, 2, ...|[free, entri, 2, ...|
|     ham|u dun say so earl...|[u, dun, say, so,...|   [u, dun, say, ear...|[u, dun, say, ear...|
|     ham|nah i don't think...|[nah, i, don't, t...|   [nah, think, goes...|[nah, think, goe,...|
|    spam|freemsg hey there...|[freemsg, hey, th...|   [freemsg, hey, da...|[freemsg, hey, da...|
|     ham|even my brother i...|[even, my, brothe...|   [even, brother, l...|[even, brother, l...|
|     ham|as per you

In [0]:
from pyspark.ml.feature import CountVectorizer

# use count vectorizer to vectorize the set of words for each message
cVectorizer = CountVectorizer(inputCol="stemmed_words", outputCol="features")
modelCV = cVectorizer.fit(spamDF)
spamDF = modelCV.transform(spamDF)

In [0]:
spamDF.show(5)

+--------+--------------------+--------------------+-----------------------+--------------------+--------------------+
|Category|             Message|               words|words_without_stopwords|       stemmed_words|            features|
+--------+--------------------+--------------------+-----------------------+--------------------+--------------------+
|     ham|go until jurong p...|[go, until, juron...|   [go, jurong, poin...|[go, jurong, poin...|(12328,[5,12,35,7...|
|     ham|ok lar... joking ...|[ok, lar..., joki...|   [ok, lar..., joki...|[ok, lar..., joke...|(12328,[0,25,318,...|
|    spam|free entry in 2 a...|[free, entry, in,...|   [free, entry, 2, ...|[free, entri, 2, ...|(12328,[2,15,17,2...|
|     ham|u dun say so earl...|[u, dun, say, so,...|   [u, dun, say, ear...|[u, dun, say, ear...|(12328,[0,41,97,1...|
|     ham|nah i don't think...|[nah, i, don't, t...|   [nah, think, goes...|[nah, think, goe,...|(12328,[24,154,17...|
+--------+--------------------+-----------------

In [0]:
from pyspark.sql.functions import when

# use the category column with two values: ham and spam to give label column with values 0 and 1 respectively
spamDF = spamDF.withColumn("label", when(col("Category") == "spam", 1).otherwise(0))

In [0]:
spamDF.show(10)

+--------+--------------------+--------------------+-----------------------+--------------------+--------------------+-----+
|Category|             Message|               words|words_without_stopwords|       stemmed_words|            features|label|
+--------+--------------------+--------------------+-----------------------+--------------------+--------------------+-----+
|     ham|go until jurong p...|[go, until, juron...|   [go, jurong, poin...|[go, jurong, poin...|(12328,[5,12,35,7...|    0|
|     ham|ok lar... joking ...|[ok, lar..., joki...|   [ok, lar..., joki...|[ok, lar..., joke...|(12328,[0,25,318,...|    0|
|    spam|free entry in 2 a...|[free, entry, in,...|   [free, entry, 2, ...|[free, entri, 2, ...|(12328,[2,15,17,2...|    1|
|     ham|u dun say so earl...|[u, dun, say, so,...|   [u, dun, say, ear...|[u, dun, say, ear...|(12328,[0,41,97,1...|    0|
|     ham|nah i don't think...|[nah, i, don't, t...|   [nah, think, goes...|[nah, think, goe,...|(12328,[24,154,17...|    0|


In [0]:
# split the dataframe into two: training and test data
(trainingData, testData) = spamDF.randomSplit([0.7, 0.3], seed=200)

In [0]:
#convert the training and test data into rdds for the ease of calculation for naive-bayes implementation
trainingRDD = trainingData.select("label", "features").rdd.map(lambda row: (row.label, row.features))
testRDD = testData.select("label", "features").rdd.map(lambda row: (row.label, row.features))

In [0]:
# number of messages in each label (spam and ham, i.e., 1 and 0)
labelCounts = trainingRDD.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)

In [0]:
# total number of messages in training data
totalData = labelCounts.map(lambda x: x[1]).sum()

In [0]:
# prior probabilities calculation
prior_prob = labelCounts.map(lambda x: (x[0], x[1] / totalData))

In [0]:
labelCounts.take(10)

Out[65]: [(0, 3364), (1, 510)]

In [0]:
totalData

Out[66]: 3874

In [0]:
prior_prob.take(10)

Out[67]: [(0, 0.8683531233866805), (1, 0.13164687661331956)]

In [0]:
# dividing the training data rdd into two, one for spam and one for not spam(ham)
spam_data = trainingRDD.filter(lambda x: x[0] == 1)
notSpam_data = trainingRDD.filter(lambda x: x[0] == 0)

In [0]:
# finding count of each word in the spam training data
spam_wordcounts = spam_data.flatMap(lambda x: x[1]).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[0], x[1]))

In [0]:
# finding count of each word in the ham(not spam) training data
notSpam_wordcounts = notSpam_data.flatMap(lambda x: x[1]).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[0], x[1]))

In [0]:
# total words in the spam training data
spam_totalWords = spam_wordcounts.map(lambda x: x[1]).sum()

In [0]:
# total words in the ham(not spam) training data
notSpam_totalWords = notSpam_wordcounts.map(lambda x: x[1]).sum()

In [0]:
# calculating conditional probability for each words in the spam training data
spamWordProbs = spam_wordcounts.map(lambda x: (x[0], x[1] / spam_totalWords))

In [0]:
# calculating conditional probability for each words in the not spam(ham) training data
notSpamWordProbs = notSpam_wordcounts.map(lambda x: (x[0], x[1] / notSpam_totalWords))

In [0]:
# converting the spamWordProbs and notSpamWordProbs rdds into dictionary for ease of usage while creating a function for passing to map testrdd
spamWordProbs_dict = dict(spamWordProbs.collect())
notSpamWordProbs_dict = dict(notSpamWordProbs.collect())

In [0]:
# converting prior_prob rdd into dictionary for ease of use while defining function that is passed to the map method for testRDD

#priorProbBC = sc.broadcast(dict(prior_prob.collect()))
prior_prob_dict = dict(prior_prob.collect())

In [0]:
#results = testRDD.map(lambda x: (
#    x[0],  
#    (sum(spamWordProbs_log[word] for word in x[1] if word in spamWordProbs_log) + prior_spam,  
#    sum(notSpamWordProbs_log[word] for word in x[1] if word in notSpamWordProbs_log) + prior_notSpam) 
#))

#predicted_labels = results.map(lambda x: (x[0], 1 if x[1][0] > x[1][1] else 0))

In [0]:
# function that calculates the spam and ham probabilities for each of the message in the testRDD and returns the predicted label
def predict(features):
    spam = math.log(prior_prob_dict[1])
    notSpam = math.log(prior_prob_dict[0])
    
    for f in features:
        if f in spamWordProbs_dict:
            spam += math.log(spamWordProbs_dict[f])
        
        if f in notSpamWordProbs_dict:
            notSpam += math.log(notSpamWordProbs_dict[f])
    
    # Return the label with the highest probability
    if spam > notSpam:
        return 1
    else:
        return 0

In [0]:
# testRDD applies the predict() function above to the vectorized features using map() method to find the predicted label (i.e) to find whether the message is spam or not
predictions = testRDD.map(lambda x: (x[0], predict(x[1])))

In [0]:
predictions.take(50)

Out[80]: [(0, 1),
 (0, 1),
 (0, 1),
 (0, 0),
 (0, 1),
 (0, 1),
 (0, 1),
 (0, 0),
 (0, 0),
 (0, 1),
 (0, 1),
 (0, 1),
 (0, 1),
 (0, 0),
 (0, 1),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 1),
 (0, 0),
 (0, 1),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 1),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 1),
 (0, 1),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 0),
 (0, 1),
 (0, 0),
 (0, 1)]

In [0]:
# filtering the predictions by checking if the actual label and predict label are equal and getting the count of it
# (i.e) correctly predicted labels count
correctPreds = predictions.filter(lambda x: x[0] == x[1]).count()

In [0]:
# entire test data predictions count
completeTest = predictions.count()

In [0]:
# finding accuracy of the prediction
accuracy = float(correctPreds) / float(completeTest)

In [0]:
# Printing Prior Probabilities for each class
print("Prior probability for spam(label: 1): ", prior_prob_dict[1])
print("Prior probability for ham(label: 0): ", prior_prob_dict[0], "\n")

# Printing Accuracy of prediction by naive-bayes
print("Accuracy: {:.2f}%".format(accuracy * 100))

Prior probability for spam(label: 1):  0.13164687661331956
Prior probability for ham(label: 0):  0.8683531233866805 

Accuracy: 84.82%
