In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when, size, length, rand, regexp_replace, asc, desc
from pyspark.ml.classification import NaiveBayes

In [0]:
# The URL of the raw file on GitHub
github_raw_url = "https://raw.githubusercontent.com/swetha0404/datasets/main/bdma/spamhamtext.csv"
 
# The destination path where you want to save the file in Databricks
dbfs_destination_path = "dbfs:/FileStore/tables/spamhamtext.csv"
 
# Using dbutils to read and write the file
dbutils.fs.cp(github_raw_url, dbfs_destination_path)

Out[48]: True

In [0]:
# Load the dataset
df = spark.read.csv("dbfs:/FileStore/tables/spamhamtext.csv", header=True, inferSchema=True)
df.show()

+--------+--------------------+
|Category|             Message|
+--------+--------------------+
|     ham|Go until jurong p...|
|     ham|Ok lar... Joking ...|
|    spam|Free entry in 2 a...|
|     ham|U dun say so earl...|
|     ham|Nah I don't think...|
|    spam|FreeMsg Hey there...|
|     ham|Even my brother i...|
|     ham|As per your reque...|
|    spam|WINNER!! As a val...|
|    spam|Had your mobile 1...|
|     ham|I'm gonna be home...|
|    spam|SIX chances to wi...|
|    spam|URGENT! You have ...|
|     ham|I've been searchi...|
|     ham|I HAVE A DATE ON ...|
|    spam|XXXMobileMovieClu...|
|     ham|Oh k...i'm watchi...|
|     ham|Eh u remember how...|
|     ham|Fine if thats th...|
|    spam|England v Macedon...|
+--------+--------------------+
only showing top 20 rows



In [0]:
df.select("Category").distinct().show()

+--------------------+
|            Category|
+--------------------+
|ham\tHI BABE UAWA...|
|                 ham|
|                spam|
|           ham\tYeah|
+--------------------+



In [0]:
# newdf = df.filter((df["Type"]) <= 20)
df.createOrReplaceTempView("messages")
filtered_df = spark.sql("SELECT * FROM messages WHERE LENGTH(Category) <= 5")

In [0]:
# filtered_df.select('Type').distinct().show()
df = filtered_df
df.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Message: string (nullable = true)



In [0]:
df.show()

+--------+--------------------+
|Category|             Message|
+--------+--------------------+
|     ham|Go until jurong p...|
|     ham|Ok lar... Joking ...|
|    spam|Free entry in 2 a...|
|     ham|U dun say so earl...|
|     ham|Nah I don't think...|
|    spam|FreeMsg Hey there...|
|     ham|Even my brother i...|
|     ham|As per your reque...|
|    spam|WINNER!! As a val...|
|    spam|Had your mobile 1...|
|     ham|I'm gonna be home...|
|    spam|SIX chances to wi...|
|    spam|URGENT! You have ...|
|     ham|I've been searchi...|
|     ham|I HAVE A DATE ON ...|
|    spam|XXXMobileMovieClu...|
|     ham|Oh k...i'm watchi...|
|     ham|Eh u remember how...|
|     ham|Fine if thats th...|
|    spam|England v Macedon...|
+--------+--------------------+
only showing top 20 rows



In [0]:
df = df.orderBy(rand())
df.show()

+--------+--------------------+
|Category|             Message|
+--------+--------------------+
|     ham|Sorry, I'll call ...|
|     ham|This is all just ...|
|     ham|Sat right? Okay t...|
|     ham|Some friends want...|
|     ham|Hurry home u big ...|
|     ham|Sorry . I will be...|
|     ham|Joy's father is J...|
|     ham|Ok good then i la...|
|     ham|Ugh just got outt...|
|     ham|Tell me again wha...|
|     ham|Did u got that pe...|
|     ham|Abeg, make profit...|
|     ham|And now electrici...|
|     ham|Its on in engalnd...|
|     ham|Bugis oso near wa...|
|    spam|You are guarantee...|
|     ham|Then ur physics g...|
|     ham|"Finally it has h...|
|     ham|Dear we got  &lt;...|
|     ham|Reckon need to be...|
+--------+--------------------+
only showing top 20 rows



In [0]:
# Rename columns for convenience
df = df.withColumnRenamed("Message", "text").withColumnRenamed("Category", "label")

In [0]:
df = df.withColumn("text_1", regexp_replace("text", "[\W_]+", " "))
df = df.withColumn("text_2", regexp_replace("text_1", "^\s+|\s+$", ""))
df = df.withColumn("text_3", regexp_replace("text_2", "\s+", " "))
df.show(30)

