In [31]:
from typing import Set, NamedTuple, List, Dict, Iterable, Tuple, TypeVar
from collections import defaultdict, Counter
from io import BytesIO
import re
import math
import requests
import tarfile
import glob
import random

In [32]:
def tokenize(text: str) -> Set[str]:
    text = text.lower()
    all_words = re.findall("[a-z0-9]+", text)
    return set(all_words)

assert tokenize("Data Science is science") == {"data", "science", "is"}

In [33]:
class Message(NamedTuple):
    text: str
    is_spam: bool

In [34]:
class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5) -> None:
        self.k = k  # smoothing factor

        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            # Increment message counts
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1

            # Increment word counts
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1

    def _probabilities(self, token: str) -> Tuple[float, float]:
        """returns P(token | spam) and P(token | not spam)"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        # Iterate through each word in our vocabulary.
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)

            # If *token* appears in the message,
            # add the log probability of seeing it;
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)

            # otherwise add the log probability of _not_ seeing it
            # which is log(1 - probability of seeing it)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [35]:
messages = [Message("spam rules", is_spam=True),
            Message("ham rules", is_spam=False),
            Message("hello ham", is_spam=False)]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

In [36]:
text = "hello spam"

probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),      # "spam"  (present)
    1 - (0 + 0.5) / (1 + 2 * 0.5),  # "ham"   (not present)
    1 - (1 + 0.5) / (1 + 2 * 0.5),  # "rules" (not present)
    (0 + 0.5) / (1 + 2 * 0.5)       # "hello" (present)
]

probs_if_ham = [
    (0 + 0.5) / (2 + 2 * 0.5),      # "spam"  (present)
    1 - (2 + 0.5) / (2 + 2 * 0.5),  # "ham"   (not present)
    1 - (1 + 0.5) / (2 + 2 * 0.5),  # "rules" (not present)
    (1 + 0.5) / (2 + 2 * 0.5),      # "hello" (present)
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

# Should be about 0.83
assert model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)

AssertionError: 

In [37]:
BASE_URL = "https://spamassassin.apache.org/old/publiccorpus/"
FILES = ["20021010_easy_ham.tar.bz2", "20021010_hard_ham.tar.bz2", "20021010_spam.tar.bz2"]
# aqui terminan los datos, en los subdirectorios /spam, /easy_ham y /hard_ham cambie esto a donde quiera los datos

OUTPUT_DIR = "spam_data"

for filename in FILES:
    # usa request para obtener el contenido del archivo en cada url
    content = requests.get(f"{BASE_URL}{filename}").content
    # envuelve los bytes en memoria para poder usarlos cono "archivo"
    fin = BytesIO(content)
    # y extrae todos los archivos al dir de salida especificado
    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        tf.extractall(OUTPUT_DIR)

  tf.extractall(OUTPUT_DIR)


In [38]:
# modifique la ruta donde tenga los archivos
path ='spam_data/*/*'
data: List[Message] = []
# glob.glob devuelve nombres de archivo que coinciden con la ruta comodin
for filename in glob.glob(path):
    is_spam = "ham" not in filename
    # hay caracteres sobrantes en los emails; el errrors='ignore' los salta en lugar de mostrar una excepcion
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject: "):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break

In [39]:
X = TypeVar('X')

def split_data(data: List[X], prob: float) -> Tuple[List[X], List[X]]:
    """Copia del código que definimos anteriormente"""
    data = data[:]
    random.shuffle(data)
    cut = int(len(data) * prob)
    return data[:cut], data[cut:]

random.seed(0)
train_messages, test_messages = split_data(data, 0.75)
model = NaiveBayesClassifier()
model.train(train_messages)

In [40]:
predictions = [(message, model.predict(message.text)) for message in test_messages]
# supone que espam_probability > 0.5 corresponde a prediccion de spam y cuenbta las combinaciones de (actual is_spam, predicted id _spam)
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5) for message, spam_probability in predictions)
print(confusion_matrix)

Counter({(False, False): 673, (True, True): 87, (True, False): 39, (False, True): 26})


In [41]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    # probablemente no habria que usar metodos privados, pero es por una buena causa\
    prob_if_spam, prob_if_ham = model._probabilities(token)
    return prob_if_spam / (prob_if_spam + prob_if_ham)
words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))
print("Palabras más indicativas de spam: ", words[-10:])
print("Palabras más indicativas de ham: ", words[:10])

Palabras más indicativas de spam:  ['assistance', 'zzzz', '95', 'attn', 'money', 'clearance', 'per', 'sale', 'systemworks', 'adv']
Palabras más indicativas de ham:  ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'perl', 'ouch', 'spamassassin', 'bliss']


In [43]:
def drop_final_s(word):
    return re.sub("s$", "", word)