In [None]:
import os
import re

import textract
import sklearn
import sklearn.ensemble
import sklearn.feature_extraction.text
import sklearn.naive_bayes
import sklearn.neighbors
import sklearn.svm
import sklearn.tree
from sklearn import *
from nltk.stem import SnowballStemmer
from nltk.corpus import stopwords
import tensorflow as tf
import numpy as np
import psycopg2 as pg

In [None]:
def load_files(dataset): # Fetch file locations and class labels from the database
    """Return the (link, class_id) rows stored for one dataset split.

    Args:
        dataset: name of the split; must be 'train' or 'test'. The value is
            also used as the table name in the query.

    Returns:
        The list of rows produced by ``cursor.fetchall()`` for
        ``SELECT link, class_id``, or ``None`` (after printing a message)
        when ``dataset`` is not one of the known splits.
    """
    files_type = ('train', 'test')
    if dataset not in files_type:
        print('Указан неверный dataset')
        return None

    # NOTE(review): the credentials are hard-coded in the DSN string.
    conn = pg.connect('dbname=articles user=postgres password=1234') # Connect to the DB
    try:
        cur = conn.cursor()
        try:
            # Interpolating the table name is safe only because `dataset`
            # was checked against the fixed whitelist above.
            cur.execute(f'SELECT link, class_id FROM {dataset};') # Article links and class labels
            return cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even when the query raises.
        conn.close()

def load_data(files): # Load and preprocess the documents
    """Extract, filter and stem the Russian words of every listed file.

    Args:
        files: sequence of rows whose element 0 is a path accepted by
            ``textract.process`` and whose element 1 is the class label.

    Returns:
        A list of ``(text, label)`` tuples, where ``text`` is the
        space-joined stemmed words of the document. Files in which the
        regex finds no words at all are skipped.
    """
    snowball = SnowballStemmer(language='russian')
    # NOTE(review): the range А-я does not include the letters Ё/ё, so words
    # containing them are split at that letter.
    regex = re.compile('([А-Яа-я]{2,100})')
    # Build the stop-word set once; calling stopwords.words() per word
    # re-reads the corpus list for every token.
    russian_stopwords = frozenset(stopwords.words('russian'))
    content = []
    for row in files:
        raw_text = textract.process(row[0]).decode()
        words = [word.lower() for word in regex.findall(raw_text)]
        if words:
            words = [word for word in words if word not in russian_stopwords]
            words = [snowball.stem(word) for word in words]
            content.append((' '.join(words), row[1]))
    return content

def val_split(dataset, frac): # Carve a validation subset out of the dataset
    """Move every ``frac``-th sample of each class from ``dataset`` into a new list.

    The split is stratified: for each distinct label (element 1 of a sample)
    the samples at positions 0, frac, 2*frac, ... among that label's samples
    are removed from ``dataset`` and returned.

    Args:
        dataset: mutable list of ``(features, label)`` samples. It is
            modified IN PLACE: the selected samples are removed from it.
        frac: positive integer step; 1 out of every ``frac`` samples per
            class goes to validation.

    Returns:
        List of the removed samples, grouped by label (in sorted label
        order) and keeping their original relative order within a label.

    Raises:
        ValueError: if ``frac`` is smaller than 1.
    """
    if frac < 1:
        raise ValueError('frac must be a positive integer')

    labels = np.unique([content[1] for content in dataset])
    val_content = []
    for label in labels:
        indices = [i for i, content in enumerate(dataset) if content[1] == label]
        # Pop from the back so the positions still to be popped stay valid,
        # then restore ascending order for the result.
        taken = []
        for index in reversed(indices[::frac]):
            taken.append(dataset.pop(index))
        val_content.extend(reversed(taken))
    return val_content

In [None]:
# Load the training split: rows of (link, class_id) from the DB, then the
# preprocessed (text, class_id) pairs. `train_content` must be defined here,
# because every later cell reads it.
train_files = load_files('train')
train_content = load_data(train_files)

In [None]:
# Sanity check: report how many training documents were loaded and show the
# first (text, class_id) pair.
print(f'Размер обучающих данных: {len(train_content)} экземпляров')
print(train_content[0])

In [None]:
# Load the test split the same way as the training split: DB rows first,
# then the preprocessed (text, class_id) pairs.
test_files = load_files('test')            
test_content = load_data(test_files)

In [None]:
# Sanity check: report how many test documents were loaded and show the
# first (text, class_id) pair.
print(f'Размер тестовых данных: {len(test_content)} экземпляров')
print(test_content[0])