+-----+--------------------+--------------------+--------------------+--------------------+
|label|                text|              text_1|              text_2|              text_3|
+-----+--------------------+--------------------+--------------------+--------------------+
|  ham|Sorry, I'll call ...|Sorry I ll call l...|Sorry I ll call l...|Sorry I ll call l...|
|  ham|This is all just ...|This is all just ...|This is all just ...|This is all just ...|
|  ham|Sat right? Okay t...|Sat right Okay th...|Sat right Okay th...|Sat right Okay th...|
|  ham|Some friends want...|Some friends want...|Some friends want...|Some friends want...|
|  ham|Hurry home u big ...|Hurry home u big ...|Hurry home u big ...|Hurry home u big ...|
|  ham|Sorry . I will be...|Sorry I will be a...|Sorry I will be a...|Sorry I will be a...|
|  ham|Joy's father is J...|Joy s father is J...|Joy s father is J...|Joy s father is J...|
|  ham|Ok good then i la...|Ok good then i la...|Ok good then i la...|Ok good th

In [0]:
tokenizer = Tokenizer(inputCol="text_3", outputCol="words")
tokenized = tokenizer.transform(df)
tokenized.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|                text|              text_1|              text_2|              text_3|               words|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Sorry, I'll call ...|Sorry I ll call l...|Sorry I ll call l...|Sorry I ll call l...|[sorry, i, ll, ca...|
|  ham|This is all just ...|This is all just ...|This is all just ...|This is all just ...|[this, is, all, j...|
|  ham|Sat right? Okay t...|Sat right Okay th...|Sat right Okay th...|Sat right Okay th...|[sat, right, okay...|
|  ham|Some friends want...|Some friends want...|Some friends want...|Some friends want...|[some, friends, w...|
|  ham|Hurry home u big ...|Hurry home u big ...|Hurry home u big ...|Hurry home u big ...|[hurry, home, u, ...|
|  ham|Sorry . I will be...|Sorry I will be a...|Sorry I will be a...|Sorry I will be a...|[sorr

In [0]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed = remover.transform(tokenized)
removed.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|                text|              text_1|              text_2|              text_3|               words|            filtered|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Sorry, I'll call ...|Sorry I ll call l...|Sorry I ll call l...|Sorry I ll call l...|[sorry, i, ll, ca...|[sorry, ll, call,...|
|  ham|This is all just ...|This is all just ...|This is all just ...|This is all just ...|[this, is, all, j...|     [creepy, crazy]|
|  ham|Sat right? Okay t...|Sat right Okay th...|Sat right Okay th...|Sat right Okay th...|[sat, right, okay...|[sat, right, okay...|
|  ham|Some friends want...|Some friends want...|Some friends want...|Some friends want...|[some, friends, w...|[friends, want, d...|
|  ham|Hurry home u big ...|Hurry home u big ...|Hurry home u 

In [0]:
new_df = removed.withColumn("label", when(col("label")=='spam', 0).when(col("label")=='ham', 1))
new_df.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|                text|              text_1|              text_2|              text_3|               words|            filtered|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    1|Sorry, I'll call ...|Sorry I ll call l...|Sorry I ll call l...|Sorry I ll call l...|[sorry, i, ll, ca...|[sorry, ll, call,...|
|    1|This is all just ...|This is all just ...|This is all just ...|This is all just ...|[this, is, all, j...|     [creepy, crazy]|
|    1|Sat right? Okay t...|Sat right Okay th...|Sat right Okay th...|Sat right Okay th...|[sat, right, okay...|[sat, right, okay...|
|    1|Some friends want...|Some friends want...|Some friends want...|Some friends want...|[some, friends, w...|[friends, want, d...|
|    1|Hurry home u big ...|Hurry home u big ...|Hurry home u 

In [0]:
final_df = new_df.select("filtered", "label")
final_df.show()

+--------------------+-----+
|            filtered|label|
+--------------------+-----+
|[sorry, ll, call,...|    1|
|     [creepy, crazy]|    1|
|[sat, right, okay...|    1|
|[friends, want, d...|    1|
|[hurry, home, u, ...|    1|
|[sorry, able, get...|    1|
|[joy, father, joh...|    1|
|[ok, good, later,...|    1|
|[ugh, got, outta,...|    1|
|     [tell, address]|    1|
|[u, got, persons,...|    1|
|[abeg, make, prof...|    1|
|[electricity, wen...|    1|
|[engalnd, telly, ...|    1|
|[bugis, oso, near...|    1|
|[guaranteed, late...|    0|
|  [ur, physics, get]|    1|
|[finally, happene...|    1|
|[dear, got, lt, g...|    1|
|[reckon, need, to...|    1|
+--------------------+-----+
only showing top 20 rows



In [0]:
final_df = final_df.withColumnRenamed("filtered", "text")
display(final_df)

text,label
"List(sorry, ll, call, later, meeting)",1
"List(creepy, crazy)",1
"List(sat, right, okay, thanks)",1
"List(friends, want, drive, em, someplace, probably, take)",1
"List(hurry, home, u, big, butt, hang, last, caller, u, food, done, m, starving, ask, cooked)",1
"List(sorry, able, get, see, morning)",1
"List(joy, father, john, john, joy, father, u, ans, ths, hav, lt, gt, iq, tis, ias, question, try, answer)",1
"List(ok, good, later, come, find, c, lucky, told, go, earlier, later, pple, take, finish)",1
"List(ugh, got, outta, class)",1
"List(tell, address)",1


In [0]:
train, test = final_df.randomSplit([0.7, 0.3], seed = 200)

In [0]:
# train.show()
train.count()

Out[66]: 3874

In [0]:
# test.show()
test.count()

Out[67]: 1698

In [0]:
n = final_df.count()
temp = final_df.groupBy("label").count()
print("Count:", n)

Count: 5572


In [0]:
temp.show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 4825|
|    0|  747|
+-----+-----+



In [0]:
labelCount = temp.rdd.map(lambda x: x[1]).collect()
print("labelCount:",labelCount)

labelCount: [4825, 747]


In [0]:
hamCount = labelCount[0]
spamCount = labelCount[1]
print("ham count: ", hamCount, "\nspam count: ", spamCount)

ham count:  4825 
spam count:  747


In [0]:
priorham = hamCount / n
priorspam = spamCount / n

print("Prior of ham:", priorham, "\nPrior of spam:", priorspam)

Prior of ham: 0.8659368269921034 
Prior of spam: 0.13406317300789664


In [0]:
trainInput = train.rdd.map(lambda x : [x[0], x[1]])
 
resultspam = trainInput.flatMap(lambda x: x[0] if x[1]==0 else []).filter(lambda x: x!=[])
resultspam.collect()

Out[69]: ['07732584351',
 'rodger',
 'burns',
 'msg',
 'tried',
 'call',
 're',
 'reply',
 'sms',
 'free',
 'nokia',
 'mobile',
 'free',
 'camcorder',
 'please',
 'call',
 '08000930705',
 'delivery',
 'tomorrow',
 '07801543489',
 'guaranteed',
 'latests',
 'nokia',
 'phone',
 '40gb',
 'ipod',
 'mp3',
 'player',
 '500',
 'prize',
 'txt',
 'word',
 'collect',
 '83355',
 'tc',
 'llc',
 'ny',
 'usa',
 '150p',
 'mt',
 'msgrcvd18',
 '09066362231',
 'urgent',
 'mobile',
 '07xxxxxxxxx',
 'won',
 '2',
 '000',
 'bonus',
 'caller',
 'prize',
 '02',
 '06',
 '03',
 '2nd',
 'attempt',
 'reach',
 'call',
 '09066362231',
 'asap',
 '0a',
 'networks',
 'allow',
 'companies',
 'bill',
 'sms',
 'responsible',
 'suppliers',
 '1',
 'new',
 'message',
 'call',
 '0207',
 '083',
 '6089',
 '1',
 'new',
 'message',
 'please',
 'call',
 '08712400200',
 '1',
 'new',
 'message',
 'please',
 'call',
 '08718738034',
 '1',
 'new',
 'voicemail',
 'please',
 'call',
 '08719181503',
 '1',
 'new',
 'voicemail',
 'please',

In [0]:
resultham = trainInput.flatMap(lambda x: x[0] if x[1]==1 else []).filter(lambda x: x!=[])
resultham.collect()

Out[70]: ['',
 '',
 '1',
 '20',
 'call',
 'cost',
 'guess',
 'isnt',
 'bad',
 'miss',
 'ya',
 'need',
 'ya',
 'want',
 'ya',
 'love',
 'ya',
 '1',
 'cbe',
 '2',
 'chennai',
 '1',
 'go',
 'write',
 'msg',
 '2',
 'put',
 'dictionary',
 'mode',
 '3',
 'cover',
 'screen',
 'hand',
 '4',
 'press',
 'lt',
 'gt',
 '5',
 'gently',
 'remove',
 'ur',
 'hand',
 'interesting',
 '1',
 'go',
 'write',
 'msg',
 '2',
 'put',
 'dictionary',
 'mode',
 '3',
 'cover',
 'screen',
 'hand',
 '4',
 'press',
 'lt',
 'gt',
 '5',
 'gently',
 'remove',
 'ur',
 'hand',
 'interesting',
 '1',
 'go',
 'write',
 'msg',
 '2',
 'put',
 'dictionary',
 'mode',
 '3',
 'cover',
 'screen',
 'hand',
 '4',
 'press',
 'lt',
 'gt',
 '5',
 'gently',
 'remove',
 'ur',
 'hand',
 'interesting',
 '1',
 'tension',
 'face',
 '2',
 'smiling',
 'face',
 '3',
 'waste',
 'face',
 '4',
 'innocent',
 'face',
 '5',
 'terror',
 'face',
 '6',
 'cruel',
 'face',
 '7',
 'romantic',
 'face',
 '8',
 'lovable',
 'face',
 '9',
 'decent',
 'face',
 'l

In [0]:

trainham = resultham.map(lambda x : (x,1)).reduceByKey(lambda x,y : x + y).filter(lambda x: x[0]!='')
trainham.take(10)

Out[71]: [('1', 41),
 ('20', 5),
 ('call', 163),
 ('cost', 8),
 ('guess', 24),
 ('isnt', 5),
 ('bad', 26),
 ('miss', 44),
 ('ya', 43),
 ('need', 115)]

In [0]:

trainspam = resultspam.map(lambda x : (x,1)).reduceByKey(lambda x,y : x + y).filter(lambda x: x[0]!='')
trainspam.take(10)

Out[72]: [('07732584351', 1),
 ('rodger', 1),
 ('burns', 1),
 ('msg', 37),
 ('tried', 13),
 ('call', 256),
 ('re', 10),
 ('reply', 75),
 ('sms', 32),
 ('free', 164)]

In [0]:
# Vocabulary Count
trainHamCount = trainham.count()
trainSpamCount = trainspam.count()
 
v = trainHamCount + trainSpamCount
 
print("trainHamCount: ", trainHamCount)
print("trainSpamCount: ", trainSpamCount)
print("v: ", v)


trainHamCount:  5602
trainSpamCount:  2326
v:  7928


In [0]:
trainHamProbability = trainham.map(lambda x : (x[0], (float)(x[1] + 1) / (float)(v + trainHamCount)))
trainSpamProbability = trainspam.map(lambda x : (x[0], (float)(x[1] + 1) / (float)(v + trainSpamCount)))
trainHamProbability.take(10)

Out[76]: [('1', 0.0031042128603104213),
 ('20', 0.0004434589800443459),
 ('call', 0.012121212121212121),
 ('cost', 0.0006651884700665188),
 ('guess', 0.0018477457501847746),
 ('isnt', 0.0004434589800443459),
 ('bad', 0.0019955654101995565),
 ('miss', 0.0033259423503325942),
 ('ya', 0.0032520325203252032),
 ('need', 0.008573540280857354)]

In [0]:
trainSpamProbability.take(10)

Out[77]: [('07732584351', 0.0001950458357714063),
 ('rodger', 0.0001950458357714063),
 ('burns', 0.0001950458357714063),
 ('msg', 0.0037058708796567194),
 ('tried', 0.001365320850399844),
 ('call', 0.025063389896625706),
 ('re', 0.0010727520967427347),
 ('reply', 0.007411741759313439),
 ('sms', 0.0032182562902282035),
 ('free', 0.01609128145114102)]

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
 
#adding id column for easy access
testDataFrame = test.withColumn("id", monotonically_increasing_id())
testDataFrame.sort(testDataFrame.id.desc()).collect()
testData = testDataFrame.rdd.map(lambda x:(x["text"],x["label"],x["id"]))

In [0]:
testDataFrame.display()

text,label,id
List(),1,0
List(),1,1
"List(08714712388, 10am, 7pm, cost, 10p)",0,2
"List(1, finish, meeting, call)",1,3
"List(1, new, message, please, call, 08715205273)",0,4
"List(1, new, voicemail, please, call, 08719181513)",0,5
"List(1, number, 2, gonna, massive, pain, ass, d, rather, get, involved, possible)",1,6
"List(1, reach, home, call)",1,7
"List(10, min, later, k)",1,8
"List(1000, flirting, txt, girl, bloke, ur, name, age, eg, girl, zoe, 18, 8007, join, get, chatting)",0,9


In [0]:
testData.collect()

Out[91]: [([], 1, 0),
 ([], 1, 1),
 (['08714712388', '10am', '7pm', 'cost', '10p'], 0, 2),
 (['1', 'finish', 'meeting', 'call'], 1, 3),
 (['1', 'new', 'message', 'please', 'call', '08715205273'], 0, 4),
 (['1', 'new', 'voicemail', 'please', 'call', '08719181513'], 0, 5),
 (['1',
   'number',
   '2',
   'gonna',
   'massive',
   'pain',
   'ass',
   'd',
   'rather',
   'get',
   'involved',
   'possible'],
  1,
  6),
 (['1', 'reach', 'home', 'call'], 1, 7),
 (['10', 'min', 'later', 'k'], 1, 8),
 (['1000',
   'flirting',
   'txt',
   'girl',
   'bloke',
   'ur',
   'name',
   'age',
   'eg',
   'girl',
   'zoe',
   '18',
   '8007',
   'join',
   'get',
   'chatting'],
  0,
  9),
 (['1000',
   'girls',
   'many',
   'local',
   '2',
   'u',
   'r',
   'virgins',
   '2',
   'r',
   'ready',
   '2',
   '4fil',
   'ur',
   'every',
   'sexual',
   'need',
   'u',
   '4fil',
   'text',
   'cute',
   '69911',
   '1',
   '50p',
   'm'],
  0,
  10),
 (['1000',
   'winner',
   'guaranteed',
   '

In [0]:
tokenizedTestData = testData.flatMap(lambda x : [(word, x[2]) for word in x[0]])
tokenizedTestDF = tokenizedTestData.toDF(["word","id"])
 
#join the training Data for positive label with TokenizedtestData to get the Probability of each word(By ID) in the ham class
#We are multiplying the prior probability of ham class with the probabilities of all the words [P(A|B)]
 
trainHamProbabilityDF = trainHamProbability.toDF(["word", "hamProb"])
testHamProbability = tokenizedTestDF.join(trainHamProbabilityDF, tokenizedTestDF.word == trainHamProbabilityDF.word)
testHamProbability = testHamProbability.rdd.map(lambda x: (x["id"],x["hamProb"]))
testHamProbability = testHamProbability.reduceByKey(lambda x, y: x + y).map(lambda x:(x[0], x[1] * priorham)).sortByKey()
testHamProbability.collect()

Out[92]: [(2, 0.0007040136804813849),
 (3, 0.01644831962579236),
 (4, 0.023232451455885702),
 (5, 0.020224393002919783),
 (6, 0.04460886684504775),
 (7, 0.022080429069643432),
 (8, 0.014848288533789212),
 (9, 0.03008058452965917),
 (10, 0.1979558467026294),
 (11, 0.011584225106102788),
 (12, 0.026944523589333004),
 (13, 0.07065737302285899),
 (14, 0.029824579554938665),
 (15, 0.030016583285979042),
 (16, 0.04217681958520297),
 (17, 0.01203223381186367),
 (18, 0.05920115040411645),
 (20, 0.14829088160685164),
 (21, 0.030976601941180935),
 (22, 0.05920115040411645),
 (23, 0.012416241273944424),
 (25, 0.025984504934131115),
 (26, 0.026240509908851618),
 (27, 0.031936620596382824),
 (28, 0.00422408208288831),
 (29, 0.012992252467065557),
 (30, 0.00857616665313687),
 (31, 0.008640167896816998),
 (32, 0.0035840696460870502),
 (33, 0.09452983691554596),
 (34, 0.021312414145481923),
 (35, 0.02560049747205036),
 (36, 0.014272277340668077),
 (37, 0.12083434806807768),
 (38, 0.09849791402371376),

In [0]:
#join the training Data for spam label with TokenizedtestData to get the Probability of each word(By ID) in the spam class
#We are multiplying the prior probability of spam class with the probabilities of all the words [P(A|B)]
 
trainSpamProbabilityDF = trainSpamProbability.toDF(["word", "spamProb"])
testSpamProbability = tokenizedTestDF.join(trainSpamProbabilityDF, tokenizedTestDF.word == trainSpamProbabilityDF.word)
testSpamProbability = testSpamProbability.rdd.map(lambda x: (x["id"],x["spamProb"]))
testSpamProbability = testSpamProbability.reduceByKey(lambda x, y: x + y).map(lambda x:(x[0], x[1] * priorspam)).sortByKey()
testSpamProbability.collect()

Out[93]: [(2, 0.0006144888951990581),
 (3, 0.004379867657269882),
 (4, 0.005831107388484678),
 (5, 0.005621919679480744),
 (6, 0.003961492239262013),
 (7, 0.004471387279959104),
 (8, 0.0007321569815137714),
 (9, 0.004942059625217957),
 (10, 0.013976353807825383),
 (11, 0.006837823238066114),
 (12, 0.003360077575875701),
 (13, 0.005491177361353286),
 (14, 0.003765378762070824),
 (15, 0.0022618421036050434),
 (16, 0.002876330998804102),
 (17, 0.0060272208656758685),
 (18, 0.009008145718981937),
 (19, 6.537115906372959e-05),
 (20, 0.01595056281155002),
 (21, 0.01172758593603309),
 (22, 0.0089689230235437),
 (23, 0.00018303924537844285),
 (25, 0.000496820808884345),
 (26, 0.0016473532084059857),
 (27, 0.008668215691850542),
 (28, 3.922269543823775e-05),
 (29, 0.00018303924537844285),
 (30, 0.0004445238816333612),
 (31, 0.007517683292328903),
 (32, 0.00045759811344610714),
 (33, 0.0005491177361353286),
 (34, 0.0008498250678284847),
 (35, 0.0004575981134461072),
 (36, 0.0010197900813941816),

In [0]:
#Join the DFs so that we get (id, [hamProb, spamProb]) where each row shows the probability of word with ID in ham and spam class
combinedProb = testSpamProbability.join(testHamProbability).sortByKey("id")
combinedProb.collect()

Out[94]: [(2, (0.0006144888951990581, 0.0007040136804813849)),
 (3, (0.004379867657269882, 0.01644831962579236)),
 (4, (0.005831107388484678, 0.023232451455885702)),
 (5, (0.005621919679480744, 0.020224393002919783)),
 (6, (0.003961492239262013, 0.04460886684504775)),
 (7, (0.004471387279959104, 0.022080429069643432)),
 (8, (0.0007321569815137714, 0.014848288533789212)),
 (9, (0.004942059625217957, 0.03008058452965917)),
 (10, (0.013976353807825383, 0.1979558467026294)),
 (11, (0.006837823238066114, 0.011584225106102788)),
 (12, (0.003360077575875701, 0.026944523589333004)),
 (13, (0.005491177361353286, 0.07065737302285899)),
 (14, (0.003765378762070824, 0.029824579554938665)),
 (15, (0.0022618421036050434, 0.030016583285979042)),
 (16, (0.002876330998804102, 0.04217681958520297)),
 (17, (0.0060272208656758685, 0.01203223381186367)),
 (18, (0.009008145718981937, 0.05920115040411645)),
 (20, (0.01595056281155002, 0.14829088160685164)),
 (21, (0.01172758593603309, 0.030976601941180935)),

In [0]:
#Finding the actual prediction by comparing the hamProb and spamProb
prediction = combinedProb.map(lambda x:(x[0], 1 if x[1][0] > x[1][1] else 0)).toDF(["id","predictions"])
finalPrediction = prediction.join(testDataFrame, prediction.id == testDataFrame.id)
 
finalPrediction = finalPrediction.select(col("predictions").cast("double"),col("label").cast("double"))
finalPrediction.display()

predictions,label
0.0,0.0
0.0,1.0
0.0,0.0
0.0,0.0
0.0,1.0
0.0,1.0
0.0,1.0
0.0,0.0
0.0,0.0
0.0,0.0


In [0]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
 
results = finalPrediction.rdd.map(lambda x : x)
results.take(10)

Out[97]: [Row(predictions=0.0, label=0.0),
 Row(predictions=0.0, label=1.0),
 Row(predictions=0.0, label=0.0),
 Row(predictions=0.0, label=0.0),
 Row(predictions=0.0, label=1.0),
 Row(predictions=0.0, label=1.0),
 Row(predictions=0.0, label=1.0),
 Row(predictions=0.0, label=0.0),
 Row(predictions=0.0, label=0.0),
 Row(predictions=0.0, label=0.0)]

In [0]:
metrics = MulticlassMetrics(results)
print("Confusion matrix:\n",metrics.confusionMatrix())
print("Model Accuracy = %s" % metrics.accuracy)



Confusion matrix:
 DenseMatrix([[ 210.,    5.],
             [1376.,    0.]])
Model Accuracy = 0.13199245757385292
