### Import and instantiate SparkSession

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('nlp spam filter').getOrCreate()

### Import and analyze a dataset of SMS messages with ham/spam labels. The data is courtesy of UCI Repository: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

In [3]:
data = spark.read.csv('smsspamcollection/SMSSpamCollection',inferSchema = True,sep = '\t')

In [4]:
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [5]:
#rename columns
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [None]:
data.show()

In [7]:
data.describe().show()

+-------+-----+--------------------+
|summary|class|                text|
+-------+-----+--------------------+
|  count| 5574|                5574|
|   mean| null|               645.0|
| stddev| null|                 NaN|
|    min|  ham| &lt;#&gt;  in mc...|
|    max| spam|… we r stayin her...|
+-------+-----+--------------------+



In [8]:
#create a SQL-like object you can subsequently apply SQL commands to
data.createOrReplaceTempView('spam')

In [9]:
spark.sql('SELECT class,COUNT(class) FROM spam GROUP BY class').show()

+-----+------------+
|class|count(class)|
+-----+------------+
|  ham|        4827|
| spam|         747|
+-----+------------+



In [10]:
from pyspark.sql.functions import length

In [11]:
# add the length column (in terms of words)
data1 = data.withColumn('length',length(data['text']))

In [None]:
data1.show()

In [13]:
# check manually if the length function works correctly
len(data1.head(2)[1][1])

29

In [14]:
data1.createOrReplaceTempView('ham')

In [15]:
# it is useful to show that there is a clear correlation between the class and the length of a message
spark.sql('SELECT class,AVG(length) FROM ham GROUP BY class').show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [16]:
# conduct the same as above but using the Spark DataFrame tool. Check if the result is the same as using the SQL route.
from pyspark.sql.functions import corr,avg
data1.groupBy('class').avg('length').show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



### Feature transformations needed to be done on text data before machine learning algorithms can be applied 

In [18]:
from pyspark.ml.feature import StringIndexer,Tokenizer,CountVectorizer,IDF,StopWordsRemover

In [19]:
#convert a categorical variable "class" into a numerical variable "class_indexer"
data2 = StringIndexer(inputCol='class',outputCol='class_indexer').fit(data1).transform(data1)

In [None]:
data2.show()

In [21]:
data2.corr('class_indexer','length')

0.3826863816970354

In [26]:
from pyspark.ml.pipeline import Pipeline

In [28]:
data2.columns

['class', 'text', 'length', 'class_indexer']

In [None]:
# converts a message (consisting of sentences) into a list of words (tokens)
token = Tokenizer(inputCol = 'text',outputCol='tokenized')

In [46]:
# Stop words are words which should be excluded from the input, typically because the words appear frequently and don’t carry
# as much meaning ("a, an, the, so, and, etc.)".
cleaned = StopWordsRemover(inputCol = 'tokenized',outputCol = 'cleaned')

In [51]:
# CountVectorizer aims to help convert a collection of text documents to vectors of token counts. 
# CountVectorizer produces sparse representations for the documents over the vocabulary, which can then be passed to machine
# learning algorithms like LDA.
vectorized = CountVectorizer(inputCol = 'cleaned',outputCol = 'vectorized')

In [52]:
# idf (inverse document frequency) is a useful quantity that tells us about a relative importance (i.e. frequency) of a given
# term (word, token) for the entire corpus of documents in question (the collection of SMS in our case).
#Therefore, IDF should be included as a useful feature for a machine learning algorithm.
idf = IDF(inputCol='vectorized',outputCol='idf')

In [31]:
from pyspark.ml.feature import VectorAssembler

In [78]:
# a VectorAssembler object collects all the useful features into one vector of features. 
# This is a peculiarity of the Spark MLlib framework. 
assembler = VectorAssembler(inputCols = ['idf','length'],outputCol = 'features')

### Building a pipeline for the NaiveBayes classifier and evaluating the model's accuracy

In [37]:
from pyspark.ml.classification import NaiveBayes

In [79]:
nb = NaiveBayes(featuresCol ='features',labelCol = 'class_indexer')

In [80]:
pipeline = Pipeline(stages=[token,cleaned,vectorized,idf,assembler])

In [81]:
df = pipeline.fit(data2)

In [82]:
df1 = df.transform(data2)

In [83]:
df1.select('class_indexer','features').show()

+-------------+--------------------+
|class_indexer|            features|
+-------------+--------------------+
|          0.0|(13424,[7,11,31,6...|
|          0.0|(13424,[0,24,297,...|
|          1.0|(13424,[2,13,19,3...|
|          0.0|(13424,[0,70,80,1...|
|          0.0|(13424,[36,134,31...|
|          1.0|(13424,[10,60,139...|
|          0.0|(13424,[10,53,103...|
|          0.0|(13424,[125,184,4...|
|          1.0|(13424,[1,47,118,...|
|          1.0|(13424,[0,1,13,27...|
|          0.0|(13424,[18,43,120...|
|          1.0|(13424,[8,17,37,8...|
|          1.0|(13424,[13,30,47,...|
|          0.0|(13424,[39,96,217...|
|          0.0|(13424,[552,1697,...|
|          1.0|(13424,[30,109,11...|
|          0.0|(13424,[82,214,47...|
|          0.0|(13424,[0,2,49,13...|
|          0.0|(13424,[0,74,105,...|
|          1.0|(13424,[4,30,33,5...|
+-------------+--------------------+
only showing top 20 rows



In [84]:
# split the data into the train and test subsets
train,test = df1.randomSplit([0.7,0.3])

In [85]:
model = nb.fit(train)

In [86]:
result = model.transform(test)

In [87]:
# the column "probability" contains vectors of probablities for class 0 and class 1. 
# the column "prediction" assigns the datapoint (SMS) to the class with the higher probability.
result.select(result.columns[-4:]).show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(13424,[3,6,5140,...|[-279.16326202225...|[1.0,1.5786930706...|       0.0|
|(13424,[3,6,41,20...|[-297.03334810841...|[1.0,3.5401904427...|       0.0|
|(13424,[0,3,14,18...|[-1239.8079459947...|[1.0,6.2235721106...|       0.0|
|(13424,[0,78,220,...|[-1050.0202298242...|[0.99999999999999...|       0.0|
|(13424,[3,6,291,6...|[-324.86645760154...|[1.0,2.9902261203...|       0.0|
|(13424,[9,116,259...|[-1935.8222760773...|[1.18783812716468...|       1.0|
|(13424,[120,195,4...|[-307.69748725648...|[0.99999999935737...|       0.0|
|(13424,[5,67,75,2...|[-1165.6962394347...|[1.0,1.0241768454...|       0.0|
|(13424,[0,1,3,9,1...|[-571.78290606225...|[1.0,4.5390397988...|       0.0|
|(13424,[1,43,53,2...|[-213.11886300366...|[0.99999999999944...|       0.0|
|(13424,[3,4

In [73]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [88]:
mcce = MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='class_indexer',metricName = 'accuracy')

#### The accuracy of the model

In [89]:
mcce.evaluate(result)

0.9130691898285038