In [195]:
# bayesian classifier

import typing as t
import numpy as np

import re
import string
import html
import math
import mailbox
import random

# split a text into lower-cased, punctuation-free tokens
def normalize_text(text: str)-> t.Sequence[str]:
    """Tokenize *text* for the bag-of-words models.

    The text is lower-cased, every character listed in
    ``string.punctuation`` is deleted (not replaced by a space), and the
    result is split on runs of whitespace.

    Returns:
        The list of tokens, in order of appearance; empty for blank input.
    """
    # Mapping a character to None in a translation table deletes it.
    drop_punctuation = str.maketrans(dict.fromkeys(string.punctuation))
    return text.lower().translate(drop_punctuation).split()


# parse mbox file
def parse_mbox_body(path_mbox: str)-> t.List[str]:
    """Extract one plain-text body string per message of an mbox file.

    For every message the payload of the last non-multipart ``text/plain``
    part (in ``walk()`` order) is taken, decoded to ``str`` and stripped of
    anything that looks like an HTML/XML tag.  Messages without a
    ``text/plain`` part contribute an empty string, so the returned list
    always has one entry per message.

    Args:
        path_mbox: Path of the mbox file to read.

    Returns:
        The list of body texts, in mailbox order.
    """
    contents = []
    mbox = mailbox.mbox(path_mbox)
    try:
        for message in mbox.itervalues():
            body = None
            charset = None
            # walk() yields the message itself and, recursively, every
            # sub-part, so a single pass covers both plain and (nested)
            # multipart messages.
            for part in message.walk():
                if part.is_multipart():
                    continue
                if part.get_content_type() == 'text/plain':
                    body = part.get_payload(decode=True)
                    charset = part.get_content_charset()

            content = ""
            if body is not None:
                # Decode with the charset declared by the part.  Falling
                # back to UTF-8 with replacement keeps a single malformed
                # mail from aborting the whole load.
                try:
                    decoded = body.decode(charset or 'utf-8', errors='replace')
                except LookupError:
                    # Declared charset is unknown to Python.
                    decoded = body.decode('utf-8', errors='replace')
                content = re.sub("<[^>]+>", "", decoded)
            contents.append(content)
    finally:
        # mailbox.mbox keeps the underlying file open until closed.
        mbox.close()

    return contents


# load an mbox file as one {token: 1} presence dict per email
def load_emails(path_mbox: str)-> t.List[t.Dict[str, int]]:
    """Load an mbox file as one token-presence dict per email.

    Each body returned by ``parse_mbox_body`` is tokenized with
    ``normalize_text``; every distinct token is mapped to ``1`` (presence,
    not an occurrence count).

    Args:
        path_mbox: Path of the mbox file to read.

    Returns:
        One ``{token: 1}`` dict per message, in mailbox order.
    """
    return [
        dict.fromkeys(normalize_text(body), 1)
        for body in parse_mbox_body(path_mbox)
    ]


# return P(word|C) in category C
def log_prob_t_C(texts: t.Sequence[t.Dict[str,int]]) -> t.Dict[str, float]:
    """Estimate log10 P(token | C) from the documents of one category.

    A token's frequency is the number of documents whose dict contains it.
    The estimate is ``(1 + frequency) / (1 + len(texts))``; the added one
    smooths the distribution.

    Args:
        texts: The documents of the category, each a dict keyed by token.

    Returns:
        A dict mapping every token seen in *texts* to its log10 probability.
    """
    doc_freq = {}
    for doc in texts:
        for word in doc.keys():
            doc_freq[word] = doc_freq.get(word, 0) + 1

    denominator = 1 + len(texts)
    return {
        word: math.log10(1.0 * (1 + seen) / denominator)
        for word, seen in doc_freq.items()
    }

