# 13. Naive Bayes

## Imports

In [1]:
import glob
import math
import random
import re
import tarfile
import warnings
from collections import Counter, defaultdict
from io import BytesIO
from typing import Iterable, NamedTuple

import requests

In [2]:
random.seed(0)

In [3]:
warnings.filterwarnings("ignore")

In [4]:
# Importing functions from another notebook

%run -i 11_machine_learning.ipynb

## Spam Filter Implementation

In [5]:
# Function returns set of words from text

def tokenize(text: str) -> set[str]:
    text = text.lower()
    all_words = re.findall("[a-z0-9']+", text)
    return set(all_words)


assert tokenize("Data Science is science") == {"data", "science", "is"}

In [6]:
class Message(NamedTuple):
    text: str
    is_spam: bool

#### Naive Bayes classifier

In [7]:
class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5) -> None:
        self.k = k
        self.tokens: set[str] = set()
        self.token_spam_counts: dict[str, int] = defaultdict(int)
        self.token_ham_counts: dict[str, int] = defaultdict(int)
        self.spam_messages = 0
        self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1

            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1

    def _probabilities(self, token: str) -> tuple[float, float]:
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = 0.0
        log_prob_if_ham = 0.0

        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)

        return prob_if_spam / (prob_if_spam + prob_if_ham)

#### Testing naive Bayes classifier

In [8]:
messages = [
    Message("spam rules", is_spam=True),
    Message("ham rules", is_spam=False),
    Message("hello ham", is_spam=False),
]

In [9]:
model = NaiveBayesClassifier(k=0.5)
model.train(messages)

In [10]:
assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

In [11]:
# Data for testing classifier

text = "hello spam"

probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),
    1 - (0 + 0.5) / (1 + 2 * 0.5),
    1 - (1 + 0.5) / (1 + 2 * 0.5),
    (0 + 0.5) / (1 + 2 * 0.5),
]

probs_if_ham = [
    (0 + 0.5) / (2 + 2 * 0.5),
    1 - (2 + 0.5) / (2 + 2 * 0.5),
    1 - (1 + 0.5) / (2 + 2 * 0.5),
    (1 + 0.5) / (2 + 2 * 0.5),
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

In [12]:
assert model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)

## Model Application

In [13]:
BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"

In [14]:
FILES = [
    "20021010_easy_ham.tar.bz2",
    "20021010_hard_ham.tar.bz2",
    "20021010_spam.tar.bz2",
]

In [15]:
OUTPUT_DIR = "../data/spam_data"

In [16]:
for filename in FILES:
    content = requests.get(f"{BASE_URL}/{filename}").content

    fin = BytesIO(content)

    with tarfile.open(fileobj=fin, mode="r:bz2") as tf:
        tf.extractall(OUTPUT_DIR)

In [17]:
path = "../data/spam_data/*/*"

In [18]:
data: list[Message] = []

In [19]:
for filename in glob.glob(path):
    is_spam = "ham" not in filename

    with open(filename, errors="ignore") as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break

In [20]:
train_messages, test_messages = split_data(data, 0.75)

In [21]:
model = NaiveBayesClassifier()
model.train(train_messages)

In [22]:
predictions = [(message, model.predict(message.text)) for message in test_messages]

In [23]:
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5) for message, spam_probability in predictions)

In [24]:
print(f"[+] Confusion matrix: {confusion_matrix}")

[+] Confusion matrix: Counter({(False, False): 686, (True, True): 79, (True, False): 36, (False, True): 24})


In [25]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    prob_if_spam, prob_if_ham = model._probabilities(token)
    return prob_if_spam / (prob_if_spam + prob_if_ham)

In [26]:
words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

In [27]:
print(f"[+] Most spammy words: {words[-10:]}")
print(f"[+] Least spam words: {words[:10]}")

[+] Most spammy words: ['marketing', 'zzzz', 'clearance', '95', 'per', 'systemworks', 'sale', 'rates', 'money', 'adv']
[+] Least spam words: ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'ouch', 'apt', 'bliss', 'selling', 'wedded']
