In [42]:
from pyspark.sql import SparkSession

In [43]:
spark = SparkSession.builder.appName('original_NB').getOrCreate()

In [44]:
# Import csv of spam and ham (not spam)
start_data = spark.read.format("csv").option("header", "true").load("spam.csv")
start_data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if that��s t...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [45]:
# Create a length column to be used as a future feature 
from pyspark.sql.functions import length
data = start_data.withColumn('length', length(start_data['text']))
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if that��s t...|    58|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



### Feature Transformations


In [46]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Create all the features to the data set
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [47]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [48]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[ham_spam_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [49]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data)
cleaned = cleaner.transform(data)

In [50]:
# Show label of ham spame and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[3168,172...|
|  0.0|(262145,[122516,1...|
|  1.0|(262145,[7958,944...|
|  0.0|(262145,[28698,35...|
|  0.0|(262145,[2710,259...|
|  1.0|(262145,[8443,237...|
|  0.0|(262145,[2089,290...|
|  0.0|(262145,[38868,66...|
|  1.0|(262145,[61137,62...|
|  1.0|(262145,[10951,40...|
|  0.0|(262145,[6258,178...|
|  1.0|(262145,[28027,60...|
|  1.0|(262145,[7958,209...|
|  0.0|(262145,[55639,71...|
|  0.0|(262145,[41061,81...|
|  1.0|(262145,[7958,440...|
|  0.0|(262145,[24978,63...|
|  0.0|(262145,[739,1653...|
|  0.0|(262145,[30913,14...|
|  1.0|(262145,[7958,415...|
+-----+--------------------+
only showing top 20 rows



In [51]:
# Break data down into a training set and a testing set
(training, testing) = cleaned.randomSplit([0.7, 0.3])

In [52]:
from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes model and fit training data
nb = NaiveBayes(smoothing=1.0, modelType='multinomial')
spam_predictor = nb.fit(training)

In [53]:
# Tranform the model with the testing data
test_results = spam_predictor.transform(testing)
test_results.show(5)

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  ham| &lt;#&gt;  mins ...|    51|  0.0|[, &lt;#&gt;, , m...|[, &lt;#&gt;, , m...|(262144,[2039,613...|(262144,[2039,613...|(262145,[2039,613...|[-339.95147265684...|[1.0,5.0310751047...|       0.0|
|  ham| and  picking the...|    41|  0.0|[, and, , picking...|[, , picking, var...|(262144,[48935,22...|(262144,[48935,22...|(262145,[48935,22...|[-294.27336555056...|[1.0,2.5345605328...|       0.0|


In [54]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print(f"Accuracy of model at predicting spam was: {acc}")

Accuracy of model at predicting spam was: 0.9646343717264626


In [55]:
spark.stop()