# Naive Bayes

Naive Bayes is a classifier that uses Bayes theorem to estimate the probablity for P(some class | some features) using P(some features | some class) and Bayes Theorem.

Bayes Theorem (e.g. A = 'message is spam', B = 'message contains the word 'exclusive'')

$$P(A|B) = \frac{P(B|A)P(A)}{P(B)}$$


This has a simple proof using the definition of 'and' in probability theory, as well as conditional probability: start from P(AB) and derive to conditional form for each, then isolate conditionals

$$1. P(A \cap B) = P(A|B)P(B)$$

$$2. P(A \cap B) = P(B|A)P(A)$$

$$3. P(A|B)P(B) = P (B | A)P(A)$$

$$4. P(A|B) = \frac{P(B|A)P(A)}{P(B)}$$


## A Naive Bayes spam-filter

In our spam filter construction, lets assume that we want to model  P(spam|email content) as P(spam |bag-of-words(email)). We can use Bayes Theorem to instead model this like so:

$$P(spam|BOW(email)) = \frac{P(BOW(email) | spam)P(spam)}{P(BOW(email))}$$


This is nearly impossible to estmate from data frequencies as-is, but if we assume that each word contributes to P(spam) individually (obviously wrong but works well enough in practice), then we can model the probabilty of each word in the bag independently:

$$P(BOW(email) | spam) = \prod_{w_i \in BOW(email)} P(w_i | spam)$$

and

$$P(BOW(email)) = \prod_{w_i \in BOW(email)} P(w_i)$$

While this is the correct definition, in practice we want to smooth for the fact that whatever labelled dataset we have doesn't represent the domain completely. For instance, if 'bitcoin' only occurs once in the dataset and that message is *not* spam, then we will estimate a 0% chance of spam in the case the message has the word bitcoin:

$$P(spam|BOW(email)) = \frac{P(BOW(email) | spam)P(spam)}{P(BOW(email))}$$

$$P(spam|BOW(email)) = \frac{P(spam)\prod_{w_i \in BOW(email)} P(w_i | spam)}{P(BOW(email))}$$

$$P(spam|BOW(email)) = \frac{P(spam)(x_0 * 0 ...  x_V)}{P(BOW(email))} = \textbf{0}$$

**To account for this, we begin with a uniform 'pseudocount' for all possible words: effectively assuming we've seen each word at least k times per class already**

## Implementation

In [57]:
# class called NaiveBayesClassifier that has a predict and train method
# class 
from typing import Tuple, Set, NamedTuple, Iterable, Dict, List
from collections import defaultdict, Counter
import re, math, os, glob, random, sys
import tqdm

from io import BytesIO
import requests, tarfile

sys.path.insert(0, "../")
from machine_learning import split_data, f1_score

In [2]:
def tokenize(email: str) -> Set[str]:
    """
    tokenize the email string into a case-insensitive bag-of-words
    """
    email = email.lower()
    tokens = re.findall('[a-z0-9]+', email)
    return set(tokens)
assert tokenize('Data Science is science') == {'data', 'is', 'science'}

In [3]:
class Message(NamedTuple):
    """
    Class representing an email message and  whether it is spam or not
    """
    email: str  # original email content
    spam: bool  # true if spam, false if ham
    

