In [1]:
import os
import csv
from typing import List, Dict, Tuple

## Process Spam files

In [2]:
files = os.listdir("./data/spam")
print(f"Number of spam files: {len(files)}")

Number of spam files: 1397


In [3]:
with open("SubjectsSpam.out", "w") as out:
    category = "spam"
    
    for file in files:
        with open(f"./data/{category}/{file}", errors="ignore") as f:
            lines = f.readlines()
            for line in lines:
                if line.startswith("Subject:"):
                    line = line.replace(",", "")
                    out.write(f"{line[8:].strip()},{category}\n")
                    break

## Process Not Spam files

In [4]:
files = os.listdir("./data/easy_ham")
print(f"Number of easy_ham files: {len(files)}")

Number of easy_ham files: 2551


In [5]:
with open("SubjectsHam.out", "w") as out:
    category = "easy_ham"
    
    for file in files:
        with open(f"./data/{category}/{file}", errors="ignore") as f:
            lines = f.readlines()
            for line in lines:
                if line.startswith("Subject:"):
                    line = line.replace(",", "")
                    out.write(f"{line[8:].strip()},nospam\n")
                    break

## Naive Bayes Algorithm

Use the function `list_words()` to get a list of unique words with more than three characters in lower case:

In [6]:
def list_words(text: str) -> List[str]:
    # Init the list containing the result
    words: List[str] = []
    
    # Split the text into words and transform them to lowercase
    words_tmp = text.lower().split()
    for word in words_tmp:
        # Keep only words that are not already in the list 
        #   and are longer than 3 characters
        if word not in words and len(word) > 3:
            words.append(word)
        
    return words

The `training()` function creates variables to store the data needed for the classification. The `c_words` variable is a dictionary with the unique words and its number of occurrences
in the text (frequency) by category. The `c_categories` variable stores a dictionary of each category and its number of texts. Finally, `c_text` and `c_total_words` store the total count of texts and words, respectively:

In [7]:
def training(texts: List[str]) -> Tuple[Dict, Dict, int, int]:
    c_words: Dict[str, int] = {}
    c_categories: Dict[str, int] = {}
    c_texts: int = 0
    c_total_words: int = 0
    
    # Add the classes to the categories
    for text in texts:
        c_texts += 1
        if text[1] not in c_categories:
            c_categories[text[1]] = 1
        else:
            c_categories[text[1]] += 1
    
    # Add the words with list_words() function
    for text in texts:
        words = list_words(text[0])
        
        for word in words:
            if word not in c_words:
                c_total_words += 1
                c_words[word] = {}
                for category in c_categories:
                    c_words[word][category] = 0
            
            c_words[word][text[1]] += 1
    
    return c_words, c_categories, c_texts, c_total_words

The `classifier()` function applies the Bayes rule and classifies the subject into one of the
two categories, that is, either `spam` or `not spam`. The function also needs the four variables from the `training()` function:

In [8]:
def classifier(subject_line: str, c_words: Dict, c_categories: Dict, 
               c_texts: int, c_total_words: int) -> Tuple[str, float]:
    
    best_category: str = ""
    best_prob: float = 0.0
    
    words = list_words(subject_line)
    
    for category in c_categories:
        # category probability
        prob_c: float = c_categories[category] / c_texts
        prob_total_c: float = prob_c
        
        for word in words:
            # word probability
            if word in c_words:
                prob_p: float = c_words[word][category] / c_total_words
                # probability P(category|word)
                prob_cond: float = prob_p / prob_c
                # probability P(word|category)
                prob: float = prob_cond * prob_p / prob_c
                prob_total_c *= prob
        
        if prob_total_c > best_prob:
            best_category = category
            best_prob = prob_total_c
        
    return best_category, best_prob

Main function:

In [9]:
with open("./training.csv") as f:
    subjects = list(csv.reader(f, delimiter=","))
words, categories, texts, total_words = training(subjects)

In [10]:
# First Test
class_ = classifier("Available on Term Life - Free",
                    words, categories, texts, total_words)
print("Result: {0}".format(class_))

Result: ('spam', 1.593317772662467e-11)


In [11]:
# Second Test
with open("test.csv") as f:
    correct = 0
    tests = csv.reader(f)
    for subject in tests:
        clase = classifier(subject[0],
                           words, categories, texts, total_words)
        if clase[0] == subject[1]:
            correct += 1
    print("Efficiency {0} of 10".format(correct))

Efficiency 10 of 10
