# Import necessary packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaConsumer, KafkaProducer

# Create SPARK session

In [2]:
# Reuse the active session if there is one; otherwise start an app named 'kafka'.
session_builder = SparkSession.builder.appName('kafka')
spark = session_builder.getOrCreate()

# Subscribe to "kafka_stream" Topic and create "df1" dataframe

In [3]:
# Streaming source: subscribe to the "kafka_stream" topic on the local broker.
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "kafka_stream")
)
df1 = kafka_reader.load()

# Check if data is streaming

In [4]:
# Evaluates to True when the DataFrame is backed by a streaming source.
df1.isStreaming

True

# Set the number of Spark SQL shuffle partitions to 5

In [5]:
# Partition count Spark SQL uses when it shuffles data.
shuffle_partitions = 5
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)

# Display the schema of the streaming DataFrame

In [6]:
# Print the column names and types of the Kafka source DataFrame.
df1.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# The Kafka source exposes key and value as binary columns; cast both to
# strings and keep only those two columns.
string_casts = ["CAST(key AS STRING)", "CAST(value AS STRING)"]
df1 = df1.selectExpr(*string_casts)

In [8]:
#df1.writeStream.format("console").outputMode("append").start()
#df.createOrReplaceTempView("df1");
#records = spark.sql ("SELECT count(*) from df") 
#print(records)

In [9]:
# Only the message payload is used downstream.
payload_columns = ["value"]
msg_kafka = df1.select(*payload_columns)

In [10]:
# Append each incoming message to an in-memory table named after the query
# ("msg_kafka") so it can be read back with spark.sql.
msg_writer = (
    msg_kafka.writeStream
    .queryName("msg_kafka")
    .format("memory")
    .outputMode("append")
)
msgQuery = msg_writer.start()

In [11]:
from time import sleep

In [15]:
#for x in range(2):
 #   spark.sql("SELECT * FROM msg_kafka").show()
 #  sleep(1)

# Read back whatever the "msg_kafka" memory sink holds when this cell runs.
# NOTE(review): rows that arrive later are presumably not reflected in work
# already done on df_msg - re-run from here to pick them up.
snapshot_query = "SELECT * FROM msg_kafka"
df_msg = spark.sql(snapshot_query)


In [46]:
# Preview the first 20 collected messages (long values are truncated).
df_msg.show(n=20)

+--------------------+
|               value|
+--------------------+
|As a valued netwo...|
|As usual u can ca...|
|Wat time Ì_ wan t...|
|No plans yet. Wha...|
|Anything lar then...|
|Even u dont get i...|
|Was actually abou...|
|So how many days ...|
|**FREE MESSAGE**T...|
|"I know dat feeli...|
|Its a big differe...|
|I don know accoun...|
|"Doing nothing, t...|
|Pls i wont belive...|
|On the way to off...|
|Sunshine Quiz Wkl...|
|"Wat time liao, w...|
|"It's fine, imma ...|
|I taught that Ran...|
|Oh Howda gud gud....|
+--------------------+
only showing top 20 rows



# Preprocessing Streaming data

In [47]:
# Tokenizing

In [48]:
from pyspark.ml.feature import Tokenizer

In [49]:
# Split each message into lower-cased, whitespace-separated tokens.
tokenizer = Tokenizer(inputCol="value", outputCol="TextTokenized")
df_tkn = tokenizer.transform(df_msg.select("value"))

In [50]:
# Filtering stopwords

In [51]:
from pyspark.ml.feature import StopWordsRemover

In [52]:
# Spark's built-in stop-word list for English.
stopwords_language = "english"
engStopWords = StopWordsRemover.loadDefaultStopWords(stopwords_language)

In [53]:
df_filtered = StopWordsRemover().setStopWords(engStopWords).setInputCol("TextTokenized").setOutputCol("TextFiltered")

In [54]:
df_filtered = df_filtered.transform(df_tkn)

In [55]:
# Using CountVectorizer to get numerical representation of strings

In [56]:
from pyspark.ml.feature import CountVectorizer

In [57]:
# Bag-of-words encoder: at most 500 vocabulary terms, each of which must
# appear in at least 2 messages (minDF) and at least once within a message
# (minTF) to be counted.
cv = CountVectorizer(
    inputCol="TextFiltered",
    outputCol="TextCV",
    vocabSize=500,
    minTF=1,
    minDF=2,
)

In [58]:
df_cv = cv.fit(df_filtered)

In [59]:
df_cv = df_cv.transform(df_filtered)

In [60]:
# Preview the first five vectorised rows.
df_cv.show(n=5)

