In [1]:
from pyspark.sql import SparkSession

In [2]:
spark =SparkSession.builder.appName('NLP').getOrCreate()

In [3]:
data = spark.read.csv("smsspamcollection/SMSSpamCollection",inferSchema=True,sep='\t')


In [4]:
data=data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [5]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import length

In [7]:
data = data.withColumn('length',length('text'))

In [8]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [9]:
data.groupBy('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer,CountVectorizer,HashingTF,StopWordsRemover,StringIndexer,IDF,RegexTokenizer,VectorAssembler
from pyspark.ml.linalg import Vector

In [11]:
stages=[]

regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W+")
stopwordsremove = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(),outputCol='cleaned')
countvectorizer = CountVectorizer(inputCol=stopwordsremove.getOutputCol(),outputCol='tf')
idf = IDF(inputCol=countvectorizer.getOutputCol(),outputCol='tf-idf')
class_indexer = StringIndexer(inputCol='class',outputCol='label')
assembler = VectorAssembler(inputCols=[idf.getOutputCol(),'length'],outputCol='features')

stages+= [regexTokenizer,stopwordsremove,countvectorizer,idf,class_indexer,assembler]



In [12]:
from pyspark.ml.classification import NaiveBayes,RandomForestClassifier

In [13]:
nb =NaiveBayes(featuresCol='features',labelCol='label')
#rf= RandomForestClassifier(numTrees=500)

In [14]:
pipeline = Pipeline(stages=stages)

In [15]:
pipeline_model =pipeline.fit(data)

In [16]:
cleaned_data = pipeline_model.transform(data)

In [17]:
cleaned_data.select('tf','text').take(1)[0].asDict()

{'tf': SparseVector(8623, {11: 1.0, 16: 1.0, 37: 1.0, 62: 1.0, 68: 1.0, 77: 1.0, 251: 1.0, 546: 1.0, 634: 1.0, 772: 1.0, 1258: 1.0, 1300: 1.0, 1311: 1.0, 2883: 1.0, 6064: 1.0, 8037: 1.0}),
 'text': 'Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'}

In [18]:
cleaned_data = cleaned_data.select('label','features')

In [19]:
train_data,test_data = cleaned_data.randomSplit([0.8,0.2])

In [20]:
train_data.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(8624,[0,1,2,6,11...|
|  0.0|(8624,[0,1,2,6,28...|
|  0.0|(8624,[0,1,2,10,3...|
|  0.0|(8624,[0,1,2,20,2...|
|  0.0|(8624,[0,1,2,24,4...|
+-----+--------------------+
only showing top 5 rows



In [21]:
spam_detector_nb = nb.fit(cleaned_data)


In [22]:
#spam_detector_rf = rf.fit(cleaned_data)

In [23]:
prediction_nb = spam_detector_nb.transform(test_data)
#prediction_rf = spam_detector_rf.transform(test_data)

prediction_nb.show(5)
#prediction_rf.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(8624,[0,1,4,14,1...|[-608.05107109582...|[1.0,1.3403087310...|       0.0|
|  0.0|(8624,[0,1,9,12,3...|[-299.23130723160...|[1.0,7.7902466518...|       0.0|
|  0.0|(8624,[0,1,13,29,...|[-391.19216396961...|[1.0,3.0928713747...|       0.0|
|  0.0|(8624,[0,1,18,31,...|[-1179.4829387152...|[1.0,4.8973870757...|       0.0|
|  0.0|(8624,[0,1,29,37,...|[-288.26794061463...|[1.0,1.2372242523...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [25]:
acc =MulticlassClassificationEvaluator(metricName='accuracy')

In [28]:
acc_check = acc.evaluate(prediction_nb)

In [29]:
acc_check

0.9953445065176909

In [30]:
print("Accuracy of model at predicting spam was: {}".format(acc_check))

Accuracy of model at predicting spam was: 0.9953445065176909
