In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("SpamDetection").config("spark.ui.enabled", "true")\
       .config("spark.eventLog.enabled", "true")\
       .config("spark.eventLog.dir", "/home/raghavendr48edu/logs")\
       .config("spark.yarn.resourcemanager.hostname", "ip-10-1-1-204.ap-south-1.compute.internal") \
       .config("spark.yarn.resourcemanager.webapp.address", "ip-10-1-1-204.ap-south-1.compute.internal:6066") \
       .config("spark.executor.memory", "4g") \
       .config("spark.driver.memory", "4g") \
       .getOrCreate()

#### Get headers using withColumnRenamed

#### get headers by converting to DF

In [3]:
'''headers = ["header1", "header2", "header3"]  # Replace with your column names
dataframe = dataframe.toDF(*headers)'''
df=spark.read.option("delimiter","\t").csv("SMSSpamCollection").toDF("spam","message")
df.show(5)
df.printSchema()
df=df.withColumn("length",F.length(df['message']))
df.show(5)
df.groupBy('spam').mean().show(5)

+----+--------------------+
|spam|             message|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows

root
 |-- spam: string (nullable = true)
 |-- message: string (nullable = true)

+----+--------------------+------+
|spam|             message|length|
+----+--------------------+------+
| ham|Go until jurong p...|   111|
| ham|Ok lar... Joking ...|    29|
|spam|Free entry in 2 a...|   155|
| ham|U dun say so earl...|    49|
| ham|Nah I don't think...|    61|
+----+--------------------+------+
only showing top 5 rows

+----+-----------------+
|spam|      avg(length)|
+----+-----------------+
| ham|71.45431945307645|
|spam|138.6706827309237|
+----+-----------------+



* Spam messages has higher length

### Preprocessing
* 1.Extract the word
* 2.Removed stop words. 
* 3.Modify the stop words to include your custom words such as ‘-‘
* 4.Create the features from SMS message using CountVectorizer


### Unigrams

In [4]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF,StringIndexer,VectorAssembler

tokenizer=Tokenizer(inputCol="message",outputCol="token_text")
stop_word_remover=StopWordsRemover(inputCol="token_text",outputCol="stop_tokens")
#stopwords = StopWordsRemover().getStopWords() + ["-"] less accuracy on having this
#stop_word_remover = StopWordsRemover().setStopWords(stopwords).setInputCol("token_text").setOutputCol("stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens",outputCol="c_vec") # you can use hashdocumentfrequency
idf=IDF(inputCol="c_vec",outputCol="tf_idf")
ham_spam_to_num=StringIndexer(inputCol="spam",outputCol="label")
cleaned=VectorAssembler(inputCols=['tf_idf','length'],outputCol="features")

from pyspark.ml import Pipeline
pipeline=Pipeline(stages=[ham_spam_to_num,tokenizer,stop_word_remover,count_vec,idf,cleaned])
cleaner=pipeline.fit(df)
clean_df=cleaner.transform(df)
clean_df.show(5)

+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|spam|             message|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
| ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
| ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|
|spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
| ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,80,1...|

### CountVectorizer Vocabulary

### CountVectorization , TFIDF , VectorAssembler o/p's

### Split the data into train and test -decide on a strategy

In [5]:
clean_df=clean_df.select(['label','features'])
(train,test)=clean_df.randomSplit([0.7,0.3],seed=42)

