In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import csv
from io import StringIO
import re
from math import log

In [0]:
# Get (or create) the SparkSession used by every later cell, registered
# under the application name "NaiveBayesSMS".
spark = (
    SparkSession.builder
    .appName("NaiveBayesSMS")
    .getOrCreate()
)

In [0]:
# Load the CSV without a header row, then rename the auto-generated
# `_c0` / `_c1` columns to the names the rest of the notebook uses.
df = (
    spark.read.csv("dbfs:/FileStore/SpamMsgData.csv", inferSchema=True, header=False)
    .withColumnRenamed("_c0", "label")
    .withColumnRenamed("_c1", "message")
)

In [0]:
def preprocess_row(row):
    """Convert one input record into a ``(label, tokens)`` pair.

    Args:
        row: Record indexable by ``'label'`` and ``'message'`` (for example a
            Spark ``Row`` or a dict). Either value may be ``None``.

    Returns:
        A tuple ``(label, tokens)``: ``label`` is the whitespace-stripped
        label (``''`` when the label is missing) and ``tokens`` is the list of
        lowercase word-character runs found in the message (``[]`` when the
        message is missing or blank).
    """
    raw_label = row['label']
    # A null label would raise AttributeError on .strip() and abort the whole
    # job; map it to '' so the ham/spam filter applied afterwards drops the row.
    label = raw_label.strip() if raw_label else ''
    message = row['message']
    text = message.strip() if message else ''
    tokens = re.findall(r'\b\w+\b', text.lower())
    return (label, tokens)

In [0]:
def _is_usable(example):
    """Return True for ham/spam examples that have at least one token."""
    label, tokens = example
    return label in ['ham', 'spam'] and len(tokens) > 0


# Tokenise every row, then discard rows with an unexpected label or no tokens.
cleaned_data = df.rdd.map(preprocess_row).filter(_is_usable)

In [0]:
# 80/20 train/test split; the seed is fixed so the split is repeatable.
train_data, test_data = cleaned_data.randomSplit(weights=[0.8, 0.2], seed=42)

In [0]:
# Number of training documents per class, as an RDD of (label, count).
doc_counts = train_data.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

# Collect the per-class counts once and derive the rest on the driver.
# Running count() and a second collectAsMap() would each re-evaluate the
# uncached train_data lineage; every document has exactly one label, so the
# sum of the class counts equals the training-set size.
raw_priors = doc_counts.collectAsMap()
total_docs = sum(raw_priors.values())

# Log prior of each class: log(class document count / total documents).
priors = {label: log(count / total_docs) for label, count in raw_priors.items()}

In [0]:
def _label_word_pairs(example):
    """Emit one ``((label, word), 1)`` record per token of a ``(label, tokens)`` example."""
    label, tokens = example
    return [((label, word), 1) for word in tokens]


# Occurrence count of every (label, word) combination in the training split.
word_counts = train_data.flatMap(_label_word_pairs).reduceByKey(lambda left, right: left + right)

In [0]:
# Drop the word from each ((label, word), count) record, then add the counts
# up per label and bring the resulting {label: total token count} to the driver.
per_label_counts = word_counts.map(lambda entry: (entry[0][0], entry[1]))
total_words = per_label_counts.reduceByKey(lambda left, right: left + right).collectAsMap()

In [0]:
# Distinct words seen in the training split (across all labels) and how many
# there are; the size is the smoothing denominator term used later.
distinct_words = word_counts.map(lambda entry: entry[0][1]).distinct()
vocab = distinct_words.collect()
vocab_size = len(vocab)

In [0]:
def _smoothed_log_prob(entry):
    """Map ``((label, word), count)`` to ``((label, word), log-likelihood)``.

    The likelihood uses add-one smoothing:
    ``(count + 1) / (total_words[label] + vocab_size)``.
    """
    (label, word), count = entry
    return ((label, word), log((count + 1) / (total_words[label] + vocab_size)))


# Dict of smoothed log-likelihoods keyed by (label, word), held on the driver.
word_probs = word_counts.map(_smoothed_log_prob).collectAsMap()

In [0]:
def predict(tokens):
    """Return the label with the highest Naive Bayes log-score for ``tokens``.

    Reads the notebook-level ``priors``, ``word_probs``, ``total_words`` and
    ``vocab_size``. For each label the score is the log prior plus, for every
    token, the stored log-likelihood of ``(label, token)``; tokens with no
    stored entry for that label fall back to the add-one smoothed value
    ``log(1 / (total_words[label] + vocab_size))``.

    Args:
        tokens: Iterable of word strings for one message.

    Returns:
        The best-scoring label. Ties go to the label that appears first in
        ``priors``.

    Raises:
        ValueError: If ``priors`` is empty (no labels to choose from).
    """
    label_scores = {}
    for label, log_prior in priors.items():
        # The fallback for unseen words depends only on the label, so compute
        # it once here instead of once per token.
        unseen_log_prob = log(1 / (total_words[label] + vocab_size))
        score = log_prior
        for word in tokens:
            score += word_probs.get((label, word), unseen_log_prob)
        label_scores[label] = score
    return max(label_scores, key=label_scores.get)

In [0]:
# (true label, predicted label) for every test example.
predictions = test_data.map(lambda x: (x[0], predict(x[1])))

# Count correct and total predictions in a single pass. Two separate count()
# actions would evaluate `predictions` (and therefore predict()) twice.
correct_predictions, total_predictions = predictions.aggregate(
    (0, 0),
    lambda acc, pair: (acc[0] + (1 if pair[0] == pair[1] else 0), acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

# An empty test split has no defined accuracy; report NaN instead of raising
# ZeroDivisionError.
accuracy = correct_predictions / total_predictions if total_predictions else float("nan")


In [0]:
# Headline metrics for the held-out test split.
for line in (
    "Naive Bayes Classifier Results Summary",
    "=========================================",
    f"Total Test Samples        : {total_predictions}",
    f"Correct Predictions       : {correct_predictions}",
    f"Accuracy                  : {accuracy:.4f}",
    "-----------------------------------------",
    "Class Prior Probabilities (log scale):",
):
    print(line)

# One line per class: its log prior and the training document count behind it.
for label, count in raw_priors.items():
    log_prior = priors[label]
    print(f"P({label}) = {log_prior:.4f}  (Count: {count})")

Naive Bayes Classifier Results Summary
Total Test Samples        : 1085
Correct Predictions       : 1068
Accuracy                  : 0.9843
-----------------------------------------
Class Prior Probabilities (log scale):
P(ham) = -0.1465  (Count: 3874)
P(spam) = -1.9934  (Count: 611)