+--------------------+--------------------+--------------------+--------------------+
|               value|       TextTokenized|        TextFiltered|              TextCV|
+--------------------+--------------------+--------------------+--------------------+
|As a valued netwo...|[as, a, valued, n...|[valued, network,...|(500,[1,45,80,140...|
|As usual u can ca...|[as, usual, u, ca...|[usual, u, call, ...|(500,[0,1],[1.0,1...|
|Wat time Ì_ wan t...|[wat, time, ì_, w...|[wat, time, ì_, w...|(500,[27,61,87,88...|
|No plans yet. Wha...|[no, plans, yet.,...| [plans, yet., ?,,,]|   (500,[295],[1.0])|
|Anything lar then...|[anything, lar, t...|[anything, lar, ì...|(500,[8,16,30,61,...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [72]:
# Model input: the original text as "ReceivedMsg" and the term-count vector
# under the name "features".
received_msg = df_cv["value"].alias("ReceivedMsg")
feature_vector = df_cv["TextCV"].alias("features")
df_input = df_cv.select(received_msg, feature_vector)
df_input.show(n=5)

+--------------------+--------------------+
|         ReceivedMsg|            features|
+--------------------+--------------------+
|As a valued netwo...|(500,[1,45,80,140...|
|As usual u can ca...|(500,[0,1],[1.0,1...|
|Wat time Ì_ wan t...|(500,[27,61,87,88...|
|No plans yet. Wha...|   (500,[295],[1.0])|
|Anything lar then...|(500,[8,16,30,61,...|
+--------------------+--------------------+
only showing top 5 rows



# Load the trained Naive Bayes model

In [73]:
from pyspark.ml.classification import NaiveBayes,NaiveBayesModel
#model = load_model()

In [74]:
# Load the previously saved Naive Bayes model from the working directory.
model_path = "./nbModel"
trainedModel = NaiveBayesModel.load(model_path)

# Model scoring - classify each streamed message as spam or ham

In [75]:
#def check_if_spam(trainedModel, df_input):
 #   is_spam = True
 #   return is_spam

# Score every message with the loaded model; the result gains the
# rawPrediction, probability and prediction columns.
output = trainedModel.transform(df_input)

In [76]:
# Preview the first five scored rows.
output.show(n=5)

+--------------------+--------------------+--------------------+--------------------+----------+
|         ReceivedMsg|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+--------------------+----------+
|As a valued netwo...|(500,[1,45,80,140...|[-63.515961231592...|[0.02900628350445...|       1.0|
|As usual u can ca...|(500,[0,1],[1.0,1...|[-7.8804239280554...|[0.74397915848769...|       0.0|
|Wat time Ì_ wan t...|(500,[27,61,87,88...|[-32.454823651635...|[0.98429700525633...|       0.0|
|No plans yet. Wha...|   (500,[295],[1.0])|[-8.2946515715809...|[0.34345453136308...|       1.0|
|Anything lar then...|(500,[8,16,30,61,...|[-26.095895283588...|[0.99955060122965...|       0.0|
+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [82]:
from pyspark.sql.functions import when

In [83]:
# Map the numeric prediction to a readable label: 1 -> "spam", anything
# else -> "ham". The column is referenced through the DataFrame itself
# because `col` is never imported in this notebook (only `when` is), so the
# previous `col("prediction")` raised NameError on a fresh kernel.
is_spam = output["prediction"] == 1
output = output.withColumn("Result", when(is_spam, "spam").otherwise("ham"))

In [84]:
# Preview the first five rows together with the new "Result" label.
output.show(n=5)

+--------------------+--------------------+--------------------+--------------------+----------+------+
|         ReceivedMsg|            features|       rawPrediction|         probability|prediction|Result|
+--------------------+--------------------+--------------------+--------------------+----------+------+
|As a valued netwo...|(500,[1,45,80,140...|[-63.515961231592...|[0.02900628350445...|       1.0|  spam|
|As usual u can ca...|(500,[0,1],[1.0,1...|[-7.8804239280554...|[0.74397915848769...|       0.0|   ham|
|Wat time Ì_ wan t...|(500,[27,61,87,88...|[-32.454823651635...|[0.98429700525633...|       0.0|   ham|
|No plans yet. Wha...|   (500,[295],[1.0])|[-8.2946515715809...|[0.34345453136308...|       1.0|  spam|
|Anything lar then...|(500,[8,16,30,61,...|[-26.095895283588...|[0.99955060122965...|       0.0|   ham|
+--------------------+--------------------+--------------------+--------------------+----------+------+
only showing top 5 rows



In [88]:
# Keep only the message text and its spam/ham label.
result_columns = ["ReceivedMsg", "Result"]
finalResult = output.select(*result_columns)

In [92]:
# Preview the first 20 classified messages.
finalResult.show(n=20)

+--------------------+------+
|         ReceivedMsg|Result|
+--------------------+------+
|As a valued netwo...|  spam|
|As usual u can ca...|   ham|
|Wat time Ì_ wan t...|   ham|
|No plans yet. Wha...|  spam|
|Anything lar then...|   ham|
|Even u dont get i...|   ham|
|Was actually abou...|   ham|
|So how many days ...|   ham|
|**FREE MESSAGE**T...|   ham|
|"I know dat feeli...|   ham|
|Its a big differe...|   ham|
|I don know accoun...|   ham|
|"Doing nothing, t...|   ham|
|Pls i wont belive...|   ham|
|On the way to off...|   ham|
|Sunshine Quiz Wkl...|   ham|
|"Wat time liao, w...|   ham|
|"It's fine, imma ...|  spam|
|I taught that Ran...|   ham|
|Oh Howda gud gud....|   ham|
+--------------------+------+
only showing top 20 rows



In [91]:
# Persist the classified messages as CSV. Spark writes a directory named
# 'filterResult.csv' containing part files, not a single file.
# The default save mode raises an error when the target already exists,
# which broke every re-run of the notebook; overwrite makes the cell
# repeatable (the previous export is replaced).
finalResult.write.mode("overwrite").csv('filterResult.csv')