from pyspark.ml.classification import NaiveBayes
nb=NaiveBayes()
pred=nb.fit(train)
res=pred.transform(test)
res.show(5)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,41,...|[-1060.7325420854...|[1.0,9.6391158107...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-803.13623340156...|[1.0,2.7071860143...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-1152.0926413349...|[1.0,6.3682506790...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[-656.71821333935...|[1.0,7.6641099247...|       0.0|
|  0.0|(13424,[0,1,12,33...|[-444.22584589378...|[1.0,1.4534997554...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1361.3769955108...|[1.0,3.0750315514...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-215.48901780946...|[1.0,4.5987538642...|       0.0|
|  0.0|(13424,[0,1,18,20...|[-839.83426960294...|[1.0,9.2487549816...|       0.0|
|  0.0|(13424,[0,1,21,27...|[-757.86523084176...|[1.0,1.0965819687...|       0.0|
|  0.0|(13424,[0

### TestFeatures, RawPredictions( extracted from classProbabilities,FeatureProbabilities)

### Use logistic regression,DecisionTreeClassifier, RandomForestClassifier,GBTClassifier and check the accuracy

In [6]:
from pyspark.ml.classification import NaiveBayes,LogisticRegression,DecisionTreeClassifier, RandomForestClassifier,GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator 
def build_model(evaluator):
    models=[]
    models.append(('LogisticRegression',LogisticRegression()))
    models.append(('lr_with_params',LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)))
    models.append(('NaiveBayes',NaiveBayes()))
    models.append(('DecisionTreeClassifier',DecisionTreeClassifier()))
    models.append(('GradientBoostClassifier',GBTClassifier()))
    models.append(('RandomForestClassifier',RandomForestClassifier()))
    for name,model in models:
        pred=model.fit(train)    
        result=pred.transform(test)
        accuracy=evaluator.evaluate(result)
        print(f"{name} accuracy is {accuracy*100}")

uni_bin=build_model(BinaryClassificationEvaluator())
uni_bin_param=build_model(BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("prediction").setMetricName("areaUnderROC"))
uni_mul=build_model(MulticlassClassificationEvaluator())

LogisticRegression accuracy is 95.28848387140773
lr_with_params accuracy is 50.0
NaiveBayes accuracy is 18.28180290464936
DecisionTreeClassifier accuracy is 48.964381839793425
GradientBoostClassifier accuracy is 97.34708237126499
RandomForestClassifier accuracy is 93.43550612763516
LogisticRegression accuracy is 87.5
lr_with_params accuracy is 50.0
NaiveBayes accuracy is 94.36746000300575
DecisionTreeClassifier accuracy is 80.28073039771563
GradientBoostClassifier accuracy is 83.65502848633068
RandomForestClassifier accuracy is 50.0
LogisticRegression accuracy is 96.46661770212962
lr_with_params accuracy is 80.51199958122098
NaiveBayes accuracy is 92.66021977210805
DecisionTreeClassifier accuracy is 93.37090926376854
GradientBoostClassifier accuracy is 94.54026726766364
RandomForestClassifier accuracy is 80.51199958122098


In [7]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
rf = RandomForestClassifier()
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 50, 100]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()
evaluator = MulticlassClassificationEvaluator()
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)
cv_model = cv.fit(train)
best_model = cv_model.bestModel
predictions = best_model.transform(test)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.8875938854992451


### Bigrams

In [8]:
from pyspark.ml.feature import NGram

tokenizer=Tokenizer(inputCol="message",outputCol="token_text")
stop_word_remover=StopWordsRemover(inputCol="token_text",outputCol="stop_tokens")
ngram = NGram().setN(2).setInputCol("stop_tokens").setOutputCol("ngrams")
cv_ngram_model = CountVectorizer().setInputCol("ngrams").setOutputCol("c_vec")
idf=IDF(inputCol="c_vec",outputCol="tf_idf")
ham_spam_to_num=StringIndexer(inputCol="spam",outputCol="label")
cleaned=VectorAssembler(inputCols=['tf_idf','length'],outputCol="features")

pipeline1=Pipeline(stages=[ham_spam_to_num,tokenizer,stop_word_remover,ngram,cv_ngram_model,idf,cleaned])
cleaner1=pipeline1.fit(df)
clean_df1=cleaner1.transform(df)
clean_df1.show(5)

clean_df1=clean_df1.select(['label','features'])
(train,test)=clean_df1.randomSplit([0.7,0.3],seed=42)


big_bin=build_model(BinaryClassificationEvaluator())
big_bin_param=build_model(BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("prediction").setMetricName("areaUnderROC"))
big_mul=build_model(MulticlassClassificationEvaluator())

+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|spam|             message|length|label|          token_text|         stop_tokens|              ngrams|               c_vec|              tf_idf|            features|
+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|[go jurong, juron...|(36668,[8357,9892...|(36668,[8357,9892...|(36669,[8357,9892...|
| ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|[ok lar..., lar.....|(36668,[13150,136...|(36668,[13150,136...|(36669,[13150,136...|
|spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|[free entry, entr...|(36668,[44,56,113...|(36668,[44,56,113...|(36669,[44,56,113...

### Trigrams

In [9]:

tokenizer=Tokenizer(inputCol="message",outputCol="token_text")
transformed = tokenizer.transform(df)
#transformed.show(1)

stop_word_remover=StopWordsRemover(inputCol="token_text",outputCol="stop_tokens")
stop_word_transformed = stop_word_remover.transform(transformed)
#stop_word_transformed.show(1)

ngram = NGram().setN(3).setInputCol("stop_tokens").setOutputCol("ngrams")
ngramDataFrame = ngram.transform(stop_word_transformed)
#ngramDataFrame.select("ngrams").show(2, False)

cv_ngram_model = CountVectorizer().setInputCol("ngrams").setOutputCol("c_vec").fit(ngramDataFrame)
cv_featured = cv_ngram_model.transform(ngramDataFrame)
#cv_featured.show(1)

idf=IDF(inputCol="c_vec",outputCol="tf_idf")
ham_spam_to_num=StringIndexer(inputCol="spam",outputCol="label")
cleaned=VectorAssembler(inputCols=['tf_idf','length'],outputCol="features")

pipeline1=Pipeline(stages=[ham_spam_to_num,tokenizer,stop_word_remover,ngram,cv_ngram_model,idf,cleaned])
cleaner1=pipeline1.fit(df)
clean_df1=cleaner1.transform(df)
clean_df1.show(5)

clean_df1=clean_df1.select(['label','features'])
(train,test)=clean_df1.randomSplit([0.7,0.3],seed=42)


tri_bin=build_model(BinaryClassificationEvaluator())
trin_bin_param=build_model(BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("prediction").setMetricName("areaUnderROC"))
tri_mul=build_model(MulticlassClassificationEvaluator())

+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|spam|             message|length|label|          token_text|         stop_tokens|              ngrams|               c_vec|              tf_idf|            features|
+----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|[go jurong point,...|(36208,[5516,5797...|(36208,[5516,5797...|(36209,[5516,5797...|
| ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|[ok lar... joking...|(36208,[9744,9906...|(36208,[9744,9906...|(36209,[9744,9906...|
|spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|[free entry 2, en...|(36208,[21,1534,1...|(36208,[21,1534,1...|(36209,[21,1534,1...

In [11]:
from pyspark.ml import PipelineModel

tokenizer=Tokenizer(inputCol="message",outputCol="token_text")
stop_word_remover=StopWordsRemover(inputCol="token_text",outputCol="stop_tokens")
#stopwords = StopWordsRemover().getStopWords() + ["-"] less accuracy on having this
#stop_word_remover = StopWordsRemover().setStopWords(stopwords).setInputCol("token_text").setOutputCol("stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens",outputCol="c_vec") # you can use hashdocumentfrequency
idf=IDF(inputCol="c_vec",outputCol="tf_idf")
ham_spam_to_num=StringIndexer(inputCol="spam",outputCol="label")
cleaned=VectorAssembler(inputCols=['tf_idf','length'],outputCol="features")
lr=LogisticRegression()
pipeline=Pipeline(stages=[ham_spam_to_num,tokenizer,stop_word_remover,count_vec,idf,cleaned,lr])
cleaner=pipeline.fit(df)
cleaner.write().overwrite().save("/user/raghavendr48edu/spam_model")

In [12]:
pipeline = PipelineModel.load("/user/raghavendr48edu/spam_model")
pipeline

PipelineModel_a74d373ce022