In [0]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import StringIndexer
from math import log
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col


In [0]:
df = spark.read.option("header", "true") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .csv("/FileStore/tables/spam.csv") \
    .withColumnRenamed("Type", "label_string") \
    .withColumnRenamed("Message", "sms")

In [0]:
regexTokenizer = RegexTokenizer(inputCol="sms", outputCol="tokens", pattern="\\W+")
regexTokenized = regexTokenizer.transform(df)
regexTokenized = regexTokenized.withColumn('tokens',F.expr("array_remove(transform(tokens, x -> regexp_replace(x, '[0-9]', '')), '') as tokens"))

stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_stopwords")
removed = stopwords_remover.transform(regexTokenized)

indexer = StringIndexer(inputCol="label_string", outputCol="label")
indexed = indexer.fit(removed).transform(removed)

In [0]:
train, test = indexed.randomSplit([0.7, 0.3], seed = 2018)

In [0]:
# Training Phase
num_ham = train.select('label').where(train.label==0.0).count() # count of all rows training dataset: ham
num_spam = train.select('label').where(train.label==1.0).count() # count of all rows training dataset: spam

In [0]:
distinct_ham = train.select(F.explode('tokens_stopwords').alias('distinct_ham')).where(train.label==0.0).distinct()
distinct_spam = train.select(F.explode('tokens_stopwords').alias('distinct_spam')).where(train.label==1.0).distinct()

unique_ham = distinct_ham.count() # count of unique words in ham training dataset 
unique_spam = distinct_spam.count() # count of unique words in spam training dataset 

In [0]:
ham_text = train.where(train.label==0).select('tokens_stopwords','label') # all rows of training dataset: ham 
spam_text = train.where(train.label==1).select('tokens_stopwords','label') # all rows of training dataset: spam

In [0]:
ham_prior = ham_text.count() / train.count() # prior for training dataset: ham 
spam_prior = spam_text.count() / train.count() # prior for training dataset: spam 
print("ham_prior = ", ham_prior)
print("spam_prior = ", spam_prior)

ham_prior =  0.8675213675213675
spam_prior =  0.13247863247863248


In [0]:
all_ham = train.select(F.explode('tokens_stopwords').alias('distinct_ham')).where(train.label==0.0).count() # count of all words in training dataset: ham
all_spam = train.select(F.explode('tokens_stopwords').alias('distinct_spam')).where(train.label==1.0).count() # count of all words in training dataset: spam

In [0]:
key_list = distinct_ham.union(distinct_spam).withColumnRenamed("distinct_ham","all_words") # all words in ham and spam
total_type = key_list.count() # count of all words in ham and spam

In [0]:
ham_words_with_count = train.select(F.explode('tokens_stopwords').alias('ham_words')).where(train.label==0.0).groupBy('ham_words').count().withColumn("cond_prob", lit(0.0).cast('long'))
total_den_ham =  all_ham + total_type
ham_train_count = ham_words_with_count.rdd.map(lambda x: (x[0], x[1], x[1] * 1.0/ total_den_ham)).toDF(["ham_words","count","cond_prob"]) # all ham words woth conditional prob

spam_words_with_count = train.select(F.explode('tokens_stopwords').alias('spam_words')).where(train.label==1.0).groupBy('spam_words').count().withColumn("cond_prob", lit(0.0).cast('long'))
total_den_spam = all_spam + total_type
spam_train_count = spam_words_with_count.rdd.map(lambda x: (x[0], x[1], x[1] * 1.0 / total_den_spam)).toDF(["spam_words","count","cond_prob"]) # all spam words woth conditional prob

In [0]:
# Testing Phase
test_data = test.select('tokens_stopwords', 'label')

In [0]:
distinct_ham_list = distinct_ham.select('distinct_ham').rdd.flatMap(lambda x: x).collect() # convert to list
distinct_spam_list = distinct_spam.select('distinct_spam').rdd.flatMap(lambda x: x).collect() # convert to list

In [0]:
def predict(row):
    score1 = log(ham_prior)
    score2 = log(spam_prior)
    ham_not_present = [x for x in row if x not in distinct_ham_list]
    spam_not_present = [x for x in row if x not in distinct_spam_list]
    if len(ham_not_present) > 0:         
        score1 += len(ham_not_present) * log(1 / total_den_ham)
    if len(spam_not_present) > 0:
        score2 += len(spam_not_present) * log(1 / total_den_spam)
        
    ham_add = ham_train_count.filter(ham_train_count.ham_words.isin(row)).withColumn('log_prob',F.log(col('cond_prob'))).select('log_prob').groupby().sum().collect()[0][0]
    words_not_ham = ham_train_count.filter(~ham_train_count.ham_words.isin(row)).withColumn("log_prob", F.log(1 - col("cond_prob"))).select('log_prob').groupby().sum().collect()[0][0]
    if (ham_add != None) and (words_not_ham != None):
         score1 += ham_add + words_not_ham
            
    spam_add = spam_train_count.filter(spam_train_count.spam_words.isin(row)).withColumn('log_prob',F.log(col('cond_prob'))).select('log_prob').groupby().sum().collect()[0][0]
    words_not_spam = spam_train_count.filter(~spam_train_count.spam_words.isin(row)).withColumn("log_prob", F.log(1 - col("cond_prob"))).select('log_prob').groupby().sum().collect()[0][0]
    
    if (spam_add != None) and (words_not_spam != None):
        score2 += spam_add + words_not_spam
        
    if (score1 >= score2): 
        return 0
    
    else: 
        return 1

In [0]:
correct = 0
total_len = test_data.count()
rows = test_data.collect()
for row in rows:
    label = row['label']
    text = row['tokens_stopwords']
    predicted_label = predict(text)
    if label == predicted_label:
        correct += 1
        
accuracy = correct / total_len
print('Accuracy = ' , accuracy * 100)

Accuracy =  91.48936170212765
