In [None]:
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score, classification_report
from sklearn.datasets import load_iris
import pandas as pd
from re import findall
import math

# **Classificador Naive Bayes para Texto**


---


Considere um classificador de Naive Bayes para distinguir entre mensagens de spam e não spam. O classificador é treinado usando um conjunto de mensagens rotulado, onde cada mensagem é associada a uma etiqueta indicando se é spam ou não.

## Divisão de Texto

Dada uma mensagem de texto, a função `dividir_texto` realiza a seguinte pré-processamento:

\begin{align*}
\text{Texto original} & : \text{Converte o texto para minúsculas} \\
& : \text{Extrai as palavras usando uma expressão regular} \\
& : \text{Retorna um conjunto de palavras únicas na mensagem}
\end{align*}

## Probabilidades

Para determinar se uma mensagem é spam ou não dado que uma mensagem é enviada:

\begin{align*}
P(\text{spam} | \text{mensagem}) & = \frac{P(\text{mensagem} | \text{spam}) \cdot P(\text{spam})}{P(\text{mensagem} | \text{spam}) \cdot P(\text{spam}) + P(\text{mensagem} | \neg \text{spam}) \cdot P(\neg \text{spam})}
\end{align*}

Precisamos determinar a probabilidade da mesma ocorrer dada que ela é spam ou não. Dessa forma, a função `probabilidades` calcula as probabilidades condicionais de uma palavra ocorrer em mensagens de spam e não spam:

\begin{align*}
& P(\text{palavra} | \text{spam}) = \frac{\text{freq_palavra_spam} + k}{\text{mensagens_spam} + 2k} \\
& P(\text{palavra} | \neg \text{spam}) = \frac{\text{freq_palavra_nspam} + k}{\text{mensagens_nspam} + 2k}
\end{align*}

onde `freq_palavra_spam` é a contagem de ocorrências da palavra em mensagens de spam, `mensagens_spam` é o total de mensagens de spam, e assim por diante.

Como os eventos são independentes, para uma mensagem com uma série de palavras, a probabilidade de ser spam é o produto das probabilidades de cada palavra, uma vez que são independentes:

\begin{align*}
P(X_1 = x_1, \ldots, X_n = x_n | \text{spam}) & = P(X_1 = x_1 | \text{spam}) \times \ldots \times P(X_n = x_n | \text{spam})
\end{align*}

### Overflow
Temos um problema em relação a probabilidades remotas de algumas palavras, levando a problemas numéricos de precisão em computadores. Dessa forma, para calcular o produto dessas probabilidades vamos utilizar logaritmos `log(ab) = log(a) + log(b)` e depois obter os valores originais com `exp(log())`.

\begin{align*}
&\text{Sejam } (p_1, p_2, \ldots, p_n) \text{ as probabilidades das palavras. A multiplicação de probabilidades é dada por:} \\
&p_{\text{total}} = p_1 \cdot p_2 \cdot \ldots \cdot p_n \\
&\text{Ao usar log-probabilidades, a expressão torna-se:} \\
&\log(p_{\text{total}}) = \log(p_1) + \log(p_2) + \ldots + \log(p_n) \\
&\text{Finalmente, para obter a probabilidade total, aplicamos a exponenciação:} \\
&p_{\text{total}} = e^{\log(p_{\text{total}})}
\end{align*}

Isso ajuda a evitar problemas numéricos associados à multiplicação de muitos valores pequenos, tornando o cálculo mais estável e eficiente.

## Parâmetros do Modelo

Para evitar que alguma probabilidade resulte em zero e torne o produto dessa probabilidade igual a 0, fazendo o nosso classificador tendencioso, precisamos usar algum tipo de suavização. Para isso, usamos o parametro k no modelo:

\begin{align*}
k & : \text{Parâmetro de suavização} \\
\end{align*}