In [45]:
class NaiveBayesClassifier:
    """
    A Naive Bayes Classifier operating on Messages, determining if it is spam or ham
    """
    k: int  # pseudocount: just do the math with this value so we can swap k params without re-training
    spam_count: int  # the number of messages in our training set that were spam
    ham_count: int  # the number of messages in our training set that were NOT spam
    spam_token_counts: Dict[str, int]  # per-token frequencies when an email is spam
    ham_token_counts: Dict[str, int]  # per-token frequencies when an email is NOT spam
    all_tokens: Set[str]  # all the tokens in the dataset
    p_spam_prior: float  # real-world expected probability of spam
    
    def __init__(self, k:float = 1, p_spam_prior:float = None):
        self.k = k
        self.spam_count = 0
        self.ham_count = 0
        self.spam_token_counts = defaultdict(int)
        self.ham_token_counts = defaultdict(int)
        self.all_tokens = set()
        self.p_spam_prior = p_spam_prior
    
    def train(self, emails: Iterable[Message]) -> None:
        """
        Train our classifier on the emails
        """
        for email in tqdm.tqdm(emails):
            # account for P(spam) and P(!spam)
            if email.spam:
                self.spam_count += 1                
            else:
                self.ham_count += 1
            # now modify per-class token frequencies
            tokens: Set[str] = tokenize(email.email)
            for token in tokens:
                self.all_tokens.add(token)
                if email.spam:
                    self.spam_token_counts[token] += 1
                else:
                    self.ham_token_counts[token] += 1
    
    def _p_token(self, token: str, given_spam: bool) -> float:
        """
        Given a token, return the probability of its occurence in a spam (given_spam=True) or
        ham (given_spam=False) message
        """
        # get the count of our token in the spam or ham context, as well as the total count of that context
        token_count: int = self.spam_token_counts[token] if given_spam else self.ham_token_counts[token]
        message_count: int = self.spam_count if given_spam else self.ham_count
        # compute p as smoothed by k
        p_token: float = float(token_count + self.k) / ((2 * self.k) + message_count)
        return p_token
            
    def predict(self, email:str) -> bool:
        """
        Predict whether a particular email is spam (true) or ham (false)
        """
        # first, tokenize it
        email_tokens: Set[str] = tokenize(email)
        # We'll be computing conditional probs of each token (or its absence) using a log-sum to avoid underflow
        log_p_sum_given_spam, log_p_sum_given_ham = 0, 0
        # for each POSSIBLE token, appropriately modify these sums
        for token in self.all_tokens:
            # how often do we expect to see this token in a spam or ham message?
            p_token_given_spam, p_token_given_ham  = self._p_token(token, True), self._p_token(token, False)
            if token in email_tokens:
                # this token is present, modify our conditional prob for spam/ham to note its presence
                log_p_sum_given_spam += math.log(p_token_given_spam)
                log_p_sum_given_ham += math.log(p_token_given_ham)
            else:
                 # this token is absent, modify our conditional prob for spam/ham to note its absence
                log_p_sum_given_spam += math.log(1 - p_token_given_spam)
                log_p_sum_given_ham += math.log(1 - p_token_given_ham)
        # now compute our conditional probabilities P(BOW(email) | spam) and P(BOW(email) | !spam)
        p_tokens_given_spam, p_tokens_given_ham = math.exp(log_p_sum_given_spam), math.exp(log_p_sum_given_ham)
        # using Bayes Theorem, our prediction for P(spam | BOW(email)) is:
        p_spam: float = float(self.spam_count / (self.spam_count + self.ham_count))
        p_ham: float = 1 - p_spam
        # if no class_balance_prior, use real frequencies in training data
        class_ratio: float = p_ham / p_spam if self.p_spam_prior is None else (1 - self.p_spam_prior) / self.p_spam_prior
        return p_tokens_given_spam / (p_tokens_given_spam + (p_tokens_given_ham * class_ratio))
            

Derivation of the `predict` return quantity:

$$P(spam|BOW(email)) = 
\frac{P(BOW(email) | spam)P(spam)}
{P(BOW(email)|spam)P(spam) + P(BOW(email) | \neg{spam})P(\neg{spam})}$$

Becomes:

$$P(spam|BOW(email)) = 
\frac{P(BOW(email) | spam)}{P(BOW(email)|spam) + P(BOW(email) | \neg{spam}) * \frac{P(\neg{spam})}{P(spam)})}$$



In [46]:
# Some unit tests!
messages = [Message("spam rules", spam=True), Message("Ham rules", spam=False), Message("hello ham", spam=False)]
model = NaiveBayesClassifier(k=0.5)
model.train(messages)

# verify correct counting after train:
assert model.spam_count == 1
assert model.ham_count == 2
assert model.spam_token_counts == {'spam': 1, 'rules': 1}
assert model.ham_token_counts == {'ham': 2, 'rules': 1, 'hello': 1}
assert model.all_tokens == {'spam', 'ham', 'rules', 'hello'}

