# Seminar

- Sinh viên 1: 1753086 - Tống Lê Thiên Phúc
- Sinh viên 2: 1753089 - Nguyễn Lý Nhật Phương

In [1]:
import random
from typing import TypeVar, List, Tuple

X = TypeVar('X')  # generic type to represent a data point

def split_data(data: List[X], prob: float) -> Tuple[List[X], List[X]]:
    """Split data into fractions [prob, 1 - prob]"""
    data = data[:]                    # Make a shallow copy
    random.shuffle(data)              # because shuffle modifies the list.
    cut = int(len(data) * prob)       # Use prob to find a cutoff

    return data[:cut], data[cut:]     # and split the shuffled list there.


Y = TypeVar('Y')  # generic type to represent output variables


def train_test_split(xs: List[X],
                     ys: List[Y],
                     test_pct: float) -> Tuple[List[X], List[X], List[Y], List[Y]]:

    # Generate the indices and split them.
    idxs = [i for i in range(len(xs))]
    train_idxs, test_idxs = split_data(idxs, 1 - test_pct)
    
    return ([xs[i] for i in train_idxs],  # x_train
            [xs[i] for i in test_idxs],   # x_test
            [ys[i] for i in train_idxs],  # y_train
            [ys[i] for i in test_idxs])   # y_test

def accuracy(tp: int, fp: int, fn: int, tn: int) -> float:
    correct = tp + tn
    total = tp + fp + fn + tn

    return correct / total

def precision(tp: int, fp: int, fn: int, tn: int) -> float:
    return tp / (tp + fp)

def recall(tp: int, fp: int, fn: int, tn: int) -> float:
    return tp / (tp + fn)

def f1_score(tp: int, fp: int, fn: int, tn: int) -> float:
    p = precision(tp, fp, fn, tn)
    r = recall(tp, fp, fn, tn)
    
    return 2 * p * r / (p + r)

## Naive Bayes

### A Really Dumb Spam Filter 

#### Bayes Theorem

$$P(h|\mathcal{D}) = \frac{P(\mathcal{D}|h)P(h)}{P(\mathcal{D})}$$

- $P(h) = $ **prior probability** of hypothesis $h$
- $P(\mathcal{D}) = $ **prior probability** of training data $\mathcal{D}$
- $P(h|\mathcal{D}) = $ probability of $h$ given $\mathcal{D}$ (**posterior probability**)
- $P(\mathcal{D}|h) = $ probability of $\mathcal{D}$ given $h$ (**likelyhood**)

$S$: The event "the message is spam"

$V$: The event "the message contains the word *viagra*"


- The probability that the message is spam conditional on containing the word *viagra* is:

$$P(S|V) = \frac{P(V|S)P(S)}{P(V|S)P(S) + P(V|\neg S)P(\neg S)}$$

- Assume that any message is equally likely to be spam or notspam ($P(S) = P(\neg S) = 0.5$) then:

$$P(S|V) = \frac{P(V|S)}{P(V|S) + P(V|\neg S)}$$

**Example:** If	50%	of spam	messages	have	the	word	viagra,	but	only 1%	of	nonspam messages	do,	then	the	probability	that	any	given	viagra-containing	email	is	spam	is:
$$\frac{0.5}{0.5+0.01} = 0.98$$

### A More Sophisticated Spam Filte

- Have a vocabulary of many words $w_1, ... , w_n$
- $X_i$: The event "a message contains the word	$w_i$" 
- The probability that a spam message contains the $i$th word $$P(X_i|S)$$
- The probability that a nonspam message contains the $i$th word $$P(X_i|\neg S)$$



The	key	to	Naive	Bayes	is	making	the	(big)	assumption	that	the	presences	(or	absences)	of each	word	are	independent	of	one	another,	conditional	on	a	message	being	spam	or	not. Intuitively,	this	assumption	means	that	knowing	whether	a	certain	spam	message	contains the	word	“viagra”	gives	you	no	information	about	whether	that	same	message	contains	the word	“rolex.”	

$$P(X_1 = x_1, ..., X_n = x_N|S) = P(X_1 = x_1|S)\times...\timesP(X_n = x_n|S)$$

This	is	an	extreme	assumption.	(There’s	a	reason	the	technique	has	“naive”	in	its	name.) Imagine	that	our	vocabulary	consists	only	of	the	words	“viagra”	and	“rolex,”	and	that	half of	all	spam	messages	are	for	“cheap	viagra”	and	that	the	other	half	are	for	“authentic rolex.”	In	this	case,	the	Naive	Bayes	estimate	that	a	spam	message	contains	both	“viagra” and	“rolex”	is:

