In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F

In [3]:
# initialize SparkSession
spark = SparkSession.builder.appName('nuveo_nlp').getOrCreate()

In [4]:
# define function to load the given data
def load_data(csv_data_path):
    loaded_data = spark.read.csv(csv_data_path,inferSchema=True,sep='\t')
    loaded_data = loaded_data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')
    return loaded_data

In [5]:
# load trainningset
data_tr = load_data("TrainingSet/sms-hamspam-train.csv")
data_tr.show(4)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
| spam|SMSSERVICES. for ...|
| spam|25p 4 alfie Moon'...|
| spam|U have a secret a...|
+-----+--------------------+
only showing top 4 rows



In [None]:
# display legth of each text
data_tr = data_tr.withColumn('length',length(data_tr['text']))
data_tr.show(4)

In [6]:
# tockenize text
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
# remove common tockens
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
# count vectors
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
# perform inverse doc freq
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
# adjust labels to spark
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
# combine features
clean_up = VectorAssembler(inputCols=['tf_idf'],outputCol='features')

In [8]:
# import naivebayes and use defaults
nb = NaiveBayes()
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner = data_prep_pipe.fit(data_tr)
clean_data = cleaner.transform(data_tr)
clean_data = clean_data.select(['label','features'])
clean_data.show(4)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(12088,[7,10,29,6...|
|  1.0|(12088,[20,49,203...|
|  1.0|(12088,[2,4,9,27,...|
|  1.0|(12088,[0,1,25,41...|
+-----+--------------------+
only showing top 4 rows



In [9]:
# split data to train & test and foreward model to spark
(training,testing) = clean_data.randomSplit([0.7,0.3])
spam_predictor = nb.fit(training)
test_results = spam_predictor.transform(testing)
test_results.show(4)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(12088,[0,1,2,7,9...|[-646.82868526919...|[1.0,1.9752667653...|       0.0|
|  0.0|(12088,[0,1,3,8,1...|[-485.67289961260...|[0.99999999999921...|       0.0|
|  0.0|(12088,[0,1,4,46,...|[-716.82603090970...|[1.0,2.4997650708...|       0.0|
|  0.0|(12088,[0,1,5,15,...|[-809.99569223678...|[1.0,4.1931462547...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 4 rows



In [10]:
# evaluate default performance
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))

Accuracy of model at predicting spam was: 0.9067091114071515