In [None]:
class NaiveBayes:
  def __init__(self, k = 0.5):
      self.k = k
      self.palavras = set()
      self.freq_palavras_spam = dict()
      self.freq_palavras_nspam = dict()
      self.mensagens_spam = 0
      self.mensagens_nspam = 0

  def dividir_texto(self, texto):
    texto = texto.lower()
    palavras = findall(r"[\w']+|[a-z0-9']+", texto)

    return set(palavras)

  def probabilidades(self, palavra):
    #P(palavra | spam) e P(palavra | nspam)
    #P(palavra) = freq_palavra / tot_palavras
    #suavizador: se tenho varias palavras e p(X = pi) = 0, então a multiplicação dessas palavras é 0.
    palavra_spam = self.freq_palavras_spam.get(palavra, 0)
    palavra_nspam = self.freq_palavras_nspam.get(palavra, 0) #pode ser que a palavra não esteja lá.
    p_palavra_spam = (palavra_spam + self.k) / (self.mensagens_spam + 2 * self.k)
    p_palavra_nspam = (palavra_nspam + self.k) / (self.mensagens_nspam + 2 * self.k)

    return p_palavra_spam, p_palavra_nspam

  def treinar(self, messagens):
    for mensagem in messagens:
        if mensagem['is_spam']:
          self.mensagens_spam += 1
        else:
          self.mensagens_nspam += 1

        for palavra in self.dividir_texto(mensagem['texto']):
            self.palavras.add(palavra)
            if mensagem['is_spam']:
              if palavra not in self.freq_palavras_spam.keys():
                self.freq_palavras_spam[palavra] = 0

              self.freq_palavras_spam[palavra] += 1
            else:
              if palavra not in self.freq_palavras_nspam.keys():
                self.freq_palavras_nspam[palavra] = 0
              self.freq_palavras_nspam[palavra] += 1

  def oraculo(self, texto):
    palavras = self.dividir_texto(texto)
    log_prob_spam = log_prob_nspam = 0.0

    # Itera por cada palavra no nosso vocabulário
    for palavra in self.palavras:
        p_palavra_spam, p_palavra_nspam = self.probabilidades(palavra)

        # Se *token* aparece na mensagem,
        # adiciona a probabilidade logarítmica de vê-lo
        if palavra in palavras:
            log_prob_spam += math.log(p_palavra_spam)
            log_prob_nspam += math.log(p_palavra_nspam)

        # Caso contrário, adiciona a probabilidade logarítmica de _não_ vê-lo,
        # que é log(1 - probabilidade de vê-lo)
        else:
            log_prob_spam += math.log(1.0 - p_palavra_spam)
            log_prob_nspam += math.log(1.0 - p_palavra_nspam)

    p_spam = math.exp(log_prob_spam)
    p_nspam = math.exp(log_prob_nspam)

    return p_spam / (p_spam + p_nspam)

## Conjunto de dados

Um conjunto de dados popular (embora um tanto antigo) é o SpamAssassin.

Depois de baixar os dados você deverá ter três pastas: spam, easy_ham e hard_ham. Cada pasta contém muitos emails, cada um contido em um único arquivo. Para manter as coisas realmente simples, veremos apenas as linhas de assunto de cada e-mail.

In [None]:
import glob
from io import BytesIO
import requests
import tarfile

url = "https://spamassassin.apache.org/old/publiccorpus"
files = [
    "20021010_easy_ham.tar.bz2",
    "20021010_hard_ham.tar.bz2",
    "20021010_spam.tar.bz2"
]

diretorio_saida = 'spam_data'

for file in files:
  conteudo = requests.get(f"{url}/{file}").content
  fin = BytesIO(conteudo)

  with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
    tf.extractall(diretorio_saida)


def tratar_emails(path, is_spam):
  mensagens = []
  for nome_arquivo in glob.glob(path):
    with open(nome_arquivo, errors='ignore') as arquivo_email:
      for linha in arquivo_email:
        if linha.startswith("Subject:"):
          assunto = linha.lstrip("Subject: ")
          mensagens.append({'texto': assunto, 'is_spam': is_spam})
          break

  return mensagens

spam_data = tratar_emails('spam_data/spam/*', is_spam=True)
easy_ham_data = tratar_emails('spam_data/easy_ham/*', is_spam=False)
hard_ham_data = tratar_emails('spam_data/hard_ham/*', is_spam=False)

