<a href="https://colab.research.google.com/github/kasonancelin/Naive-Bayes-for-Spam-Email-Classifier/blob/main/NaiveBayes1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Naive Bayes Time
from typing import Set, NamedTuple, List, Tuple, Dict, Iterable
import re
import math
from collections import defaultdict

class Message(NamedTuple): # Message will be a string containing the entire email message
  text: str
  is_spam: bool

def tokenize(text: str) -> Set[str]:
  text = text.lower()
  all_words = re.findall("[a-z0-9']+", text) # Dicphers and finds the words
  return set(all_words) # Avoid repeats

assert tokenize("Data science is Science") == {'science', 'data', 'is'}

class NaiveBayesClassifier:
  def __init__(self, k: float = 0.5) -> None:
    self.k = k # pseudocount

    self.tokens: Set[str] = set() # All words found in the email
    self.token_spam_counts: Dict[str, int] = defaultdict(int) # Dictionary which will have words from the email as keys and the # of emails which are spam containing said word as values
    self.token_ham_counts: Dict[str, int] = defaultdict(int) # Dictionary which will have words from the email as keys and the # of emails which are spam containing said word as values
    self.spam_messages = self.ham_messages = 0 # Simply counts the # of spam/ham messages

  def train(self, messages: Iterable[Message]) -> None:
    for message in messages:
      if message.is_spam:
        self.spam_messages += 1
      else:
        self.ham_messages += 1
      
      for token in tokenize(message.text):
        self.tokens.add(token)
        if message.is_spam:
          self.token_spam_counts[token] += 1
        else:
          self.token_ham_counts[token] += 1

  def _probabilities(self, token: str) -> Tuple[float, float]: # Will return P(token in text | spam), P(token in text | ham)
    spam = self.token_spam_counts[token] # number of spam emails with said token
    ham = self.token_ham_counts[token] # number of ham emails with said token

    p_token_spam = (self.k + spam) / (self.spam_messages + 2 * self.k) # probability of a token being in a message given it is spam
    p_token_ham = (self.k + ham) / (self.ham_messages + 2 * self.k) # probability of a token being in a message given it is ham
    return p_token_spam, p_token_ham

  def predict(self, text: str) -> float:
    text_tokens = tokenize(text)
    log_prob_if_spam = log_prob_if_ham = 0.0

    # Iterate through each word in our vocab
    for token in self.tokens:
      prob_if_spam, prob_if_ham = self._probabilities(token) # probability of token being in a message given it is spam/ham

      # If *token* appears in the message,
      # add the log probability of seeing it
      if token in text_tokens:
        log_prob_if_spam += math.log(prob_if_spam)
        log_prob_if_ham += math.log(prob_if_ham)
      # Else, add log probability of not seeing it (1 - seeing it)
      else:
        log_prob_if_spam += math.log(1 - prob_if_spam)
        log_prob_if_ham += math.log(1 - prob_if_ham)
      # Routine math below, exponentiate the log sum
    prob_if_spam = math.exp(log_prob_if_spam)
    prob_if_ham = math.exp(log_prob_if_ham)
    return prob_if_spam / (prob_if_spam + prob_if_ham)

# basic test below to insure the code is working, we will confer with by hand work soon
messages = [Message("spam rules", is_spam = True),
            Message("ham rules", is_spam = False),
            Message("hello ham", is_spam = False)]
model = NaiveBayesClassifier(k=0.5)
model.train(messages)

assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts["rules"] == 1
assert model.token_ham_counts["rules"] == 1
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

# Below simply checks if our code matches a by hand version of the classification
text = "hello spam"

probs_if_spam = [3/4, 3/4, 1/4, 1/4]
probs_if_ham = [1/6, 1/6, 3/6, 3/6]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

assert model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)

In [2]:
from io import BytesIO  
import requests          
import tarfile          # Files are in .tar.bz format.

BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = ["20021010_easy_ham.tar.bz2",
         "20021010_hard_ham.tar.bz2",
         "20021010_spam.tar.bz2"]


OUTPUT_DIR = 'spam_data' # The directory for the data

for filename in FILES:
    # Get the file contents at each URL.
    content = requests.get(f"{BASE_URL}/{filename}").content

    # Wrap the in-memory bytes so we can use them as a "file."
    fin = BytesIO(content)

    # And extract all the files to the specified output dir.
    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        tf.extractall(OUTPUT_DIR) # Send all the files to output directory (spam_data)

In [3]:
import glob, re
import random

paths = ['/content/spam_data/*/*']

data: List[Message] = []
for path in paths:
  for filename in glob.glob(path):
    is_spam = "ham" not in filename
    #print(filename)
    with open(filename, errors='ignore') as email_file:
      for line in email_file:
        #print(line)
        if line.startswith("Subject:"):
          subject = line.lstrip("Subject: ")
          #print(subject)
          data.append(Message(subject, is_spam))
          break



In [4]:
import random

def split_data(data, proportion):
  random.shuffle(data)
  bar = math.floor(proportion * len(data))
  train_data = data[:bar]
  test_data = data[bar:] # None is the last element
  return train_data, test_data

random.seed(0)
train_messages, test_messages = split_data(data, 0.75)

model = NaiveBayesClassifier()
model.train(train_messages)

In [5]:
from collections import Counter

predictions = [(message, model.predict(message.text)) for message in test_messages]
confusion_mat = Counter((message.is_spam, spam_probability > 0.5) for message, spam_probability in predictions)

print(confusion_mat)

Counter({(False, False): 681, (True, True): 83, (True, False): 43, (False, True): 18})