$$P(X_1 = 1, X_2 = 1 | S) = P(X_1 = 1|S)P(X_2 = 1|S) = 0.5\times0.5=0.25$$

since	we’ve	assumed	away	the	knowledge	that	“viagra”	and	“rolex”	actually	never	occur together.	

The	same	Bayes’s	Theorem	reasoning	we	used	for	our	“viagra-only”	spam	filter	tells	us that	we	can	calculate	the	probability	a	message	is	spam	using	the	equation:

$$P(S|X=x)=\frac{P(X=x|S)}{P(X=x|S)+P(X=x|\neg S)}$$

The	Naive	Bayes	assumption	allows	us	to	compute	each	of	the	probabilities	on	the	right simply	by	multiplying	together	the	individual	probability	estimates	for	each	vocabulary word. 

In	practice,	you	usually	want	to	avoid	multiplying	lots	of	probabilities	together,	to	avoid	a problem	called	underflow,	in	which	computers	don’t	deal	well	with	floating-point	numbers that	are	too	close	to	zero.

$$log(ab) = log(a) + log(b)$$
$$exp(log(x)) = x$$

$$p_1\times ... \times p_n \Leftrightarrow exp(log(p_1) + ... + log(p_n))$$

Imagine	that in	our	training	set	the	vocabulary	word “data”	only	occurs	in	nonspam	messages

$$P(w_i|S) = 0 	\Rightarrow P(S|message\ contains\ word\ w_i) = 0$$

To	avoid	this	problem,	we	usually	use	some	kind	of	smoothing.

In	particular,	we’ll	choose	a	pseudocount	—	k	—	and	estimate	the	probability	of	seeing the	ith	word	in	a	spam	as:
$$P(X_i|S) = \frac{k + number\ of\ spams\ containing\ w_i}{2k + number\ of\ spams}$$
$$P(X_i|\neg S) = \frac{k + number\ of\ nonspams\ containing\ w_i}{2k + number\ of\ nonspams}$$

### Implementation

In [2]:
from typing import NamedTuple, Set, List, Tuple, Dict, Iterable
import math
from collections import defaultdict

In [3]:
def tokenize(message: str) -> Set[str]:
    message = message.lower() # Convert to lowercase,
    all_words = re.findall("[a-z0-9']+", message)  # extract the words, and

    return set(all_words) # remove duplicates.

class Message(NamedTuple):
    text: str
    is_spam: bool

In [4]:
class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5) -> None:
        self.k = k  # smoothing factor
        
        self.words: Set[str] = set()
        self.word_spam_counts: Dict[str, int] = defaultdict(int)
        self.word_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        #count spam and non-spam messages
        for message in messages:
            # Increment message counts
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1
            
            # Increment word counts
            for word in tokenize(message.text):
                self.words.add(word)

            if message.is_spam:
                self.word_spam_counts[word] += 1 #count times we saw that word in spam messages
            else:
                self.word_ham_counts[word] += 1 #count times we saw that word in nonspam messages

    
    def _probabilities(self, word: str) -> Tuple[float, float]:
        """returns P(word | spam) and P(word | ham)"""
        spam = self.word_spam_counts[word]
        ham = self.word_ham_counts[word]
        
        p_word_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_word_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)
        
        return p_word_spam, p_word_ham


    #use these word probabilities (and our Naive Bayes assumptions) to assign probabilities to message
    def predict(self, message: str) -> float:
        message_words = tokenize(message)
        log_prob_if_spam = log_prob_if_ham = 0.0 

        #iterate through each word in our vocabulary
        for word in self.words:
            prob_if_spam, prob_if_ham = self._probabilities(word)
            #if *word* appears in the message,
            #add the log probability of seeing it
            if word in message_words:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)

            # otherwise add the log probability of _not_ seeing it
            # which is log(1 - probability of seeing it)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)

        return prob_if_spam / (prob_if_spam + prob_if_ham)

    
    def predict_nonsmooth(self, message: str) -> float:
        message_words = tokenize(message)

        #iterate through each word in our vocabulary
        for word in self.words:
            prob_if_spam, prob_if_ham = self._probabilities(word)


        return prob_if_spam / (prob_if_spam + prob_if_ham)

### Testing Our Model

#### Data Description

Split into three parts, as follows:

  - spam: 501 spam messages, all received from non-spam-trap sources.

  - easy_ham: 2551 non-spam messages.  These are typically quite easy to
    differentiate from spam, since they frequently do not contain any spammish
    signatures (like HTML etc).

  - hard_ham: 250 non-spam messages which are closer in many respects to
    typical spam: use of HTML, unusual HTML markup, coloured text,
    "spammish-sounding" phrases etc.

Total count: 3302 messages, with about a 15% spam ratio.