## Treinamento

A função `treinar` recebe um conjunto de mensagens rotulado e atualiza as contagens de palavras associadas a spam e não spam, bem como as contagens de mensagens.

In [None]:
dados_treinamento = spam_data + easy_ham_data + hard_ham_data

naive_bayes = NaiveBayes(k=0.5)
naive_bayes.treinar(dados_treinamento)

novo_texto = "I drink coffee with milk"
probabilidade_spam = naive_bayes.oraculo(novo_texto)

print(f"p(S|M): {probabilidade_spam}")

p(S|M): 0.03233520358086041


In [None]:
mensagens = [
    {'texto': "spam rules", 'is_spam': True},
    {'texto': "ham rules", 'is_spam': False},
    {'texto': "hello ham", 'is_spam': False},
]

nb = NaiveBayes(k=0.5)
nb.treinar(mensagens)

text = "hello spam"
print(nb.oraculo(text))

0.8350515463917525


$$
\text{probs_if_spam} = \left[ \frac{{n_{\text{{spam}}} + k}}{{N_{\text{{spam}}} + 2k}}, 1 -
\frac{{n_{\text{{spam}}} + k}}{{N_{\text{{spam}}} + 2k}}, 1 - \frac{{n_{\text{{ham}}} + k}}{{N_{\text{{ham}}} + 2k}}, 1 - \frac{{n_{\text{{rules, spam}}} + k}}{{N_{\text{{spam}}} + 2k}}, \frac{{n_{\text{{hello, spam}}} + k}}{{N_{\text{{spam}}} + 2k}} \right]
$$


# Naive Bayes em dados contínuos

Ao lidar com dados contínuos, os valores são associados a cada classe são distribuídos de acordo com uma distribuição normal (ou gaussiana). Então, basicamente, uma estrategia semelhante a anterior é aplicada, ou seja, as classes são segmentadas e calculadas as probabilidades normais.

![image.png](https://wikimedia.org/api/rest_v1/media/math/render/svg/685339e22f57b18d804f2e0a9c507421da59e2ab)

## Conjunto de dados Íris
Iremos utilizar um conjunto de dados classico de Fisher, 1936. Um dos primeiros conjuntos de dados conhecidos usados para avaliar métodos de classificação e aprendizado de máquina.

O conjunto de dados contém 3 classes de 50 instâncias cada, onde cada classe se refere a um tipo de planta de íris.

In [None]:
class NaiveBayesGauss:
  def __init__(self):
    self.naive_bayes = GaussianNB()
    self.iris = load_iris()
    self.X = self.iris.data
    self.y = self.iris.target

  def dataframe(self):
    iris_df = pd.DataFrame(data=self.iris.data, columns=self.iris.feature_names)
    iris_df['target'] = self.iris.target_names[self.y]
    return(iris_df)

  def dividir_iris(self):
    X_treino, X_teste, y_treino, y_teste = train_test_split(self.X, self.y, test_size=0.2, random_state=42)

    return X_treino, X_teste, y_treino, y_teste

  def treinar(self, X_treino, y_treino):
    self.naive_bayes.fit(X_treino, y_treino)

  def oraculo(self, X_teste):
    return self.naive_bayes.predict(X_teste)

  def dados_acuracia(self, y_teste, user_prediction):
    print(accuracy_score(y_teste, user_prediction), classification_report(y_teste, user_prediction))

In [None]:
nbg = NaiveBayesGauss()
X_treino, X_teste, y_treino, y_teste = nbg.dividir_iris()
nbg.treinar(X_treino, y_treino)

user_input = [[1.0, 2.5, 6.5, 3.2]]
user_prediction = nbg.oraculo(user_input)
print(f'Previsão do usuário: {nbg.iris.target_names[user_prediction[0]]}')

df = nbg.dataframe()
df

Previsão do usuário: virginica


Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,virginica
146,6.3,2.5,5.0,1.9,virginica
147,6.5,3.0,5.2,2.0,virginica
148,6.2,3.4,5.4,2.3,virginica