In [None]:
# TF-IDF features for the scikit-learn models. The vocabulary and IDF weights
# are learned from the training texts only; the test texts are projected onto
# that fitted vocabulary in a single batch call.
vectorizer = sklearn.feature_extraction.text.TfidfVectorizer()

x_train = vectorizer.fit_transform([data[0] for data in train_content]).toarray()
x_test = vectorizer.transform([data[0] for data in test_content]).toarray()

In [None]:
# Class labels, in the same order as the rows of x_train / x_test.
y_train = [sample[1] for sample in train_content]
y_test = [sample[1] for sample in test_content]

In [None]:
%%time
# Naive Bayes
# Gaussian Naive Bayes baseline, fitted on the dense TF-IDF matrix.
from sklearn import naive_bayes

nb = naive_bayes.GaussianNB()
nb.fit(x_train, y_train)

In [None]:
# Score the fitted Naive Bayes model on the held-out test set.
nb.score(x_test, y_test)

In [None]:
%%time
# KNN
# k-nearest-neighbours classifier (k=5); `neighbors` comes from the
# wildcard scikit-learn import at the top of the notebook.

knn = neighbors.KNeighborsClassifier(n_neighbors=5)
knn.fit(x_train, y_train)

In [None]:
%%time
# Score the KNN model on the test set; timed separately from fitting.
knn.score(x_test, y_test)

In [None]:
%%time
# Decision Tree
# NOTE(review): criterion='log_loss' is only accepted by recent scikit-learn
# releases — confirm the installed version supports it.
# The name `tree` shadows the `tree` module brought in by the wildcard import.

tree = sklearn.tree.DecisionTreeClassifier(criterion='log_loss')
tree.fit(x_train, y_train)

In [None]:
# Score the fitted decision tree on the held-out test set.
tree.score(x_test, y_test)

In [None]:
%%time
# SVM
# Linear-kernel SVM. probability=True enables `predict_proba`, which the
# per-article prediction cell at the end of the notebook calls on `svm`.
# The name `svm` shadows the `svm` module brought in by the wildcard import.

svm = sklearn.svm.SVC(kernel='linear', probability=True, cache_size=2000)
svm.fit(x_train, y_train)
svm.score(x_test, y_test)

In [None]:
%%time
svm = sklearn.svm.SVC(kernel='rbf', cache_size=2000)
svm.fit(x_train, y_train)
svm.score(x_test, y_test)

In [None]:
# Sigmoid-kernel SVM for comparison. It gets its own name so that `svm` keeps
# referring to the probability-enabled linear model, which later cells use
# for `predict` / `predict_proba`.
svm_sigmoid = sklearn.svm.SVC(kernel='sigmoid', cache_size=1000)
svm_sigmoid.fit(x_train, y_train)
svm_sigmoid.score(x_test, y_test)

In [None]:
%%time
# Random forest
# 150 trees, trained in parallel on all available cores (n_jobs=-1).
# No random_state is set, so results differ between runs.

rf = sklearn.ensemble.RandomForestClassifier(n_estimators=150, n_jobs=-1)
rf.fit(x_train, y_train)

In [None]:
# Score the fitted random forest on the held-out test set.
rf.score(x_test, y_test)

In [None]:
%%time
# Bagging SVM

base = sklearn.svm.SVC(kernel='linear')
bag = sklearn.ensemble.BaggingClassifier(base_estimator=base, n_estimators=10, max_samples=0.5, max_features=0.5, n_jobs=-1)
bag.fit(x_train, y_train)
bag.score(x_test, y_test)

In [None]:
# Bagging NB
# Ten Gaussian Naive Bayes models, each trained on half of the samples and
# half of the features. The base estimator is passed positionally because its
# keyword was renamed from `base_estimator` to `estimator` between
# scikit-learn releases; the first positional slot is the estimator in both.
# Note: this rebinds `base` and `bag` from the previous cell.

base = sklearn.naive_bayes.GaussianNB()
bag = sklearn.ensemble.BaggingClassifier(base, n_estimators=10, max_samples=0.5, max_features=0.5, n_jobs=-1)
bag.fit(x_train, y_train)
bag.score(x_test, y_test)

In [None]:
# KERAS NN
# Keras-side text vectorizer producing TF-IDF vectors. This is a separate
# object from the scikit-learn `vectorizer` above and has its own vocabulary.

from tensorflow.keras.layers import TextVectorization
vect = TextVectorization(output_mode='tf_idf')#, max_tokens=max_features)
# The vocabulary is adapted on the CPU device, using the training texts only.
with tf.device("/CPU:0"):
    vect.adapt([data[0] for data in train_content])