After extracting the data we have three	folders: spam, easy_ham, and hard_ham. Each folder contains	many emails, each contained in a single file. To keep things really simple, we’ll just look at the subject lines of each email.

In [5]:
import glob, re

In [6]:
#identify the subject line
# modify the path to wherever you've put the files
path = 'MessagesExample/*/*'
data: List[Message] = []
    
# glob.glob returns every filename that matches the wildcarded path
for filename in glob.glob(path):
    is_spam = "ham" not in filename
    # There are some garbage characters in the emails, the errors='ignore'
    # skips them instead of raising an exception.
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break  # done with this file

In [7]:
#split data
random.seed(0) #to get the same answers as instruction
train_data, test_data = split_data(data, 0.75) #train = 0.75, test = 0.25

#build classifier
classifier = NaiveBayesClassifier() 
classifier.train(train_data)

In [8]:
from collections import Counter
#(actual is_spam, predicted spam probability) 
predictions = [(message, classifier.predict(message.text))
               for message in test_data]

# Assume that spam_probability > 0.5 corresponds to spam prediction
# and count the combinations of (actual is_spam, predicted is_spam)
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                           for message, spam_probability in predictions)

print(confusion_matrix)

Counter({(False, False): 417, (False, True): 282, (True, True): 103, (True, False): 23})


In [9]:
tp = confusion_matrix[(True, True)]
tn = confusion_matrix[(False, False)]
fp = confusion_matrix[(False, True)]
fn = confusion_matrix[(True, False)]

acc = accuracy(tp, fp, fn, tn)
rec = recall(tp, fp, fn, tn)
pre = precision(tp, fp, fn, tn)
f1 = f1_score(tp, fp, fn, tn)

print("Accuracy = ", acc)
print("Recall = ", rec) #correct positive predicted in total actual positive 
print("Precision = ", pre) #correct positive predicted in total predict positve
print("F1-score = ", f1)

Accuracy =  0.6303030303030303
Recall =  0.8174603174603174
Precision =  0.2675324675324675
F1-score =  0.40313111545988256


In [10]:
predictions = [(message, classifier.predict_nonsmooth(message.text))
               for message in test_data]

confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                           for message, spam_probability in predictions)

print(confusion_matrix)

Counter({(False, True): 699, (True, True): 126})


In [11]:
tp = confusion_matrix[(True, True)]
tn = confusion_matrix[(False, False)]
fp = confusion_matrix[(False, True)]
fn = confusion_matrix[(True, False)]

acc = accuracy(tp, fp, fn, tn)
rec = recall(tp, fp, fn, tn)
pre = precision(tp, fp, fn, tn)
f1 = f1_score(tp, fp, fn, tn)

print("Accuracy = ", acc)
print("Recall = ", rec) 
print("Precision = ", pre)
print("F1-score = ", f1)

Accuracy =  0.15272727272727274
Recall =  1.0
Precision =  0.15272727272727274
F1-score =  0.26498422712933756


In [12]:
def p_spam_given_word(word: str, model: NaiveBayesClassifier) -> float:
    """uses bayes's theorem to compute p(spam | message contains word)"""
    prob_if_spam, prob_if_ham = model._probabilities(word)
        
    return prob_if_spam / (prob_if_spam + prob_if_ham)

#sort by p(spam | message contains word) from smallest to largest 
words = sorted(classifier.words, key=lambda t: p_spam_given_word(t, classifier))

#the highest predicted spam probabilities
print("spammiest_words", words[-10:])
#the lowest predicted spam probabilities
print("hammiest_words", words[:10])

spammiest_words ['zzzz', 'weight', 'hiring', 'your', 'warranties', 'rich', 'want', 'assistance', 'lowest', 'per']
hammiest_words ['2', 'test', 'deployment', 'bliss', 'sept', 're', 'roman', 'bad', 'tough', 'name']


#### Optimization

- Look at the message content, not just the	subject	line. Have to be careful how to deal with the message headers.

- The classifier takes into account every word that	appears in the training	set, even words that appear only once. Modify the classifier to accept an optional min_count threshhold and ignore tokens that don’t appear at least that many times.

- The tokenizer has no notion of similar words (e.g., “cheap” and “cheapest”). Modify the classifier to take an optional stemmer function that converts words to equivalence classes of words. Creating a good stemmer function is	hard. People frequently use the	Porter Stemmer. Example of a really simple stemmer function:

In [13]:
# drop 1 's' character at the end of the word
def drop_final_s(word):
    return re.sub("s$", "", word)

- Although our features are all of the form	 “message contains word $w_i$,” there’s no reason why this has to be the case. In the implementation, we could add extra features like “message contains a number” by creating phony tokens like *contains:number* and modifying the *tokenizer* to	emit them when appropriate.