In [86]:
import numpy as np
import os

from sklearn.model_selection import KFold

In [87]:
# class CountVectorizer:

#     unique_words = set()

#     def fit(self, X):
#         for x in X:
#             for word in x:
#                 self.unique_words.add(word)

#         return self

#     def transform(self, X):
#         output = np.zeros((len(X), len(self.unique_words)))
#         unique_words = list(self.unique_words)
#         for idx, x in enumerate(X):
#             for word in x:
#                 if word in self.unique_words:
#                     output[idx, unique_words.index(word)] += 1

#         return output

from sklearn.feature_extraction.text import CountVectorizer

In [133]:
class NaiveBayesClassifier:

    def __init__(self, alpha, penalties):
        self.alpha = alpha
        self.penalties = penalties
        self.num_classes = len(penalties)
        self.prior_ = None
        self.word_likelihood_ = None

    def fit(self, X, y):
        num_samples = X.shape[0]
        X_by_class = [[x for x, t in zip(X, y) if t == c] for c in np.unique(y)]

        self.prior_ = np.array([len(i) / num_samples for i in X_by_class])

        word_counts = np.array([np.array(i).sum(axis=0) for i in X_by_class]) + self.alpha
        self.word_likelihood_ = word_counts / word_counts.sum(axis=1).reshape(-1, 1)

    def predict_proba(self, X):
        probabilities = np.zeros(shape=(X.shape[0], self.prior_.shape[0]))

        # nested loop goes brughhh
        for i, x in enumerate(X):

            lk_message = np.zeros(self.prior_.shape[0])
            for word in x:
                if word != 0.:
                    lk_message += np.log(self.word_likelihood_[:, int(word)])

            probabilities[i] = lk_message + np.log(self.prior_) + np.log(self.penalties)

        return probabilities

Готовим данные

In [116]:
num_folders=10

In [117]:
def read_folder(i):
    path = "messages/part" + str(i)
    X, y = list(), list()
    
    for filename in os.listdir(path):
        with open(path + "/" + filename, 'r') as file:
            subject = list(map(int, file.readline().split()[1:]))
            file.readline()
            text = list(map(int, file.readline().split()))
            corpus = subject + text
            X.append(corpus)
            y.append(0 if "legit" in filename else 1)
    
    return X, y

In [118]:
def create_n_gram(l, n):
    n_gram = []
    for i in range(len(l) - n + 1):
        n_gram.append("SEP".join(map(str, l[i:(i + n)])))        
    return n_gram

In [119]:
def get_fold(i, n=1):
    all_folds = list(range(10))
    
    X_test, y_test = read_folder(i)
    
    X_train, y_train = list(), list()
    
    for f in all_folds:
        
        if f+1 != i:
            X, y = read_folder(f + 1)
            
            X_train.extend(X)
            y_train.extend(y)
            
    for i in range(len(X_train)):
        X_train[i] = create_n_gram(X_train[i], n)
        
    for i in range(len(X_test)):
        X_test[i] = create_n_gram(X_test[i], n)
        
    assert len(X_train) == len(y_train)
    assert len(X_test) == len(y_test)
    
    assert len(X_train) + len(X_test) == 1090
        
    return X_train, y_train, X_test, y_test

In [122]:
def fit_bayes(alpha, penalties=[1., 1.], n_gram=1):
    
    
    for fold in range(1, num_folders + 1):
        X_train, y_train, X_test, y_test = get_fold(fold, n_gram)
        
        v = CountVectorizer()
        
        X_train = [" ".join(x) for x in X_train]
        X_test = [" ".join(x) for x in X_test]
        
        X_train = v.fit_transform(X_train).toarray()
        X_test = v.transform(X_test).toarray()
        
        print(X_train.shape)
        
        classifier = NaiveBayesClassifier(alpha, penalties)

        classifier.fit(X_train, np.array(y_train))

        y_hat = classifier.predict_proba(X_test)
        
        y_pred = np.argmax(y_hat)
        
        print((y_pred == y_test).sum() / len(y_test))

In [134]:
X_train, y_train, X_test, y_test = get_fold(1, 1)

v = CountVectorizer()

X_train = [" ".join(x) for x in X_train]
X_test = [" ".join(x) for x in X_test]

X_train = v.fit_transform(X_train).toarray()
X_test = v.transform(X_test).toarray()

classifier = NaiveBayesClassifier(1.0, [1., 1.])

classifier.fit(X_train, np.array(y_train))

y_hat = classifier.predict_proba(X_test)

In [140]:
(1 - np.argmax(y_hat, 1) == y_test).sum()

61