In [1]:
# create entry points to spark
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# We add this line to avoid an error : "Cannot run multiple SparkContexts at once". 
# If there is an existing spark context, we will reuse it instead of creating a new context.
sc = SparkContext.getOrCreate()

# local[*]: run Spark locally with as many working processors as logical cores on your machine.
# In the field of `master`, we use a local server with as many working processors (or threads) as possible (i.e. `local[*]`). 
# If we want Spark to run locally with 'k' worker threads, we can specify as `local[k]`.
# The `appName` field is a name to be shown on the Sparking cluster UI. 

# If there is no existing spark context, we now create a new context
if (sc is None):
    sc = SparkContext(master="local[*]", appName="Lecture Demo Week 05")
spark = SparkSession(sparkContext=sc)

# Spam classification example
The data is from UCI Machine Learning Repository and can be downloaded from [here](https://archive.ics.uci.edu/ml/machine-learning-databases/00228/).

According to the data describing the data is a set of SMS tagged messages that have been collected for SMS Spam research. It contains one set of SMS messages in English of 5,574 messages, tagged according to being ham (legitimate) or spam. We will tokenize the messages and create TF-IDF and then we will build models using cross-validation and grid search and compare the accuracy.


In [2]:
# Read data
df = spark.read.csv("SMSSpamCollection", sep = "\t", inferSchema = True, header = False)
df.show(5, truncate = False)

+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0 |_c1                                                                                                                                                        |
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham |Ok lar... Joking wif u oni...                                                                                                                              |
|spam|Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham |U dun say so ear

In [3]:
# Rename columns
df = df.withColumnRenamed('_c0', 'label').withColumnRenamed('_c1', 'message')
df.show(5, truncate = False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham  |Ok lar... Joking wif u oni...                                                                                                                              |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham  |U dun say

In [4]:
# Change the status column to numeric: ham to 1.0 and spam to 0. 
from pyspark.sql.functions import when

df = df.withColumn('label', when(df['label'] == 'ham', 1.0).otherwise(0.0))
df.show(5, truncate = False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|1.0  |Ok lar... Joking wif u oni...                                                                                                                              |
|0.0  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|1.0  |U dun say

## Feature Transformation: Tokenization

In [5]:
# Tokenize the messages
from pyspark.ml.feature import  Tokenizer

tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsData = tokenizer.transform(df)
wordsData.show(5, truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |words                                                                                                                                                                                   |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------

## Feature Extraction: CountVectorizer
CountVectorizer converts the list of tokens above to vectors of token counts.

In [6]:
from pyspark.ml.feature import CountVectorizer
count = CountVectorizer (inputCol="words", outputCol="rawFeatures")
model = count.fit(wordsData)
featurizedData = model.transform(wordsData)
featurizedData.show(5, truncate=True)

+-----+--------------------+--------------------+--------------------+
|label|             message|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|(13587,[8,42,52,6...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|(13587,[5,75,411,...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|(13587,[0,3,8,20,...|
|  1.0|U dun say so earl...|[u, dun, say, so,...|(13587,[5,22,60,1...|
|  1.0|Nah I don't think...|[nah, i, don't, t...|(13587,[0,1,66,87...|
+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [7]:
from pyspark.ml.feature import  IDF
# IDF down-weighs features which appear frequently in a corpus. 
# This generally improves performance when using text as features since most frequent, 
# and hence less important words, get down-weighed.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()  
# We want only the label and features columns for our machine learning models

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13587,[8,42,52,6...|
|  1.0|(13587,[5,75,411,...|
|  0.0|(13587,[0,3,8,20,...|
|  1.0|(13587,[5,22,60,1...|
|  1.0|(13587,[0,1,66,87...|
|  0.0|(13587,[0,2,6,10,...|
|  1.0|(13587,[0,7,9,13,...|
|  1.0|(13587,[0,10,11,4...|
|  0.0|(13587,[0,2,3,14,...|
|  0.0|(13587,[0,4,5,10,...|
|  1.0|(13587,[0,1,6,32,...|
|  0.0|(13587,[0,6,40,46...|
|  0.0|(13587,[0,2,3,4,8...|
|  1.0|(13587,[0,1,2,3,4...|
|  1.0|(13587,[1,3,14,16...|
|  0.0|(13587,[0,4,8,11,...|
|  1.0|(13587,[158,314,3...|
|  1.0|(13587,[1,5,20,47...|
|  1.0|(13587,[4,5,29,59...|
|  0.0|(13587,[0,4,28,82...|
+-----+--------------------+
only showing top 20 rows



# ML Pipeline

## Training Data

In [8]:
# Split data into training (80%) and testing (20%)
seed = 0  # set seed for reproducibility
trainDF, testDF = rescaledData.randomSplit([0.8,0.2],seed)

In [9]:
print("Number of training data: ", trainDF.count())
print("Number of test data: ", testDF.count())

Number of training data:  4424
Number of test data:  1150


## Feature Vectors

In [10]:
rescaledData.select("label", "features").show(5, truncate=True)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13587,[8,42,52,6...|
|  1.0|(13587,[5,75,411,...|
|  0.0|(13587,[0,3,8,20,...|
|  1.0|(13587,[5,22,60,1...|
|  1.0|(13587,[0,1,66,87...|
+-----+--------------------+
only showing top 5 rows



## Model: Logistic Regression Classifier
Logistic regression is a popular method to predict a categorical response. It is a special case of Generalized Linear models that predicts the probability of the outcomes. In spark.ml logistic regression can be used to predict a binary outcome by using binomial logistic regression, or it can be used to predict a multiclass outcome by using multinomial logistic regression.

### Best Model

In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np

lr = LogisticRegression(maxIter = 10)

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, np.linspace(0.3, 0.01, 10)) \
    .addGrid(lr.elasticNetParam, np.linspace(0.3, 0.8, 6)) \
    .build()
crossval_lr = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds= 5)  

cvModel_lr = crossval_lr.fit(trainDF)
best_model_lr = cvModel_lr.bestModel.summary
best_model_lr.predictions.columns

['label',
 'message',
 'words',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [12]:
train_fit_lr = best_model_lr.predictions.select('label','prediction')
train_fit_lr.groupBy('label','prediction').count().show()

# 3848 ham and 602 spams

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0| 3831|
|  0.0|       0.0|  593|
+-----+----------+-----+



In [13]:
# How accurate is the model? 
# we use MulticlassClassificationEvaluator for the accuracy of the model
# We can get the f1 score, accuracy, precision.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(best_model_lr.predictions)

1.0

### Model Evaluation

In [14]:
predictions_lr = cvModel_lr.transform(testDF)
# As you can see below, the predictions dataframe contains the original data and the predictions.
predictions_lr.columns

['label',
 'message',
 'words',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [15]:
predictions_lr.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|             message|               words|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  0.0|2p per min to cal...|[2p, per, min, to...|(13587,[0,10,11,1...|(13587,[0,10,11,1...|[7.48979684801116...|[0.99944155557299...|       0.0|
|  0.0|3 FREE TAROT TEXT...|[3, free, tarot, ...|(13587,[0,10,11,5...|(13587,[0,10,11,5...|[3.35819247882461...|[0.96637208705479...|       0.0|
|  0.0|5 Free Top Polyph...|[5, free, top, po...|(13587,[0,3,15,34...|(13587,[0,3,15,34...|[4.52658264760498...|[0.98929818725677...|       0.0|
|  0.0|500 New Mobiles f...|[500, new, mobile...|(13587,[0,40,64,8...|(13587,[0,40,64,8...|[3.54605267190848...|[0.97197008438366.

In [16]:
# Show sample predictions
predictions_lr.select('label', 'prediction').show(5)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows



In [17]:
predictions_lr.groupBy('label','prediction').count().show()
# It missed 21 spam messages but it got the ham ones correctly.

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  994|
|  0.0|       1.0|   33|
|  1.0|       0.0|    2|
|  0.0|       0.0|  121|
+-----+----------+-----+



In [18]:
# How accurate is the model?
# we use MulticlassClassificationEvaluator for the accuracy of the model
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(predictions_lr)

0.9695652173913043