In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

In [3]:
# Load the raw SMS data set. No header option is passed, so the columns come
# back with Spark's positional names (_c0, _c1, ...); types are inferred.
# NOTE(review): the public SMS Spam Collection is usually distributed as a
# tab-separated file -- confirm this copy really is comma-delimited.
data = spark.read.csv(
    "/FileStore/tables/SMSSpamCollection.csv",
    sep=',',
    inferSchema=True,
)

In [4]:
# Replace the positional column names with descriptive ones:
# '_c0' becomes 'class' and '_c1' becomes 'text'.
data = (
    data
    .withColumnRenamed('_c0', 'class')
    .withColumnRenamed('_c1', 'text')
)

In [5]:
# Preview the first 20 rows without truncating long message text.
data.show(n=20, truncate=False)

In [6]:
from pyspark.sql.functions import length

In [7]:
# Add the length of each message's text as an extra column named 'length'.
data = data.withColumn(
    'length',
    length(data['text']),
)

In [8]:
# Preview the frame again, now including the new 'length' column.
data.show(n=20, truncate=True)

In [9]:
# Per-class average of the numeric columns (presumably just 'length' here).
# Author's observation: the difference between the classes is pretty clear.
data.groupBy('class').mean().show()

In [10]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

# Feature-engineering stages. Column flow:
#   text -> token_text -> stop_tokens -> c_vec -> tf_idf
# plus a separate indexer that maps the string 'class' column to 'label'.
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="token_text",
)
stopremove = StopWordsRemover(
    inputCol='token_text',
    outputCol='stop_tokens',
)
count_vec = CountVectorizer(
    inputCol='stop_tokens',
    outputCol='c_vec',
)
idf = IDF(
    inputCol="c_vec",
    outputCol="tf_idf",
)
ham_spam_to_num = StringIndexer(
    inputCol='class',
    outputCol='label',
)

In [11]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [12]:
# Combine the TF-IDF vector and the message length into a single
# 'features' vector column for the classifier.
clean_up = VectorAssembler(
    inputCols=['tf_idf', 'length'],
    outputCol='features',
)

In [13]:
from pyspark.ml.classification import NaiveBayes

In [14]:
# Naive Bayes classifier; the column names are spelled out but are the
# library defaults, matching the columns produced by the prep pipeline.
nb = NaiveBayes(featuresCol='features', labelCol='label')

In [15]:
from pyspark.ml import Pipeline

In [16]:
# Chain the preparation stages; order matters because each stage reads the
# output column of an earlier one.
data_prep_pipe = Pipeline(
    stages=[
        ham_spam_to_num,
        tokenizer,
        stopremove,
        count_vec,
        idf,
        clean_up,
    ]
)

In [17]:
# Fit the preparation pipeline on the full data set, then apply the fitted
# pipeline to that same data to produce the 'label' and 'features' columns.
cleaner = data_prep_pipe.fit(dataset=data)
clean_data = cleaner.transform(dataset=data)

In [18]:
# Keep only the two columns the classifier needs, then preview them.
clean_data = clean_data.select('label', 'features')
clean_data.show(n=20, truncate=True)

In [19]:
# 70/30 train/test split. The seed is fixed so that re-running the notebook
# yields the same partition (and therefore comparable metrics); without it
# randomSplit draws a fresh random seed on every call.
(training, testing) = clean_data.randomSplit([0.7, 0.3], seed=42)

In [20]:
# Train the Naive Bayes model on the training partition only.
spam_predictor = nb.fit(dataset=training)

In [21]:
# Print the column names and types of `data` (the frame before the prep
# pipeline was applied) as a quick sanity check.
data.printSchema()

In [22]:
# Score the held-out partition with the trained model.
test_results = spam_predictor.transform(dataset=testing)

In [23]:
# Preview the first 20 scored rows of the test partition.
test_results.show(n=20, truncate=True)

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [25]:
# Evaluate on the held-out predictions. The metric must be requested
# explicitly: MulticlassClassificationEvaluator defaults to the F1 score, so
# a bare evaluator would print an F1 value under the "Accuracy" message below.
acc_eval = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))