We have built a spam detection filter using Python and Spark. Our dataset consists of text messages volunteered for a study in Singapore, plus some spam texts from a UK reporting site.

Spam is an irrelevant or inappropriate message sent over the internet to a large group of people. Ham, on the other hand, is a legitimate message that is not spam and that matters to the recipient.
We aim to separate these two kinds of messages and classify them as accurately as possible.

In [0]:
from pyspark.sql import SparkSession

In [0]:
#create a spark session with a relevant name
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [0]:
#read the dataset
data = spark.read.csv('dbfs:/FileStore/shared_uploads/sanchari.gautam@gmail.com/SMSSpamCollection',inferSchema=True,sep="\t")
data.show()

As we can see, the dataset has no column names, so we rename the two columns to class and text.

In [0]:
#rename the column names
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')
data.show()

Later on, we will tokenize each text message into words, called tokens. Tokenizing helps us build the model that performs the classification: raw text is hard to interpret, and splitting it into smaller units helps us capture the context of the message.

First, though, let us see whether a simple analysis reveals any pattern. We will compare the average length of spam and ham text messages.
Let us find out how to do that!

In [0]:
from pyspark.sql.functions import length

In [0]:
#create a new column "length" containing the length of each text message
data = data.withColumn('length',length(data['text']))
data.show()

In [0]:
#computing the average length of spam and ham messages
data.groupby('class').mean().show()
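
To make the grouped average concrete, here is a minimal plain-Python sketch of the same computation on a few made-up rows. The sample messages and the helper name `mean_length_by_class` are assumptions for illustration only; they are not part of the dataset or of Spark.

```python
from collections import defaultdict

# Hypothetical (class, text) rows standing in for the real dataset.
rows = [
    ("ham", "See you at 5"),
    ("ham", "Ok"),
    ("spam", "WIN a FREE prize now! Call 0800 000 000"),
]

def mean_length_by_class(rows):
    """Return {class: average number of characters per message}."""
    totals = defaultdict(lambda: [0, 0])  # class -> [sum of lengths, row count]
    for label, text in rows:
        totals[label][0] += len(text)
        totals[label][1] += 1
    return {label: total / count for label, (total, count) in totals.items()}

averages = mean_length_by_class(rows)
print(averages)
```

If one class turns out to be clearly longer on average, the message length is a cheap feature worth handing to the model.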

In [0]:
from pyspark.ml.feature import (Tokenizer, StopWordsRemover,CountVectorizer, 
                                IDF, StringIndexer)

In [0]:
#create an object of the Tokenizer to convert text column into tokens
tokenizer = Tokenizer(inputCol='text',outputCol='token_text')
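
As a rough picture of what this stage does to a single message, the sketch below lowercases a string and splits it on whitespace. It is a plain-Python approximation, not Spark's implementation; the function name `tokenize` and the sample message are assumptions.

```python
def tokenize(text):
    """Lowercase the message and split it on runs of whitespace."""
    return text.lower().split()

tokens = tokenize("Free entry in 2 a wkly comp")
print(tokens)  # ['free', 'entry', 'in', '2', 'a', 'wkly', 'comp']
```

Note that punctuation stays attached to the neighbouring word in a split like this, so "now!" and "now" would count as different tokens.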

Stopwords are the most common words in a natural language. These words add little to the overall meaning and context of a document. Hence, it is better to remove these distracting words in order to improve the performance of our model.

Some examples of stopwords are "the", "is", "are", "with" and "and".

In [0]:
#creating an object of the StopWordsRemover to remove stopwords from the column token_text
stop_remove = StopWordsRemover(inputCol='token_text',outputCol='stop_token')
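
The filtering itself is simple to picture. Here is a minimal sketch with a tiny, made-up stopword set; the set `STOPWORDS` and the helper `remove_stopwords` are assumptions for illustration, and Spark's built-in English list is much longer.

```python
# A tiny, hypothetical stopword list used only for this illustration.
STOPWORDS = {"the", "is", "are", "with", "and", "a", "in", "to"}

def remove_stopwords(tokens, stopwords=STOPWORDS):
    """Keep only the tokens that are not stopwords (case-insensitive)."""
    return [t for t in tokens if t.lower() not in stopwords]

filtered = remove_stopwords(["the", "prize", "is", "waiting", "with", "us"])
print(filtered)  # ['prize', 'waiting', 'us']
```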

Now, after tokenizing the texts and removing the stopwords, we are left with only those tokens that are relevant to our aim of classifying messages as spam or ham.

Next, we will find the term frequency of each token in the text with the help of the CountVectorizer class. Let us see how to do that!

In [0]:
#finding the term frequency of each token 
countvec = CountVectorizer(inputCol='stop_token',outputCol='c_vec')
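
A small plain-Python sketch of term counting may help here: first learn a vocabulary from all messages, then turn each message into a vector of counts. The helper names and the toy messages are assumptions, and the alphabetical tie-breaking is a choice made for this sketch only.

```python
from collections import Counter

def fit_vocabulary(docs):
    """Map each distinct token to a column index, most frequent token first."""
    totals = Counter(token for doc in docs for token in doc)
    # Ties are broken alphabetically so that the result is deterministic.
    ordered = sorted(totals, key=lambda t: (-totals[t], t))
    return {token: i for i, token in enumerate(ordered)}

def count_vector(doc, vocab):
    """Return the per-token counts of one document as a list."""
    vec = [0] * len(vocab)
    for token, n in Counter(doc).items():
        if token in vocab:  # tokens never seen during fitting are ignored
            vec[vocab[token]] = n
    return vec

docs = [["free", "prize", "free"], ["call", "now"], ["free", "call"]]
vocab = fit_vocabulary(docs)
vectors = [count_vector(d, vocab) for d in docs]
print(vocab)    # {'free': 0, 'call': 1, 'now': 2, 'prize': 3}
print(vectors)  # [[2, 0, 0, 1], [0, 1, 1, 0], [1, 1, 0, 0]]
```

On a real corpus most entries are zero, which is why such vectors are usually stored in a sparse format.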

Inverse Document Frequency (IDF) is a statistical weight that measures the importance of a term in a collection of documents. The more documents a term appears in, the less it helps to distinguish any particular document.

We will compute the IDF of each token across all messages and use it to scale the term frequencies.

In [0]:
#scaling the term frequencies by their IDF and saving the result in the tf_idf column
idf = IDF(inputCol='c_vec',outputCol='tf_idf')
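
To see how the weight behaves, here is a sketch of the smoothed form log((N + 1) / (df + 1)), where N is the number of messages and df is the number of messages containing the token. To the best of our knowledge this is the form described in Spark's documentation, but treat the code as an illustration; `idf_weights` and the toy messages are assumptions.

```python
import math

def idf_weights(docs):
    """Smoothed IDF per token: log((N + 1) / (df + 1))."""
    n_docs = len(docs)
    doc_freq = {}
    for doc in docs:
        for token in set(doc):  # count each token once per document
            doc_freq[token] = doc_freq.get(token, 0) + 1
    return {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}

docs = [["free", "prize"], ["free", "call"], ["free", "now"]]
weights = idf_weights(docs)
print(weights["free"])   # 0.0, because "free" appears in every message
print(weights["prize"])  # log(2), about 0.693
```

A token that occurs in every message gets weight zero, so it contributes nothing to the tf_idf vector no matter how often it is repeated.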

PySpark classifiers cannot use string class labels directly, so we need to convert the labels into numeric indices before feeding them to the model.

We will use the StringIndexer to perform this operation.

In [0]:
#converts the ham/spam strings to numeric labels 0.0/1.0 (by default the most frequent class gets 0.0)
ham_spam_numeric = StringIndexer(inputCol='class',outputCol='label')
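
Which class ends up as 0 depends on how often each label occurs, not on alphabetical order. The following sketch mimics the default frequency-based ordering in plain Python; `fit_label_index` and the sample labels are assumptions for illustration.

```python
from collections import Counter

def fit_label_index(labels):
    """Assign 0.0 to the most frequent label, 1.0 to the next, and so on."""
    counts = Counter(labels)
    # Ties are broken alphabetically in this sketch.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

mapping = fit_label_index(["ham", "ham", "spam", "ham"])
print(mapping)  # {'ham': 0.0, 'spam': 1.0}
```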

In [0]:
from pyspark.ml.feature import VectorAssembler

Finally, we are at the stage where all the required columns are in the appropriate format. The last step is to combine them into a single feature vector column, which is the input format that Spark's classifiers expect.

In [0]:
#create the features column by combining tf_idf and length into a single vector
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

Now that we have defined the stages that produce the two columns the model needs, features and label, we are ready to set up our classification model. Here, we will use the Naive Bayes model.

In [0]:
from pyspark.ml.classification import NaiveBayes

In [0]:
#creating a Naive Bayes model object with the default parameters
nb = NaiveBayes()
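
For intuition, here is a textbook multinomial Naive Bayes with add-one smoothing, written in plain Python on toy count vectors. It is a sketch under simplifying assumptions (for example, the class priors are not smoothed) and is not Spark's implementation; the function names and the toy data are made up.

```python
import math

def train_multinomial_nb(X, y, smoothing=1.0):
    """Fit log priors and per-feature log likelihoods from count vectors."""
    n_features = len(X[0])
    log_prior, log_likelihood = {}, {}
    for c in sorted(set(y)):
        rows = [x for x, label in zip(X, y) if label == c]
        log_prior[c] = math.log(len(rows) / len(X))
        feature_totals = [sum(col) for col in zip(*rows)]
        denom = sum(feature_totals) + smoothing * n_features
        log_likelihood[c] = [math.log((t + smoothing) / denom)
                             for t in feature_totals]
    return log_prior, log_likelihood

def predict(x, log_prior, log_likelihood):
    """Return the class with the highest log posterior score for vector x."""
    scores = {c: log_prior[c] + sum(n * w for n, w in zip(x, log_likelihood[c]))
              for c in log_prior}
    return max(scores, key=scores.get)

# Columns: counts of "free", "call", "meeting"; label 1 = spam, 0 = ham.
X = [[2, 1, 0], [1, 2, 0], [0, 0, 2], [0, 1, 1]]
y = [1, 1, 0, 0]
log_prior, log_likelihood = train_multinomial_nb(X, y)
print(predict([1, 0, 0], log_prior, log_likelihood))  # 1
print(predict([0, 0, 1], log_prior, log_likelihood))  # 0
```

The smoothing term keeps a token that was never seen in one class from driving that class's probability to zero.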

Finally, we chain all the preparation stages into a pipeline, so that they are applied to the original dataset in sequence with a single fit and a single transform.

In [0]:
from pyspark.ml import Pipeline

In [0]:
#creating the pipeline object with the stages in the order they should run
data_prep_pipe = Pipeline(stages=[ham_spam_numeric,tokenizer,stop_remove,countvec,idf,clean_up])

In [0]:
#Fitting the pipeline object with our data
cleaner = data_prep_pipe.fit(data)

In [0]:
#Transform the original dataset 
clean_data = cleaner.transform(data)
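
The fit-then-transform pattern used here can be sketched in plain Python. Each stage is fitted on the output of the stages before it, and the fitted stages can then be replayed on new data. The classes `Tokenize` and `Vocabulary` and the two helper functions are hypothetical stand-ins, not Spark classes.

```python
class Tokenize:
    """Stateless stage: there is nothing to learn, so fit returns itself."""
    def fit(self, docs):
        return self

    def transform(self, docs):
        return [d.lower().split() for d in docs]

class Vocabulary:
    """Stateful stage: learns a token-to-index map during fit."""
    def fit(self, docs):
        tokens = sorted({t for d in docs for t in d})
        self.index = {t: i for i, t in enumerate(tokens)}
        return self

    def transform(self, docs):
        # Tokens that were not seen during fit are dropped.
        return [[self.index[t] for t in d if t in self.index] for d in docs]

def fit_pipeline(stages, docs):
    """Fit each stage on the output of the previous stages."""
    fitted = []
    for stage in stages:
        model = stage.fit(docs)
        docs = model.transform(docs)
        fitted.append(model)
    return fitted

def transform_pipeline(fitted, docs):
    """Apply the already fitted stages in order."""
    for model in fitted:
        docs = model.transform(docs)
    return docs

fitted = fit_pipeline([Tokenize(), Vocabulary()], ["Free prize", "call now"])
encoded = transform_pipeline(fitted, ["Call FREE unknown"])
print(encoded)  # [[0, 1]]
```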

In [0]:
#select only the label and features column  
final_data = clean_data.select('label','features')
final_data.show()

The model is now trained on roughly 80% of the dataset and tested against the remaining 20%.

In [0]:
#test train split
train,test = final_data.randomSplit([0.8,0.2])
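
Because the split is random, the rows that land in each part, and therefore the final score, change from run to run unless a seed is fixed (`randomSplit` accepts an optional `seed` argument for this). The plain-Python sketch below shows the idea of a seeded, per-row split; `random_split` is a hypothetical helper, not Spark's algorithm.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train_rows, test_rows = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train_rows.append(row)
        else:
            test_rows.append(row)
    return train_rows, test_rows

rows = list(range(100))
train_rows, test_rows = random_split(rows)
print(len(train_rows), len(test_rows))
```

With a per-row draw like this, the proportions are only approximately 80/20, which is why the sizes are printed rather than assumed.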

In [0]:
#train the Naive Bayes model
spam_detector = nb.fit(train)

In [0]:
#Predict on the test dataset
test_results = spam_detector.transform(test)
test_results.show()

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
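#evaluating the spam/ham predictions on the test dataset
#(metricName is set explicitly because the evaluator's default metric is F1, not accuracy)
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')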

In [0]:
#computing the accuracy
acc = acc_eval.evaluate(test_results)
acc
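
It is worth knowing which number the evaluator returns: when no metricName is passed, MulticlassClassificationEvaluator reports a weighted F1 score rather than accuracy. The two can differ, as this plain-Python sketch on made-up labels shows; the helper names and the toy predictions are assumptions.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights equal to each class's share of y_true."""
    total = 0.0
    for c in set(y_true):
        tp = sum(t == c and p == c for t, p in zip(y_true, y_pred))
        fp = sum(t != c and p == c for t, p in zip(y_true, y_pred))
        fn = sum(t == c and p != c for t, p in zip(y_true, y_pred))
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        total += f1 * (tp + fn) / len(y_true)
    return total

# Toy labels: 0 = ham, 1 = spam; one spam message is missed.
y_true = [0, 0, 0, 0, 1, 1]
y_pred = [0, 0, 0, 0, 0, 1]
acc_value = accuracy(y_true, y_pred)
f1_value = weighted_f1(y_true, y_pred)
print(round(acc_value, 4), round(f1_value, 4))  # 0.8333 0.8148
```

On an imbalanced dataset, where ham is much more common than spam, it helps to look at both numbers.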

Thus we can see how raw text can be turned into numeric features and fed into a machine learning model for classification. Using only the default parameters of the Naive Bayes model, we achieved a score of about 93% on the test set, which makes this model a reasonable baseline for spam classification.