# NLP Code Along

For this code along we will build a spam filter! We'll use the various NLP tools we learned about as well as a new classifier, Naive Bayes.

We'll use a classic dataset for this, the SMS Spam Collection from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('nlp').getOrCreate()



In [3]:
data = spark.read.csv("smsspamcollection/SMSSpamCollection",inferSchema=True,sep='\t')
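The file has no header row. Each line holds a label (`ham` or `spam`), a tab, and the message text, which is why Spark falls back to its default column names `_c0` and `_c1`. The following is a minimal plain-Python sketch of the same parsing. The two sample lines and the helper `parse_sms` are hypothetical illustrations, not rows or code from the dataset.

```python
# Two hypothetical lines in the same layout as SMSSpamCollection:
# <label>\t<message text>, with no header row.
sample = "ham\tSee you at lunch?\nspam\tWIN a FREE prize, reply now!\n"

def parse_sms(text):
    """Split each non-blank line on the first tab into a (class, text) record."""
    rows = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        label, message = line.split("\t", 1)
        rows.append({"class": label, "text": message})
    return rows

rows = parse_sms(sample)
print(rows[1]["class"], "|", rows[1]["text"])
```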

In [4]:
data.columns

['_c0', '_c1']

In [9]:
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [10]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



## Clean and Prepare the Data

**Create a new length feature:**

In [11]:
from pyspark.sql.functions import length

In [18]:
data = data.withColumn('length',length(data['text']))

In [88]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [19]:
# Pretty clear difference: spam messages are roughly twice as long on average
data.groupby('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+
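The `groupby('class').mean()` step boils down to summing the character counts per class and dividing by the number of messages in that class. Here is a small plain-Python sketch on hypothetical `(class, text)` pairs. It is not the real data, and it uses `len` as a stand-in for `pyspark.sql.functions.length`.

```python
from collections import defaultdict

# Hypothetical (class, text) pairs standing in for the real DataFrame rows.
rows = [
    ("ham", "Ok see you soon"),
    ("ham", "Call me later"),
    ("spam", "URGENT! You have won a prize. Text WIN to claim now"),
]

totals = defaultdict(int)  # sum of character lengths per class
counts = defaultdict(int)  # number of messages per class
for label, text in rows:
    totals[label] += len(text)  # character count, like the 'length' column
    counts[label] += 1

avg_length = {label: totals[label] / counts[label] for label in totals}
print(avg_length)
```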



## Feature Transformations

In [22]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, \
    CountVectorizer,IDF,StringIndexer

In [24]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector


In [40]:
# Here we create instances of transformations to later be used in a pipeline

# Convert a string to tokens, i.e. words (lowercased and split on whitespace)
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
# Remove common stop words (the default list is English)
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
# Convert tokens to vectors of token counts
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
# Rescale the counts into the TF-IDF representation
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
# Map the string class to a numeric label (by frequency, so the more common 'ham' becomes 0.0)
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
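To see what the first three stages do to a message, here is a rough plain-Python imitation of tokenizing, removing stop words, and counting. It makes several simplifying assumptions. The stop-word set is a tiny hypothetical stand-in for Spark's much larger default English list. `split()` collapses runs of whitespace, whereas Spark's `Tokenizer` splits on single whitespace characters. Ties in the vocabulary order are broken alphabetically here, while Spark does not promise a particular tie order.

```python
from collections import Counter

# Tiny hypothetical stop-word list (Spark's default English list is much larger).
STOP_WORDS = {"a", "the", "to", "you", "have", "is"}

docs = ["You have WON a free prize", "Call me when you get the prize"]

def tokenize(text):
    # Roughly like pyspark.ml.feature.Tokenizer: lowercase, then split on whitespace.
    return text.lower().split()

def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]

stop_tokens = [remove_stop_words(tokenize(d)) for d in docs]

# Vocabulary ordered by total corpus frequency, most frequent first
# (alphabetical tie-breaking is an assumption of this sketch).
corpus_counts = Counter(t for toks in stop_tokens for t in toks)
vocab = sorted(corpus_counts, key=lambda t: (-corpus_counts[t], t))
index = {term: i for i, term in enumerate(vocab)}

def count_vector(tokens):
    # Sparse {term index: count} mapping, similar in spirit to the c_vec column.
    counts = Counter(index[t] for t in tokens)
    return dict(sorted(counts.items()))

c_vecs = [count_vector(toks) for toks in stop_tokens]
print(vocab)
print(c_vecs)
```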

In [41]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')
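The next sketch traces the last two steps in plain Python: TF-IDF rescaling, then `VectorAssembler`-style concatenation with the length. Spark's `IDF` uses the smoothed formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing t. A term that appears in every document therefore gets weight 0. The count vectors and lengths below are hypothetical toy values.

```python
import math

# Hypothetical count vectors over a 3-term vocabulary ({index: count})
# plus each message's character length.
c_vecs = [{0: 2, 1: 1}, {0: 1, 2: 3}, {2: 1}]
lengths = [40, 120, 25]
vocab_size = 3
m = len(c_vecs)  # number of documents

# Document frequency: in how many documents each term appears.
df = [sum(1 for v in c_vecs if i in v) for i in range(vocab_size)]

# Smoothed IDF, as documented for Spark: log((m + 1) / (df + 1)).
idf = [math.log((m + 1) / (d + 1)) for d in df]

def tf_idf(vec):
    # Term frequency (the raw count) times the term's IDF weight.
    return {i: count * idf[i] for i, count in vec.items()}

def assemble(tfidf_vec, length):
    # Concatenate: TF-IDF slots first, then length in one extra slot at
    # index vocab_size, giving vocab_size + 1 slots in total.
    features = dict(tfidf_vec)
    features[vocab_size] = float(length)
    return features

features = [assemble(tf_idf(v), n) for v, n in zip(c_vecs, lengths)]
print(features[0])
```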

### The Model

In [42]:
from pyspark.ml.classification import NaiveBayes

In [43]:
# Naive Bayes as a simple first approach,
# using the defaults (multinomial model, smoothing=1.0)
nb = NaiveBayes()
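The following is a textbook multinomial Naive Bayes with additive (Laplace) smoothing, written in plain Python to show the kind of model `NaiveBayes()` fits with its defaults. Every function name and all the toy data are hypothetical, and the handling of the class priors may differ in detail from Spark's. Multinomial Naive Bayes needs nonnegative feature values. TF-IDF weights and message lengths satisfy that requirement, so the defaults work for this pipeline.

```python
import math
from collections import defaultdict

def train_nb(X, y, smoothing=1.0):
    """Fit multinomial NB. X: list of {feature index: nonnegative value}."""
    n_features = 1 + max(i for x in X for i in x)
    classes = sorted(set(y))
    log_prior, log_theta = {}, {}
    for c in classes:
        rows = [x for x, label in zip(X, y) if label == c]
        log_prior[c] = math.log(len(rows) / len(X))
        totals = defaultdict(float)  # summed feature values within class c
        for x in rows:
            for i, v in x.items():
                totals[i] += v
        denom = sum(totals.values()) + smoothing * n_features
        log_theta[c] = [math.log((totals[i] + smoothing) / denom)
                        for i in range(n_features)]
    return classes, log_prior, log_theta

def raw_scores(model, x):
    # Log joint score per class, analogous to the rawPrediction column.
    classes, log_prior, log_theta = model
    return [log_prior[c] + sum(v * log_theta[c][i] for i, v in x.items())
            for c in classes]

def predict(model, x):
    scores = raw_scores(model, x)
    return model[0][scores.index(max(scores))]

# Hypothetical toy data: feature 0 = "free", 1 = "prize", 2 = "lunch"; 1.0 = spam.
X = [{0: 2, 1: 1}, {0: 1, 1: 2}, {2: 2}, {2: 1, 0: 1}]
y = [1.0, 1.0, 0.0, 0.0]
model = train_nb(X, y)
print(predict(model, {0: 1, 1: 1}), predict(model, {2: 1}))
```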

### Pipeline

In [44]:
from pyspark.ml import Pipeline

In [45]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
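The `Pipeline` fits its stages in order. Each estimator (for example `CountVectorizer` or `IDF`) is fitted on the output of the stages before it, and the resulting `PipelineModel` then applies every fitted stage in sequence. The following sketch imitates that behavior in plain Python. All class names are hypothetical toy stand-ins, not Spark classes.

```python
class Lowercase:
    """A transformer: nothing to learn, it only transforms."""
    def transform(self, rows):
        return [r.lower() for r in rows]

class VocabEstimator:
    """An estimator: fit() learns a vocabulary and returns a transformer."""
    def fit(self, rows):
        return VocabModel(sorted({w for r in rows for w in r.split()}))

class VocabModel:
    def __init__(self, vocab):
        self.index = {w: i for i, w in enumerate(vocab)}
    def transform(self, rows):
        # Unknown words are dropped, as an unseen term has no vocabulary slot.
        return [[self.index[w] for w in r.split() if w in self.index] for r in rows]

class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the output of the previous stages...
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            # ...and the fitted stage transforms the data for the next stage.
            rows = model.transform(rows)
            fitted.append(model)
        return SimplePipelineModel(fitted)

class SimplePipelineModel:
    def __init__(self, stages):
        self.stages = stages
    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

cleaner = SimplePipeline([Lowercase(), VocabEstimator()]).fit(["Free Prize", "free lunch"])
print(cleaner.transform(["FREE lunch today"]))
```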

In [55]:
# Fit the pipeline to convert the data.
cleaner = data_prep_pipe.fit(data)

In [56]:
# Once fitted, transform the data to the new space
clean_data = cleaner.transform(data)

### Training and Evaluation!

In [57]:
clean_data = clean_data.select(['label','features'])

In [58]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows
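Each `features` entry is printed as a sparse vector in the form `(size, [indices of nonzero entries], [values])`. Here `size` is 13424: the TF-IDF slots plus the one extra slot that `VectorAssembler` added for `length`. A single message uses only a handful of those slots, so storing just the nonzeros saves a lot of memory. PySpark's `SparseVector.toArray()` performs the expansion. The following plain-Python sketch shows the same idea on a hypothetical 6-slot vector.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, [indices], [values]) sparse vector into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Hypothetical 6-slot vector with nonzeros at positions 1 and 4.
dense = sparse_to_dense(6, [1, 4], [2.5, 31.0])
print(dense)  # [0.0, 2.5, 0.0, 0.0, 31.0, 0.0]
```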



In [59]:
(training,testing) = clean_data.randomSplit([0.7,0.3])
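`randomSplit` roughly assigns each row to a split with probability proportional to its weight. As a result, the 70/30 sizes are only approximate, and the split (and the final score) changes from run to run unless a seed is passed, for example `clean_data.randomSplit([0.7, 0.3], seed=42)`. The seed value is arbitrary. The following plain-Python sketch imitates that behavior with a hypothetical `random_split` helper. It is not Spark's actual sampling code.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # cumulative upper bound for this split
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for k, b in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if r < b or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits

training, testing = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(training), len(testing))  # roughly 700 and 300, not exactly
```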

In [61]:
# Train NB classifier
spam_predictor = nb.fit(training)



In [65]:
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [66]:
test_results = spam_predictor.transform(testing)

In [67]:
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,7,8...|[-796.28157606645...|[1.0,2.9764935608...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[-1001.3641105757...|[1.0,2.9823686975...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-1167.0438651259...|[1.0,6.6126438305...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[-655.22982889938...|[1.0,1.3387658512...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-535.53117979767...|[1.0,1.0410701268...|       0.0|
|  0.0|(13424,[0,1,15,20...|[-669.45710288035...|[1.0,5.3959124386...|       0.0|
|  0.0|(13424,[0,1,20,27...|[-984.56082197140...|[0.99999999994686...|       0.0|
|  0.0|(13424,[0,1,24,31...|[-357.73434821515...|[1.0,8.6230904270...|       0.0|
|  0.0|(13424,[0,1,30,12...|[-592.47127424884...|[1.0,4.1229514811...|       0.0|
|  0.0|(13424,[0
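The `rawPrediction` column holds per-class log scores such as -796, and `probability` holds the same scores normalized to sum to 1. Exponentiating such scores directly underflows to 0.0 in double precision, so the normalization subtracts the largest score first. This explains how the probabilities come out as `1.0` next to a tiny value. The following is a sketch of that normalization, assuming the usual numerically stable softmax. The input scores are hypothetical values of a similar magnitude, not the scores shown above.

```python
import math

def raw_to_probability(raw):
    """Turn per-class log scores into probabilities (numerically stable softmax)."""
    top = max(raw)
    exps = [math.exp(r - top) for r in raw]  # subtract the max to avoid underflow
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical log scores of the same magnitude as the rawPrediction column.
probs = raw_to_probability([-796.0, -830.0])
print(probs)
```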



In [68]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
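`MulticlassClassificationEvaluator` uses `metricName="f1"` (a weighted F1 score) by default. Passing `metricName="accuracy"` gives plain accuracy. The two numbers are generally different, especially with imbalanced classes like ham vs. spam. The following plain-Python sketch computes both on hypothetical labels and predictions. It weights each class's F1 by that class's share of the true labels.

```python
from collections import Counter

def accuracy(labels, preds):
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-class F1, averaged with weights equal to each class's share of true labels."""
    support = Counter(labels)
    total = 0.0
    for c, n in support.items():
        tp = sum(1 for l, p in zip(labels, preds) if l == c and p == c)
        fp = sum(1 for l, p in zip(labels, preds) if l != c and p == c)
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n / len(labels)
    return total

# Hypothetical results: 8 ham (0.0) all correct, 2 spam (1.0) with one missed.
labels = [0.0] * 8 + [1.0] * 2
preds = [0.0] * 8 + [1.0, 0.0]
print(accuracy(labels, preds), weighted_f1(labels, preds))
```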

In [73]:
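# The evaluator's default metricName is 'f1' (weighted F1), not accuracy,
# so name it explicitly; use metricName='accuracy' for plain accuracy.
f1_eval = MulticlassClassificationEvaluator(metricName='f1')
f1 = f1_eval.evaluate(test_results)
print("F1 score of model at predicting spam was: {}".format(f1))

F1 score of model at predicting spam was: 0.9320956093091356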




Not bad, considering we're using straight math on text data! Try swapping in other classification models, or engineering additional features!