In [None]:
# Move every 8th sample of each class out of `train_content` into a
# validation list. `val_split` removes the samples from `train_content` in
# place, so re-running this cell shrinks the training set again.
val_content = val_split(train_content, 8)

In [None]:
# Keras training tensors. NOTE: this rebinds `x_train` / `y_train`, replacing
# the scikit-learn TF-IDF arrays that the classical models were fitted on.
x_train = vect([data[0] for data in train_content]).numpy()
y_train = np.array([data[1] for data in train_content])#.reshape(-1,1)
# One-hot depth = number of distinct training labels.
# NOTE(review): assumes class ids are the contiguous integers 0..K-1 — confirm.
y_train = tf.one_hot(y_train, len(np.unique(y_train))).numpy()
print(x_train.shape)
print(y_train.shape)

In [None]:
# Keras test tensors. NOTE: this rebinds `x_test` / `y_test`, replacing the
# scikit-learn TF-IDF arrays used by the classical models.
x_test = vect([data[0] for data in test_content]).numpy()
y_test = np.array([data[1] for data in test_content])#.reshape(-1,1)
# The one-hot depth must equal the number of classes the network is trained
# on, so it is derived from the training labels: a class missing from the
# test split would otherwise make the target matrix narrower than the model
# output.
y_test = tf.one_hot(y_test, len(np.unique([data[1] for data in train_content]))).numpy()
print(x_test.shape)
print(y_test.shape)

In [None]:
# Show the training feature matrix and the one-hot label matrix.
print('Вектор слов:')
print(x_train)
print('Вектор классов:')
print(y_train)

In [None]:
# Keras validation tensors, built from the samples split off `train_content`.
x_val = vect([data[0] for data in val_content]).numpy()
y_val = np.array([data[1] for data in val_content])#.reshape(-1,1)
# The one-hot depth must equal the number of classes the network is trained
# on, so it is derived from the training labels: a class missing from the
# validation split would otherwise make the target matrix narrower than the
# model output.
y_val = tf.one_hot(y_val, len(np.unique([data[1] for data in train_content]))).numpy()
print(x_val.shape)
print(y_val.shape)

In [None]:
from tensorflow.keras import layers
import matplotlib.pyplot as plt

In [None]:
# Number of distinct class ids in the training samples; used as the width of
# the network's softmax output layer.
num_labels = len(np.unique([c[1] for c in train_content]))

In [None]:
# Fully connected classifier: three ReLU hidden layers (400 -> 300 -> 200)
# followed by a softmax layer with one unit per class.
model = tf.keras.Sequential()
model.add(layers.Dense(400, activation="relu"))
model.add(layers.Dense(300, activation="relu"))
model.add(layers.Dense(200, activation="relu"))
model.add(layers.Dense(num_labels, activation="softmax"))

In [None]:
# Training configuration: categorical cross-entropy on the one-hot targets,
# Adam with its default settings, accuracy as the reported metric.
epochs = 5
model.compile(
    loss="categorical_crossentropy",
    optimizer=tf.keras.optimizers.Adam(),
    metrics=["categorical_accuracy"],
)

In [None]:
# Train the network; `history` keeps the per-epoch metrics that
# `plot_result` reads. batch_size=None leaves the batch size to Keras.
history = model.fit(
    x=x_train,
    y=y_train,
    batch_size=None,
    validation_data=(x_val, y_val),
    epochs=epochs,
)


def plot_result(item):
    """Plot a training metric and its validation counterpart per epoch.

    Reads the notebook-level `history` object and draws the series stored
    under `item` and under `"val_" + item`, then shows the figure.

    Args:
        item: name of a metric recorded in `history.history`.
    """
    plt.plot(history.history[item], label=item)
    val_key = "val_" + item
    plt.plot(history.history[val_key], label=val_key)
    plt.xlabel("Epochs")
    plt.ylabel(item)
    title = "Train and Validation {} Over Epochs".format(item)
    plt.title(title, fontsize=14)
    plt.legend()
    plt.grid()
    plt.show()


# Learning curves for the loss and for the accuracy metric.
for metric in ("loss", "categorical_accuracy"):
    plot_result(metric)

In [None]:
# Final evaluation of the network on the test tensors; `evaluate` returns the
# loss followed by the compiled metric.
loss, categorical_acc = model.evaluate(x=x_test, y=y_test)
print(f'Функция ошибки на тестовых данных: {loss}')
print(f'Точность на тестовых данных: {categorical_acc*100}%')