# verify correct predictions (the hard way)
text = 'hello spam'
probs_if_spam = [
    (1 + .5) / (1 + .5*2), # spam (present)
    1 - ((0 + .5) / (1 + .5*2)), # ham (absent)
    1 - ((1 + .5) / (1 + .5*2)), # rules (absent)
    (0 + .5) / (1 + .5*2), # hello (present)
]

probs_if_ham = [
    (0 + .5) / (2 + .5*2), # spam
    1 - ((2 + .5) / (2 + .5*2)), # ham
    1 - ((1 + .5) / (2 + .5*2)), # rules
    (1 + .5) / (2 + .5*2), # hello
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))
assert abs(model.predict(text) - p_if_spam / (p_if_spam + (p_if_ham * (2.)))) < 10**-7

# now verify result (like book) when we assume spam and ham are balanced classes in the real world
model = NaiveBayesClassifier(k=0.5, p_spam_prior=0.5)
model.train(messages)
assert abs(model.predict(text) - p_if_spam / (p_if_spam + p_if_ham)) < 10**-7

100%|██████████| 3/3 [00:00<00:00, 9868.95it/s]
100%|██████████| 3/3 [00:00<00:00, 17747.41it/s]


### Using our model on SpamAssasin

In [50]:
BASE_URL = 'https://spamassassin.apache.org/old/publiccorpus/'
FILES = ['20021010_easy_ham.tar.bz2','20021010_hard_ham.tar.bz2', '20021010_spam.tar.bz2']

OUTPUT_DIR = '/Users/bking/bking-dsfs/spam_data'
if not os.path.isfile(OUTPUT_DIR) or len(os.listdir(OUTPUT_DIR)) == 0:
    for fname in FILES:
        content = requests.get(BASE_URL + fname).content
        fin = BytesIO(content)
        with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
            tf.extractall(OUTPUT_DIR)

In [52]:
path = OUTPUT_DIR + '/*/*'
data: List[Message] = []

for fname in glob.glob(path):
    is_spam = 'ham' not in fname
    
    with open(fname, errors='ignore') as ef:
        for line in ef:
            if line.startswith('Subject:'):
                subject = line.lstrip('Subject: ')
                data.append(Message(subject, is_spam))
                break  # done with this file

In [56]:
random.seed(0)
train_msgs, test_msgs = split_data(data, .75)

model_book = NaiveBayesClassifier(p_spam_prior=0.5)
model_me = NaiveBayesClassifier(p_spam_prior=None)
model_book.train(train_msgs)
model_me.train(train_msgs)

100%|██████████| 2475/2475 [00:00<00:00, 101918.44it/s]
100%|██████████| 2475/2475 [00:00<00:00, 132821.15it/s]


In [60]:
predictions_book = [(msg, model_book.predict(msg.email)) for msg in test_msgs]
predictions_me = [(msg, model_me.predict(msg.email)) for msg in test_msgs]

In [62]:
confusion_matrix_book = Counter((msg.spam, p_spam > 0.5) for msg, p_spam in predictions_book)
confusion_matrix_me = Counter((msg.spam, p_spam > 0.5) for msg, p_spam in predictions_me)

In [63]:
print(confusion_matrix_book)

Counter({(False, False): 679, (True, False): 81, (True, True): 58, (False, True): 7})


In [64]:
print(confusion_matrix_me)

Counter({(False, False): 683, (True, False): 102, (True, True): 37, (False, True): 3})


In [65]:
help(f1_score)

Help on function f1_score in module machine_learning:

f1_score(true_positives: int, false_positives: int, false_negatives: int, true_negatives: int) -> float
    compute the F1 score from the confusion matrix



In [67]:
cmb, cme = confusion_matrix_book, confusion_matrix_me
print('Books F1 Score: ', f1_score(cmb[(True, True)], cmb[(True, False)], cmb[(False, True)], cmb[(False, False)]))
print('My F1 Score: ', f1_score(cme[(True, True)], cme[(True, False)], cme[(False, True)], cme[(False, False)]))

Books F1 Score:  0.5686274509803922
My F1 Score:  0.41340782122905034


So do what the book says I guess?