# evaluate log_P(t1,t2,t3,...|C) token sequence
def evaluate_prob(
    tokens: t.Sequence[str], # t1, t2, t3, ...
    log_prob_t_C: t.Dict[str,float], # log_P(t1|C)
    N: int, # corpus size
) -> float:
    """Return log10 P(t1, t2, ... | C) under the token-independence assumption.

    The per-token log probabilities are summed.  A token missing from
    *log_prob_t_C* contributes ``log10(1 / (N + 1))``.

    Args:
        tokens: Tokens of the document being scored.
        log_prob_t_C: log10 P(token | C) for the tokens known in category C.
        N: Number of documents category C was estimated from.

    Returns:
        The summed log probability (``0`` for an empty token sequence).
    """
    total = 0
    for word in tokens:
        total += (
            log_prob_t_C[word]
            if word in log_prob_t_C
            else math.log10(1.0/(N+1))
        )
    return total

# return 1 if Positive, otherwise 0 if Negative
def classify_bayesian(
    tokens, # list of str tokens
    log_prob_t_pos, 
    N_pos,
    log_prob_t_neg,
    N_neg,
):
    """Classify a token sequence with the naive Bayes decision rule.

    Each class is scored as log P(tokens | class) + log P(class), where the
    class prior is that class's share of the N_pos + N_neg training
    documents.

    Args:
        tokens: Tokens of the document to classify.
        log_prob_t_pos: log10 P(token | positive).
        N_pos: Number of positive training documents.
        log_prob_t_neg: log10 P(token | negative).
        N_neg: Number of negative training documents.

    Returns:
        1 if the positive score is strictly greater, otherwise 0.
    """
    n_total = N_pos + N_neg

    # log_P(t1,t2,t3, ... | positive) + log_P(positive)
    score_pos = (
        evaluate_prob(tokens, log_prob_t_pos, N_pos)
        + math.log10(1.0*N_pos/n_total)
    )
    # log_P(t1,t2,t3, ... | negative) + log_P(negative)
    score_neg = (
        evaluate_prob(tokens, log_prob_t_neg, N_neg)
        + math.log10(1.0*N_neg/n_total)
    )

    # Ties go to the negative class.
    return int(score_pos > score_neg)
    

# label = 1
texts_pos = load_emails("promotion.mbox")

# label = 0
texts_neg = load_emails("update.mbox")

texts_pos_train = []
texts_pos_eval = []
texts_neg_train = []
texts_neg_eval = []

# Random split: each email goes to the training set with probability 0.7,
# otherwise to the evaluation set.
# NOTE: the loop variable must not be called `t`: that name is the alias of
# the `typing` module used by the annotations above, and rebinding it breaks
# any later `t.Sequence` / `t.Dict` reference.
for doc in texts_pos:
    if random.random() < 0.7:
        texts_pos_train.append(doc)
    else:
        texts_pos_eval.append(doc)

for doc in texts_neg:
    if random.random() < 0.7:
        texts_neg_train.append(doc)
    else:
        texts_neg_eval.append(doc)


# Per-class token log-probabilities, estimated on the training split only.
log_prob_t_pos = log_prob_t_C(texts_pos_train)
log_prob_t_neg = log_prob_t_C(texts_neg_train)

# true positive
TP = 0
# true negative
TN = 0
# false positive
FP = 0
# false negative
FN = 0

# Held-out positive emails: predicted 1 is a hit, predicted 0 is a miss.
for doc in texts_pos_eval:
    y = classify_bayesian(
        doc.keys(),
        log_prob_t_pos,
        len(texts_pos_train),
        log_prob_t_neg,
        len(texts_neg_train))
    if y == 1:
        TP += 1
    else:
        FN += 1

# Held-out negative emails: predicted 1 is a false alarm.
for doc in texts_neg_eval:
    y = classify_bayesian(
        doc.keys(),
        log_prob_t_pos,
        len(texts_pos_train),
        log_prob_t_neg,
        len(texts_neg_train))
    if y == 1:
        FP += 1
    else:
        TN += 1

print (f"label=T\tTP={TP}\tFN={FN}")
print (f"label=F\tFP={FP}\tTN={TN}")
print (f"Bayesian accuracy = {1.0*(TP+TN)/(TP+TN+FP+FN)}")

label=T	TP=329	FN=18
label=F	FP=66	TN=142
Bayesian accuracy = 0.8486486486486486


In [128]:
# logistic regression

In [196]:
# logistic regression

