In [1]:
from typing import Set
import re

In [2]:
def tokenize(text:str) -> Set[str]:
    text = text.lower()
    all_words = re.findall("[a-z0-9']+", text)
    return set(all_words)

assert tokenize("Data Science is science!") == {'data', 'is', 'science'}

In [3]:
from typing import NamedTuple

In [4]:
class Message(NamedTuple):
    text: str
    is_spam: bool

In [5]:
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

In [58]:
class NaiveBayesClassifier:
    def __init__(self, k:float = 0.5) -> None:
        self.k = k # 스무딩 인수
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0
        
    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
        # 메시지의 수를 증가시킨다.
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1
                
            # 단어의 빈도를 증가시킨다.
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1
                
    def _probabilities(self, token: str) -> Tuple[float, float]:
        """P(단어|스팸)과 P(단어|햄)을 반환"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]
        
        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)
        
        return p_token_spam, p_token_ham
    
    def predict(self, text:str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = .0
        
        # 모든 메시지 안의 각 단어를 순회한다.
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)
            
            
            # 만약 *token*이 메시지에 나온다면...
            # 단어가 등장할 로그 확률값을 더한다.
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)

            # 그에 아니라면 단어가 등장하지 않을 로그 확률을 더한다.
            # 이는 log(1-등장할 확률)이다
            else:
                log_prob_if_spam += math.log(1- prob_if_spam)
                log_prob_if_ham += math.log(1- prob_if_ham)
                
        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [59]:
messages = \
[Message("spam rules", is_spam=True),
Message("ham rules", is_spam=False),
Message("hello ham", is_spam=False)]

In [60]:
model = NaiveBayesClassifier(k=0.5)
model.train(messages)

In [61]:
/print model.spam_messages
/print model.ham_messages
/print model.token_ham_counts
/print model.token_spam_counts

1
2
defaultdict(<class 'int'>, {'rules': 1, 'ham': 2, 'hello': 1})
defaultdict(<class 'int'>, {'spam': 1, 'rules': 1})


In [62]:
model.predict('Hello spam')

0.8350515463917525

### 모델 사용하기

In [147]:
from io import BytesIO
import requests
import tarfile

In [66]:
import os

In [70]:
from glob import glob

In [148]:
BASE_URL = 'https://spamassassin.apache.org/old/publiccorpus/'

In [149]:
FILES = \
['20021010_easy_ham.tar.bz2',
'20021010_hard_ham.tar.bz2',
'20021010_spam.tar.bz2']

In [150]:
OUTPUT_DIR = 'data/spam_data'

In [160]:
# 데이터 다운로드, 1회만 실행
if False:
    for filename in FILES:
        content = requests.get(f"{BASE_URL}/{filename}").content
        fin = BytesIO(content)
        with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
            tf.extractall(OUTPUT_DIR)

In [165]:
path = './data/spam_data/*/*'
data: List[Message] = []

for filename in glob(path):
    is_spam = "ham" not in filename
    
    # 메일에 잘못된 문자가 들어 있는 경우가 있다.
    # errors = 'ignore'는 exception을 반환하는 대신 건너뛰도록 해준다.
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subjects: ")
                data.append(Message(subject, is_spam))
                break 

In [172]:
os.chdir("C:\\Users\\jongh\\OneDrive\\python\\machine-learning\\Data-Science-from-Scratch-master\\code-python3")
from machine_learning import split_data
os.chdir("C:\\Users\\jongh\\OneDrive\\python\\machine-learning")

In [198]:
import random
random.seed(0)
train_data, test_data = split_data(data, 0.75)

In [199]:
from collections import Counter

In [200]:
print(Counter([_data.is_spam for _data in data]))
print(Counter([t_data.is_spam for t_data in train_data]))
print(Counter([s_data.is_spam for s_data in test_data]))

Counter({False: 2800, True: 500})
Counter({False: 2101, True: 369})
Counter({False: 699, True: 131})


In [201]:
model = NaiveBayesClassifier()

In [202]:
model.train(train_data)

In [203]:
predictions = [(message, model.predict(message.text)) for message in test_data]

In [204]:
confusion_matrix = Counter((message.is_spam, spam_probability > .5) for message, spam_probability in predictions)

In [205]:
/print confusion_matrix

Counter({(False, False): 673, (True, True): 84, (True, False): 47, (False, True): 26})
