In [1]:
import os

In [2]:
def extract_text(filename):
    with open(os.path.join(dirpath, filename)) as f_text:
        return next(f_text)

for dirpath, dirnames, filenames in os.walk('data/politics/'):
    docs_0 = [extract_text(f) for f in filenames]
    
for dirpath, dirnames, filenames in os.walk('data/sports/'):
    docs_1 = [extract_text(f) for f in filenames]

In [3]:
len(docs_0), len(docs_1)

(35, 34)

## Морфологическая предобработка предложений

In [4]:
import re

from functools import lru_cache

from pymorphy2 import MorphAnalyzer
from nltk.stem import WordNetLemmatizer

ru_morph = MorphAnalyzer()
en_morph = WordNetLemmatizer()

CYRILLIC_PATTERN = re.compile('[а-яА-Я]')
DIGIT_PATTERN = re.compile('\d+')

def has_cyrillic(text):
    return bool(CYRILLIC_PATTERN.search(text))

def has_numeric(text):
    return bool(DIGIT_PATTERN.search(text))

@lru_cache(maxsize=1500)
def morph_process(token):
    if has_cyrillic(token):
        return ru_morph.parse(token)[0].normal_form
    elif has_numeric(token):
        return token
    else:
        return en_morph.lemmatize(token)
    
def morph_sentence(sentence):
    res = re.findall('\d+', sentence.lower())
    words = re.findall('[^\W\d_]+', sentence.lower())
    res.extend(map(morph_process, words))
    return res

## TF-IDF

In [5]:
import numpy as np
from scipy.sparse import csr_matrix


class TfIdf:
    def __init__(self, preprocess, stopwords=None, idf=True, norm=True):
        self.preprocess_ = preprocess
        self.vocabulary_ = {}
        
        self._use_idf = idf
        self._use_norm = norm
        
    def _transform(self, docs, use_vocab=True):
        if not use_vocab:
            self.vocabulary_ = {}
        vocabulary = self.vocabulary_
        
        df_ = {}
        tf_, indices, indptr = [], [], [0]
        
        for doc_id, doc in enumerate(docs):
            tf_counts, df_entries = {}, set()
            
            for term in self.preprocess_(doc):                
                if term not in vocabulary:
                    if use_vocab:
                        continue
                    vocabulary[term] = len(vocabulary)
                term_id = vocabulary[term]
                
                if term_id not in tf_counts:
                    tf_counts[term_id] = 0
                tf_counts[term_id] += 1
                
                df_entries.add(term_id)
                
            indices.extend(tf_counts.keys())
            tf_.extend(tf_counts.values())
            indptr.append(len(indices))
            
            for term_id in df_entries:
                df_[term_id] = df_.get(term_id, 0) + 1
                
        tf_ = csr_matrix((tf_, indices, indptr), dtype=np.uint32,
                         shape=(len(docs), len(vocabulary)))
        tf_.sort_indices()
        tf_ = tf_.toarray()
        
        if self._use_norm:
            # norm = np.sqrt(np.power(tf_, 2).sum(axis=1, keepdims=True))
            norm = tf_.max(axis=1, keepdims=True)
            tf_ = tf_ / norm
        
        if not use_vocab:
            self.idf_ = [np.log10(len(docs) / float(df_[tid])) for tid in range(len(vocabulary))]
            self.idf_ = np.asarray(self.idf_)
        idf_ = self.idf_
        
        if self._use_idf:
            tf_ = tf_ * idf_.reshape(1, -1)
        
        return tf_
        
    def fit_transform(self, docs):
        return self._transform(docs, use_vocab=False)
    
    def fit(self, docs):
        _ = self.fit_transform(docs)
        return self
    
    def transform(self, docs):
        return self._transform(docs, use_vocab=True)

## Naive Bayes Classifier

In [6]:
class NaiveBayes:
    def __init__(self):
        self._ranker_tfidf = TfIdf(preprocess=morph_sentence, idf=False)
    
    def fit(self, docs, y):
        assert len(docs) == len(y)
        
        y = np.asarray(y)
        
        # P(cl) ~ prior
        cl_unique, cl_counts = np.unique(y, return_counts=True)
        cl_counts = cl_counts / cl_counts.sum()
        self.cl_probs = dict(zip(cl_unique, cl_counts))
        
        # P(term|cl) ~ likelihood
        self.t_probs = {}
        tfs = self._ranker_tfidf.fit_transform(docs)
        for cl in self.cl_probs:
            mask = y == cl
            tf = 1 + tfs[mask, :].sum(axis=0)
            self.t_probs[cl] = tf / tf.sum()
            
        self.cl_order = sorted(self.cl_probs.keys())
        
    def predict(self, docs):
        result = []
        tfs = self._ranker_tfidf.transform(docs)
        
        for doc_i in range(len(docs)):
            mask = tfs[doc_i] > 0
            prob = { cl: self.t_probs[cl][mask].prod() * prior
                     for cl, prior in self.cl_probs.items() }
            result.append(prob)
            
        result = np.asarray([[pred[cl] for cl in self.cl_order] for pred in result])
        return result                

In [7]:
def train_valid_split(X, n_train):
    np.random.seed(8888)
    index = np.random.permutation(len(X))
    X_train = [X[i] for i in index[:n_train]]
    X_valid = [X[i] for i in index[n_train:]]
    return X_train, X_valid

# класс 0 -- политика; класс 1 -- спорт
docs_train_0, docs_valid_0 = train_valid_split(docs_0, 10)
docs_train_1, docs_valid_1 = train_valid_split(docs_1, 10)

In [8]:
cl = NaiveBayes()
cl.fit(docs_train_0 + docs_train_1, [0] * 10 + [1] * 10)

In [9]:
print(docs_valid_0[1])
cl.predict([docs_valid_0[1]])

Ежегодное послание президента РФ Федеральному собранию, скорее всего, вновь прозвучит в начале следующего года, а не в декабре, как раньше. О вероятном переносе послания заявил пресс-секретарь президента Дмитрий Песков.


array([[1.21704514e-35, 4.25872451e-37]])

In [10]:
print(docs_valid_1[5])
cl.predict([docs_valid_1[5]])

Нападающий "Коламбуса" Кэм Эткинсон стал первой звездой прошедшей недели в регулярном чемпионате НХЛ, сообщается на официальном сайте лиги. В трех матчах на минувшей неделе форвард набрал восемь очков (пять голов и три результативные передачи), в игре с "Каролиной" (4:1) оформил хет-трик.


array([[1.50306164e-61, 1.05017266e-57]])

## Оценка модели

In [11]:
y_preds = cl.predict(docs_valid_0 + docs_valid_1)
y_preds = np.argmax(y_preds, axis=1)

y_true = [0] * len(docs_valid_0) + [1] * len(docs_valid_1)

In [12]:
from sklearn.metrics import precision_recall_fscore_support

In [13]:
metrics = precision_recall_fscore_support(y_true, y_preds, average='macro')
print(("Precision: {:>.10f}\n" + \
       "Recall:    {:>.10f}\n" + \
       "F-measure: {:>.10f}").format(*metrics))

Precision: 0.7759197324
Recall:    0.7750000000
F-measure: 0.7751355861