def _rank_by_log_ratio(log_prob_t_own, log_prob_t_other, N_other):
    # Rank the tokens of one class by log P(t|own) - log P(t|other),
    # most discriminative first.
    # A token unknown to the other class gets that class's unseen-token
    # estimate log10(1 / (1 + N_other)).
    unseen = math.log10(1.0/(1 + N_other))
    scored = [
        (token, own - log_prob_t_other.get(token, unseen))
        for token, own in log_prob_t_own.items()
    ]
    # sorted() is stable, so tied tokens keep their dict order.
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


def select_words(
    log_prob_t_pos, 
    N_pos,
    log_prob_t_neg,
    N_neg,
    N_select
):
    """Pick the most class-discriminative tokens and assign feature indices.

    The top *N_select* tokens by log-probability ratio are taken for the
    positive class, then for the negative class.  Each selected token gets a
    feature index in selection order.

    A token that is in both top lists (possible when the vocabulary is small)
    keeps its first index and does not consume a second one.  The indices are
    therefore always exactly ``0 .. len(result) - 1``, which is what a
    feature vector sized by ``len(result)`` requires.

    Args:
        log_prob_t_pos: log10 P(token | positive).
        N_pos: Number of positive training documents.
        log_prob_t_neg: log10 P(token | negative).
        N_neg: Number of negative training documents.
        N_select: Number of tokens to take from each class.

    Returns:
        A dict mapping each selected token to its feature index.
    """
    # select most discriminative positive words
    ranked_pos = _rank_by_log_ratio(log_prob_t_pos, log_prob_t_neg, N_neg)
    # select most discriminative negative words
    ranked_neg = _rank_by_log_ratio(log_prob_t_neg, log_prob_t_pos, N_pos)

    words = {}
    for token, _ in ranked_pos[:N_select] + ranked_neg[:N_select]:
        # len(words) is the next free index; an existing token is untouched.
        words.setdefault(token, len(words))
    return words

# Number of words taken from each class, so at most 2 * topN features.
topN = 20
words = select_words(
    log_prob_t_pos=log_prob_t_pos,
    N_pos=len(texts_pos_train),
    log_prob_t_neg=log_prob_t_neg,
    N_neg=len(texts_neg_train),
    N_select=topN,
)

def gen_feature(words, texts):
    """Build the binary feature matrix for a list of documents.

    Each row has one 0/1 column per entry of *words* (1 when the document
    contains that token), followed by a constant 1 as the last column.

    Args:
        words: Dict mapping a selected token to its column index.
        texts: Documents; iterating one yields its tokens.

    Returns:
        A numpy array with one row per document.
    """
    width = len(words)
    rows = []
    for text in texts:
        row = [0] * width
        for column in (words[token] for token in text if token in words):
            row[column] = 1
        # Trailing constant feature.
        rows.append(row + [1])
    return np.array(rows)

#numpy.ones

# Training set: features of positives then negatives, with the label
# (1 = positive, 0 = negative) glued on as an extra last column so that rows
# and labels are shuffled together.
X_train = gen_feature(words, texts_pos_train + texts_neg_train)
Y_train = np.concatenate(
    (np.ones(len(texts_pos_train)), np.zeros(len(texts_neg_train)))
)[:, np.newaxis]
X = np.concatenate((X_train, Y_train), axis=1)
np.random.shuffle(X)
# Split the shuffled matrix back into features and the label column.
X_train = X[:, :-1]
Y_train = X[:, -1:].squeeze()

# Evaluation set: same construction on the held-out documents.
X_eval = gen_feature(words, texts_pos_eval + texts_neg_eval)
Y_eval = np.concatenate(
    (np.ones(len(texts_pos_eval)), np.zeros(len(texts_neg_eval)))
)[:, np.newaxis]
X = np.concatenate((X_eval, Y_eval), axis=1)
np.random.shuffle(X)
X_eval = X[:, :-1]
Y_eval = X[:, -1:].squeeze()

from sklearn.linear_model import LogisticRegression 
clf = LogisticRegression(random_state=0, solver='lbfgs')
clf = clf.fit(X_train, Y_train)
# score() reports mean accuracy on the evaluation set.
score = clf.score(X_eval, Y_eval)
print(f"LR accuracy = {score}")

LR accuracy = 0.8126126126126126