In [None]:
# `x_test` now holds the Keras TextVectorization features, which are not the
# feature space `svm` was fitted on. Rebuild the scikit-learn TF-IDF features
# for the test texts before predicting.
x_test_tfidf = vectorizer.transform([data[0] for data in test_content]).toarray()
pred = svm.predict(x_test_tfidf)

# One row per test document: [predicted class, true class].
comparison = np.array([[pred[i], test_content[i][1]] for i in range(len(pred))])
accuracy = round(sum(1 for p in comparison if p[0] == p[1]) / len(comparison), 2)

# Rows grouped by predicted class and by true class, one group per distinct
# class id (in sorted order) found anywhere in `comparison`.
class_ids = np.unique(comparison)
pred_labels = [[p_ for p_ in comparison if p_[0] == label] for label in class_ids]
target_labels = [[p_ for p_ in comparison if p_[1] == label] for label in class_ids]

In [None]:
# Per-class precision, recall and F-score. `pred_labels` / `target_labels`
# hold one group of [predicted, true] rows per class, ordered like
# np.unique(comparison), so groups are addressed by position rather than by
# the label value.
class_ids = np.unique(comparison)

precision_raw = []
recall_raw = []
for position in range(len(class_ids)):
    predicted_rows = pred_labels[position]
    actual_rows = target_labels[position]
    hits_predicted = sum(1 for p in predicted_rows if p[0] == p[1])
    hits_actual = sum(1 for p in actual_rows if p[0] == p[1])
    # A class that was never predicted (or never occurs) scores 0 instead of
    # raising ZeroDivisionError.
    precision_raw.append(hits_predicted / len(predicted_rows) if len(predicted_rows) else 0.0)
    recall_raw.append(hits_actual / len(actual_rows) if len(actual_rows) else 0.0)

macro_precision = [round(value, 2) for value in precision_raw]
macro_recall = [round(value, 2) for value in recall_raw]
# F-score is the harmonic mean of precision and recall, computed from the
# unrounded values.
macro_f = [
    round(2 * p * r / (p + r), 2) if (p + r) else 0.0
    for p, r in zip(precision_raw, recall_raw)
]

print('===Macro-average===\n')
print('precision:\t', ';\t'.join('Class {0}: {1}'.format(*p) for p in zip(class_ids, macro_precision)))
print('recall: \t', ';\t'.join('Class {0}: {1}'.format(*p) for p in zip(class_ids, macro_recall)))
print('F-score:\t', ';\t'.join('Class {0}: {1}'.format(*p) for p in zip(class_ids, macro_f)))

In [None]:
# Build a lookup from the first column of `classes` to its second column.
# NOTE(review): presumably class id -> section name; `SELECT *` relies on the
# table's column order — confirm against the schema.
# NOTE(review): the credentials are hard-coded in the DSN string.
conn = pg.connect('dbname=articles user=postgres password=1234')
try:
    cur = conn.cursor()
    try:
        cur.execute('SELECT * FROM classes;')
        section_map = {section[0]: section[1] for section in cur.fetchall()}
    finally:
        cur.close()
finally:
    # Close the connection even when the query raises.
    conn.close()

In [None]:
# Three hand-picked test articles for a per-document demonstration.
# The selection is built once so that the loaded texts and the printed names
# cannot drift apart.
sample_files = [test_files[12], test_files[96], test_files[182]]
test_articles = load_data(sample_files)
names = []
for article in sample_files:
    # Keep only the file name; os.path.basename follows the platform's path
    # separator rules.
    names.append(f'\n{os.path.basename(article[0])}')
    print(f'Класс: {article[1]} - {section_map[article[1]]}\nПуть к файлу: {article[0]}')

In [None]:
# Class probabilities for the selected articles. The features must come from
# the scikit-learn `vectorizer`, and `svm` must be an SVC fitted with
# probability=True, otherwise `predict_proba` is unavailable.
X = vectorizer.transform([data[0] for data in test_articles]).toarray()
probabilities = svm.predict_proba(X)
# NOTE(review): `names[i]` lines up with `X[i]` only if `load_data` kept
# every selected article (it skips files without any matched words).
for i in range(len(X)):
    print(names[i])
    # Columns of predict_proba follow `svm.classes_`, so pair each
    # probability with its actual class id instead of the column index.
    for class_id, proba in zip(svm.classes_, probabilities[i]):
        print(f'Класс {class_id} - {section_map[class_id]}: {proba * 100:.0f}%')