<b>`findspark` is a Python library that locates the Spark installation on the machine and makes `pyspark` importable.</b>

In [1]:
import findspark
findspark.init()

### Initializing Spark Session

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('sms') \
                    .getOrCreate()


<b>The master URL 'local[*]' tells Spark to run locally with as many worker threads as there are logical cores on your machine.</b>

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

sms = spark.read.csv("sms.csv", sep=';', header=False, schema=schema)

sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



<b>The data has no header row, so I define the schema explicitly.</b>

In [4]:
from pyspark.sql.functions import length
sms = sms.withColumn('length', length(sms['text']))
sms.show()

+---+--------------------+-----+------+
| id|                text|label|length|
+---+--------------------+-----+------+
|  1|Sorry, I'll call ...|    0|    33|
|  2|Dont worry. I gue...|    0|    30|
|  3|Call FREEPHONE 08...|    1|    33|
|  4|Win a 1000 cash p...|    1|    43|
|  5|Go until jurong p...|    0|   111|
|  6|Ok lar... Joking ...|    0|    29|
|  7|Free entry in 2 a...|    1|   155|
|  8|U dun say so earl...|    0|    49|
|  9|Nah I don't think...|    0|    61|
| 10|FreeMsg Hey there...|    1|   146|
| 11|Even my brother i...|    0|    77|
| 12|As per your reque...|    0|   158|
| 13|WINNER!! As a val...|    1|   156|
| 14|Had your mobile 1...|    1|   154|
| 15|I'm gonna be home...|    0|   109|
| 16|SIX chances to wi...|    1|   136|
| 17|URGENT! You have ...|    1|   154|
| 18|I've been searchi...|    0|   196|
| 19|I HAVE A DATE ON ...|    0|    35|
| 20|XXXMobileMovieClu...|    1|   149|
+---+--------------------+-----+------+
only showing top 20 rows



<b>Above, I add a new column 'length' that holds the number of characters in each SMS.</b>

In [5]:
sms.groupBy('label').avg('length').show()

+-----+------------------+
|label|       avg(length)|
+-----+------------------+
|    0| 70.70188522892066|
|    1|138.14457831325302|
+-----+------------------+



<b>This is interesting: on average, spam messages are about twice as long as regular messages (roughly 138 vs. 71 characters).</b>

In [6]:
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- length: integer (nullable = true)



### Text Preprocessing

In [7]:
from pyspark.sql.functions import regexp_replace

wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))

wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))

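<b>Above, we replace punctuation and digits with spaces and then collapse runs of spaces into a single space. Apostrophes are not in the character class, so words such as "don't" stay intact.</b>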
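
<b>To see what the three replacements do to a single string, here is a minimal plain-Python sketch that mirrors them with the `re` module. The function name `clean_text` and the sample message are hypothetical, and Python's regex engine is only a stand-in for the Java one Spark uses, although these simple patterns should behave the same way.</b>

```python
import re

def clean_text(text):
    """Mirror the three regexp_replace steps on a plain Python string."""
    text = re.sub(r'[_():;,.!?\-]', ' ', text)  # punctuation -> space
    text = re.sub(r'[0-9]', ' ', text)          # digits -> space
    text = re.sub(r' +', ' ', text)             # collapse runs of spaces
    return text

# Hypothetical message, used only for illustration.
sample = "Win a 1000 cash prize, now!"
print(repr(clean_text(sample)))
```

<b>Note that leading and trailing spaces survive, which matches the row ' Call FREEPHONE now ' in the cleaned output.</b>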

In [8]:
wrangled.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0| 4827|
|    1|  747|
+-----+-----+



<b>There are 5574 SMS messages in total, of which only 747 are labelled as spam, so the dataset is highly imbalanced: a classifier that predicts every message as not spam would already reach an accuracy of about 87%.</b>
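
<b>The 87% figure follows directly from the two counts in the table above. A small arithmetic sketch (the variable names are my own):</b>

```python
# Class counts taken from the groupBy output above.
n_ham, n_spam = 4827, 747
n_total = n_ham + n_spam

# Accuracy of a trivial classifier that labels every message as not spam.
baseline_accuracy = n_ham / n_total
spam_share = n_spam / n_total

print(f"total messages    = {n_total}")
print(f"baseline accuracy = {baseline_accuracy:.3f}")
print(f"spam share        = {spam_share:.3f}")
```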

In [9]:
wrangled.show()

+---+--------------------+-----+------+
| id|                text|label|length|
+---+--------------------+-----+------+
|  1|Sorry I'll call l...|    0|    33|
|  2|Dont worry I gues...|    0|    30|
|  3| Call FREEPHONE now |    1|    33|
|  4|Win a cash prize ...|    1|    43|
|  5|Go until jurong p...|    0|   111|
|  6|Ok lar Joking wif...|    0|    29|
|  7|Free entry in a w...|    1|   155|
|  8|U dun say so earl...|    0|    49|
|  9|Nah I don't think...|    0|    61|
| 10|FreeMsg Hey there...|    1|   146|
| 11|Even my brother i...|    0|    77|
| 12|As per your reque...|    0|   158|
| 13|WINNER As a value...|    1|   156|
| 14|Had your mobile m...|    1|   154|
| 15|I'm gonna be home...|    0|   109|
| 16|SIX chances to wi...|    1|   136|
| 17|URGENT You have w...|    1|   154|
| 18|I've been searchi...|    0|   196|
| 19|I HAVE A DATE ON ...|    0|    35|
| 20|XXXMobileMovieClu...|    1|   149|
+---+--------------------+-----+------+
only showing top 20 rows



### Pipeline

In [10]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol='text', outputCol='words')

remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')

hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol="hash")

idf = IDF(inputCol=hasher.getOutputCol(), outputCol="features")

pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf])

<b>First we tokenize the text into individual tokens, then remove stop words. After that I apply feature hashing, a fast and space-efficient way to map the huge number of distinct words in the SMS messages onto a smaller, fixed number of indices. Finally, I compute TF-IDF weights, which give relatively higher importance to words that are rare across documents.<br>
The pipeline defined above wraps all of these steps.</b>
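
<b>The hashing and TF-IDF steps can be illustrated without Spark. The sketch below is an assumption-laden toy: `zlib.crc32` stands in for the MurmurHash3 function Spark uses, the bucket count of 16 is arbitrary, and the three token lists are made up. The smoothed IDF formula log((m + 1) / (df + 1)) is the one documented for Spark's `IDF`.</b>

```python
import math
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=16):
    """Map each token to a bucket index and count occurrences per bucket."""
    # crc32 is a deterministic stand-in hash; Spark itself uses MurmurHash3.
    return Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens)

def idf_weights(doc_vectors):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1))."""
    m = len(doc_vectors)
    # Document frequency: in how many documents each bucket is non-zero.
    df = Counter(idx for vec in doc_vectors for idx in vec)
    return {idx: math.log((m + 1) / (count + 1)) for idx, count in df.items()}

# Hypothetical tokenized messages, for illustration only.
docs = [["free", "entry", "win"], ["call", "me", "later"], ["win", "cash", "win"]]
tf_vectors = [hashing_tf(d) for d in docs]
idf_by_bucket = idf_weights(tf_vectors)
tfidf_vectors = [
    {idx: count * idf_by_bucket[idx] for idx, count in vec.items()}
    for vec in tf_vectors
]
print(tfidf_vectors)
```

<b>With few buckets, different words can collide in the same index; a larger `num_features` lowers the collision rate at the cost of a wider vector.</b>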

In [11]:
pipeline_model = pipeline.fit(wrangled)

In [12]:
sms_transformed = pipeline_model.transform(wrangled)

In [13]:
sms_train, sms_test = sms_transformed.randomSplit([0.8, 0.2], seed=13)

### Class Weights (Handling Imbalanced Data)

<b>About 87% of the messages are not spam (label == 0). Instead of under-sampling this majority class, we give every row a class weight, so that the logistic loss objective treats the minority spam class (label == 1) with higher weight. In the code below, "positives" refers to label == 1.</b>

In [14]:
dataset_size=float(sms_train.select("label").count())
numPositives=sms_train.select("label").where('label == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 601
Percentage of ones are 13.421170165252343


In [15]:
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0.8657882983474765


In [16]:
from pyspark.sql.functions import when
sms_train=sms_train.withColumn("classWeights", when(sms_train.label == 1,BalancingRatio).otherwise(1-BalancingRatio))
sms_train.select("classWeights","label","features").show(5)

+------------------+-----+--------------------+
|      classWeights|label|            features|
+------------------+-----+--------------------+
|0.1342117016525235|    0|(262144,[78820,10...|
|0.8657882983474765|    1|(262144,[46213,10...|
|0.8657882983474765|    1|(262144,[51247,77...|
|0.1342117016525235|    0|(262144,[38555,52...|
|0.8657882983474765|    1|(262144,[29259,68...|
+------------------+-----+--------------------+
only showing top 5 rows



<b>Here we give a weight of ~0.87 to spam messages and ~0.13 to non-spam messages.</b>
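
<b>Why these particular weights? A short plain-Python sketch, assuming a hypothetical toy label list with the same 87/13 split, shows that the scheme makes the total weight of both classes equal. The helper name `class_weights` is my own.</b>

```python
from collections import Counter

def class_weights(labels):
    """Weight the minority class (1) by the majority share and vice versa."""
    counts = Counter(labels)
    balancing_ratio = counts[0] / len(labels)  # share of label == 0
    return {1: balancing_ratio, 0: 1 - balancing_ratio}

# Hypothetical labels with an 87/13 split, for illustration only.
labels = [0] * 87 + [1] * 13
weights = class_weights(labels)

# Total weight contributed by each class to the loss.
total_weight = {c: weights[c] * labels.count(c) for c in (0, 1)}
print(weights)
print(total_weight)
```

<b>Each class ends up contributing the same total weight, so the loss no longer rewards ignoring the rare class.</b>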

### Model Building and Evaluation

In [17]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label", featuresCol="features",weightCol="classWeights",maxIter=10)
lr_model = lr.fit(sms_train)

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
prediction = lr_model.transform(sms_test)
print("The area under ROC for test set {}".format(evaluator.evaluate(prediction)))

The area under ROC for test set 0.9720259552992109


<b>Wow! We get a ROC AUC of 0.97 with our baseline model. However, F1 score is often a more informative evaluation metric than ROC AUC when dealing with imbalanced datasets (see [here](https://www.kaggle.com/lct14558/imbalanced-data-why-you-should-not-use-roc-curve)), so we will consider F1 score as well.</b>

In [19]:
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+------------------------------------------+
|label|prediction|probability                               |
+-----+----------+------------------------------------------+
|0    |0.0       |[0.9952162043831404,0.004783795616859571] |
|0    |0.0       |[0.9999853417220804,1.4658277919459315E-5]|
|1    |1.0       |[1.2911360078789951E-5,0.9999870886399211]|
|1    |0.0       |[0.9769451956618017,0.023054804338198266] |
|0    |0.0       |[0.9596079884065271,0.04039201159347307]  |
+-----+----------+------------------------------------------+
only showing top 5 rows



In [20]:
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|  942|
|    1|       1.0|  126|
|    1|       0.0|   20|
|    0|       1.0|    8|
+-----+----------+-----+



<b>F1 score is the harmonic mean of precision and recall. The harmonic mean gives much more weight to low values, so the classifier only gets a high F1 score if both recall and precision are high.</b>
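
<b>To make the harmonic-mean point concrete, here is a plain-Python sketch. It first uses the four counts from the table above, then compares the arithmetic and harmonic means for a lopsided, hypothetical precision/recall pair. The helper name `harmonic_mean` is my own.</b>

```python
def harmonic_mean(a, b):
    """Harmonic mean of two positive numbers."""
    return 2 * a * b / (a + b)

# Counts from the label/prediction table above.
TP, FN, FP, TN = 126, 20, 8, 942

precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1 = harmonic_mean(precision, recall)
print(f"precision = {precision:.2f}, recall = {recall:.2f}, F1 = {f1:.2f}")

# Hypothetical lopsided case: perfect precision, very poor recall.
arithmetic = (1.0 + 0.1) / 2
harmonic = harmonic_mean(1.0, 0.1)
print(f"arithmetic mean = {arithmetic:.2f}, harmonic mean = {harmonic:.2f}")
```

<b>In the lopsided case the arithmetic mean still looks respectable, while the harmonic mean stays close to the weaker of the two values.</b>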

In [21]:
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

precision = TP/(TP+FP)
recall = TP/(TP+FN)
F1score = (2*precision*recall)/(precision+recall)
print('precision = {:.2f}\nrecall    = {:.2f}\nF1 Score  = {:.2f}'.format(precision, recall,F1score))

precision = 0.94
recall    = 0.86
F1 Score  = 0.90


<b>As you can see, we achieved a ROC AUC of 97% but an F1 score of just 90%. With hyperparameter tuning using grid search I think we can achieve better results!</b>

### Grid Search and Cross Validation

In [22]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

params = ParamGridBuilder().addGrid(hasher.numFeatures, [1024, 4096, 16384]) \
                           .addGrid(hasher.binary, [True, False]) \
                           .addGrid(lr.regParam, [0.01, 0.1, 1.0, 10.0]) \
                           .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
                           .build()

cv = CrossValidator(estimator=lr, estimatorParamMaps=params,evaluator = evaluator, numFolds=5)

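<b>There are 72 points in the parameter grid and 5 folds in the cross-validator, so 360 models are built; training took me about 11 minutes. Note that `cv` wraps only `lr`, and `sms_train` already contains the precomputed `features` column, so the two `hasher` settings cannot change the features here: only the 12 combinations of `lr.regParam` and `lr.elasticNetParam` actually differ. To tune the hashing step as well, the whole pipeline plus `lr` would have to be the estimator, fitted on the untransformed data.</b>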
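
<b>The grid size can be checked with a few lines of plain Python. This sketch only enumerates the value combinations from the grid above; the dictionary keys are labels I chose for readability, not Spark objects.</b>

```python
from itertools import product

# Candidate values copied from the ParamGridBuilder call above.
grid = {
    "hasher.numFeatures": [1024, 4096, 16384],
    "hasher.binary": [True, False],
    "lr.regParam": [0.01, 0.1, 1.0, 10.0],
    "lr.elasticNetParam": [0.0, 0.5, 1.0],
}

combos = list(product(*grid.values()))  # every point in the grid
num_folds = 5
cv_fits = len(combos) * num_folds       # one fit per grid point per fold

print(f"grid points = {len(combos)}, cross-validation fits = {cv_fits}")
```

<b>After cross-validation, `CrossValidator` also refits the best parameter setting once on the full training set.</b>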

In [23]:
lr_cv_model = cv.fit(sms_train)

In [24]:
prediction_cv = lr_cv_model.transform(sms_test)
print("The area under ROC for test set after cv {}".format(evaluator.evaluate(prediction_cv)))

The area under ROC for test set after cv 0.9843186733958208


In [25]:
TN = prediction_cv.filter('prediction = 0 AND label = prediction').count()
TP = prediction_cv.filter('prediction = 1 AND label = prediction').count()
FN = prediction_cv.filter('prediction = 0 AND label != prediction').count()
FP = prediction_cv.filter('prediction = 1 AND label != prediction').count()

precision = TP/(TP+FP)
recall = TP/(TP+FN)
F1score = (2*precision*recall)/(precision+recall)
print('precision = {:.2f}\nrecall    = {:.2f}\nF1 Score  = {:.2f}'.format(precision, recall,F1score))

precision = 0.98
recall    = 0.88
F1 Score  = 0.93


<b>To conclude, after hyperparameter tuning we end up with a model with a ROC AUC of 98% and an F1 score of 93%.<br>It is worth mentioning that without the class weights we end up with a model with an F1 score of 0, because precision was 0, but a ROC AUC of nearly 100%. This is because, with such highly imbalanced data, the model blindly predicts every message to be not spam. So it is very important to address the problem of imbalanced datasets.</b>
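
<b>The degenerate case is easy to reproduce on paper. The sketch below assumes a model that predicts "not spam" for every test message, uses the test-set class sizes implied by the earlier confusion counts (942 + 8 and 126 + 20), and treats 0/0 as 0, which is a convention I chose here rather than something the notebook defines.</b>

```python
def f1_from_counts(tp, fp, fn):
    """Return (precision, recall, f1), treating 0/0 as 0.0 by convention."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1

# Test-set class sizes implied by the earlier confusion counts.
n_ham, n_spam = 942 + 8, 126 + 20

# A model that labels everything as not spam.
tp, fp, fn, tn = 0, 0, n_spam, n_ham
accuracy = (tp + tn) / (n_ham + n_spam)
precision, recall, f1 = f1_from_counts(tp, fp, fn)

print(f"accuracy = {accuracy:.2f}, precision = {precision:.2f}, "
      f"recall = {recall:.2f}, F1 = {f1:.2f}")
```

<b>Accuracy stays high while F1 collapses to 0, which is why accuracy alone is misleading on this dataset.</b>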