In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
from parse_data import TextsinlevelsDB

In [3]:
textsinlevels = TextsinlevelsDB(db_name="textsinlevels")
df_news = textsinlevels.write_from_table_to_df("newsinlevels")
df_days = textsinlevels.write_from_table_to_df("daysinlevels")
del textsinlevels

In [6]:
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import spacy
from wordfreq import zipf_frequency
import seaborn as sns

In [7]:
class Text:
    def __init__(self, tokens):
        self.tokens = tokens
        self.lemmas = self.lemmatize()
    
    def lemmatize(self):
        return [token.lemma_.lower() for token in self.tokens
                       if token.is_alpha and not token.is_stop]
    
    def count_words(self):
        n_words = 0
        for token in self.tokens:
            if token.is_alpha:
                n_words += 1
        return n_words
    
    def count_sentences(self):
        n_sentences = 0
        for sent in self.tokens.sents:
            n_sentences += 1
        return n_sentences
    
    def count_type_token_ratio(self):
        return len(set(self.lemmas)) / len(self.lemmas) 
    
    def count_words_from_wordlist(self, wordlist):
        words_from_wordlists = 0
        
        for lemma in self.lemmas:
            if lemma in wordlist:
                words_from_wordlists += 1
        
        return words_from_wordlists / len(self.lemmas)
    
    def count_words_from_level_lists(self, word2level):
        levels = ("A1", "A2", "B1", "B2", "C1", "C2")
        level_freqs = {level: 0 for level in levels}
        
        for lemma in self.lemmas:
            level = word2level.get(lemma)
            if level:
                level_freqs[level] += 1

        for level in level_freqs:
            level_freqs[level] /= len(self.lemmas)
            
        return level_freqs
    
    def count_zipf_freqs(self):
        zipf_freqs = {}
        
        for lemma in self.lemmas:
            zipf_freq = math.floor(zipf_frequency(lemma, "en"))
            if zipf_freq in zipf_freqs:
                zipf_freqs[zipf_freq] += 1
            else:
                zipf_freqs[zipf_freq] = 1
        
        for zipf_freq in zipf_freqs:
            zipf_freqs[zipf_freq] /= len(self.lemmas)
        
        return zipf_freqs

In [8]:
class TextDataset:
    def __init__(self, dataset, dataset_name, nlp, preprocess):
        self.dataset_name = dataset_name
        self.dataset = dataset
        if preprocess:
            docs = spacy.tokens.DocBin(store_user_data=False)
            for doc in nlp.pipe(tqdm(self.dataset["article_text"])):
                docs.add(doc)
            docs.to_disk(self.dataset_name)
        else:
            docs = spacy.tokens.DocBin().from_disk(self.dataset_name)
        self.texts = [Text(doc) for doc in docs.get_docs(spacy.blank("en").vocab)]
    
    def count_words(self):
        return [text.count_words() for text in self.texts]
    
    def count_sentences(self):
        return [text.count_sentences() for text in self.texts]
    
    def lemmatize(self):
        return [text.lemmatize() for text in self.texts]
    
    def count_type_token_ratio(self):
        return [text.count_type_token_ratio() for text in self.texts]
    
    def count_words_from_wordlist(self, wordlist):
        return [text.count_words_from_wordlist(wordlist) for text in self.texts]
    
    def count_words_from_level_lists(self, word2level):
        return [text.count_words_from_level_lists(word2level) for text in self.texts]
        
    def count_zipf_freqs(self):
        return [text.count_zipf_freqs() for text in self.texts]
                
    def show_counts_info(self):
        print(self.dataset_name)
        d = {"Number of words": self.count_words(),
            "Number of sentences": self.count_sentences()}
        df_stats = pd.DataFrame(d)
        print(df_stats.describe())

        f = plt.figure(figsize=(10, 4))
        plt.suptitle(self.dataset_name)
        gs = f.add_gridspec(1, 2)

        for i, col in enumerate(d):
            ax = f.add_subplot(gs[0, i])
            ax = sns.distplot(df_stats[col], bins=20)

        plt.show()
        f.savefig(f"{self.dataset_name}-words_sentences_counts.png")
    
    def create_lexical_df(self, abstract_nouns, concrete_nouns):
        zipf_freqs = self.count_zipf_freqs()
        df_lexical = pd.DataFrame({f"zipf_freqs_{i}": [dct.get(i, 0) for dct in zipf_freqs]
                                     for i in range(1, 7)})
        for level in levels:
            df_lexical[level] = [dct[level] for dct in self.count_words_from_level_lists(word2level)]

        df_lexical["type_token_ratio"] = self.count_type_token_ratio()
        df_lexical["abstract_nouns"] = self.count_words_from_wordlist(abstract_nouns)
        df_lexical["concrete_nouns"] = self.count_words_from_wordlist(concrete_nouns)
        df_lexical["level"] = self.dataset["level"]
        return df_lexical

In [9]:
model = "en_core_web_lg"
nlp = spacy.load(model)

In [10]:
news = TextDataset(dataset=df_news, dataset_name="news", nlp=nlp, preprocess=False)
days = TextDataset(dataset=df_days, dataset_name="days", nlp=nlp, preprocess=False)

In [11]:
news_lemmas = [" ".join(news.texts[i].lemmas) for i in range(len(news.texts))]

In [12]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from sklearn.preprocessing import MaxAbsScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedKFold
from sklearn.naive_bayes import ComplementNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import ExtraTreesClassifier
from lightgbm import LGBMClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer

In [13]:
class MyTfidfVectorizer(TfidfVectorizer):
        
    def fit_transform(self, raw_documents, y):
        X = TfidfVectorizer.fit_transform(self, raw_documents, y=None)
        return X/X.sum(axis=1)

    def transform(self, raw_documents):
        X = TfidfVectorizer.transform(self, raw_documents)
        X.sum(axis=1)
        return X/X.sum(axis=1)

In [14]:
skf = StratifiedKFold(n_splits=4, shuffle=True, random_state=42)

In [15]:
target = list(news.dataset["level"])

In [19]:
def preprocess_word_list(filename):
    with open(f"wordlists/{filename}", encoding='utf-8') as f:
        words_from_list = f.read().split('\n')
        
    words_from_list = ' '.join([w for w in words_from_list if " " not in w and "-" not in w])
    words_from_list = nlp(words_from_list)
    words_from_list = set(w.lemma_.lower() for w in words_from_list)
    return words_from_list

In [20]:
abstract_nouns = preprocess_word_list("abstract_nouns.txt")
concrete_nouns = preprocess_word_list("concrete_nouns.txt")
print(len(abstract_nouns), len(concrete_nouns))

160 384


In [21]:
dct = {}
df = pd.read_csv("wordlists/Vocabulary Framework – British English.csv")
sub_df = df[["Base Word", "Level"]]
sub_df_min = sub_df.groupby("Base Word").min()
for d, data in sub_df_min.reset_index().groupby("Level"):
    dct[d] = list(data["Base Word"])

In [22]:
word2level = {}
for level, words in dct.items():
    for i in range(len(words)):
        if ' ' not in words[i]:
            word2level[words[i].lower()] = level

In [24]:
levels = ("A1", "A2", "B1", "B2", "C1", "C2")

In [25]:
news_lexical_df = news.create_lexical_df(abstract_nouns, concrete_nouns)
days_lexical_df = days.create_lexical_df(abstract_nouns, concrete_nouns)

In [26]:
news_lexical_df["lemmas"] = news_lemmas
news_lexical_df.drop(columns=["level"], inplace=True)

In [27]:
# X_train, X_test, y_train, y_test = train_test_split(news_lemmas, target, 
#                                                     train_size=0.8, 
#                                                     random_state=42,
#                                                    stratify=target)
X_train, X_test, y_train, y_test = train_test_split(news_lexical_df, target, 
                                                    train_size=0.8, 
                                                    random_state=42,
                                                   stratify=target)

In [28]:
scaler = MaxAbsScaler()
vectorizer = MyTfidfVectorizer()

In [46]:
clfs = {"naive_Bayes": ComplementNB(),
        "lr": LogisticRegression(),
        "svm": LinearSVC(),
        "tree": DecisionTreeClassifier(),
        "adaboost": AdaBoostClassifier(),
        "random_forest": RandomForestClassifier(),
        "extra_trees": ExtraTreesClassifier(),
        "lgbm": LGBMClassifier()}

In [30]:
tree_based = {"tree", "adaboost", "random_forest", "extra_trees", "lgbm"}
with_coefs = {"lr", "svm"}

In [31]:
pipeline = Pipeline(steps=[("vectorizer", TfidfVectorizer(min_df=3))])

In [32]:
from tqdm.notebook import tqdm

In [33]:
# accuracies = {clf_name: [] for clf_name in clfs}

# for train_index, test_index in skf.split(X_train, y_train):
#     x_train_fold = []
#     x_test_fold = []
#     y_train_fold = []
#     y_test_fold = []
    
#     for i in train_index:
#         x_train_fold.append(X_train[i])
#         y_train_fold.append(y_train[i])
        
#     for i in test_index:
#         x_test_fold.append(X_train[i])
#         y_test_fold.append(y_train[i])
        
#     x_train_fold = pipeline.fit_transform(x_train_fold, y_train_fold)
#     x_test_fold = pipeline.transform(x_test_fold)
    
#     for clf_name, clf in tqdm(clfs.items()):
#         clf.fit(x_train_fold, y_train_fold)
#         y_pred = clf.predict(x_test_fold)
    
#         accuracies[clf_name].append(accuracy_score(y_test_fold, y_pred))

# print(accuracies)

In [34]:
lr_params = dict(vectorizer__word__min_df=[3, 5, 7], lr__C=[0.1, 0.5, 1], lr__penalty=["l1", "l2"])
svm_params = dict(vectorizer__word__min_df=[3, 5, 7], svm__C=[0.01, 0.05, 0.1, 0.5])
naive_bayes_params = dict(vectorizer__word__min_df=[3, 5, 7], naive_bayes__alpha=[0.01, 0.1, 1])
tree_params = dict(vectorizer__word__min_df=[3, 5, 7], tree__max_depth=[None, 7, 10],
                tree__min_samples_leaf=[1, 10])
adaboost_params = dict(vectorizer__word__min_df=[3, 5, 7], adaboost__base_estimator=[DecisionTreeClassifier(max_depth=1),
                        DecisionTreeClassifier(max_depth=2)])
random_forest_params = dict(vectorizer__word__min_df=[3, 5, 7], random_forest__max_depth=[None, 10],
                       random_forest__min_samples_leaf=[1, 5])
extra_trees_params = dict(vectorizer__word__min_df=[3, 5, 7], extra_trees__max_depth=[None, 15],
                       extra_trees__min_samples_leaf=[1, 15])
lgbm_params = dict(vectorizer__word__min_df=[3, 5, 7], lgbm__min_split_gain=[0, 0.5], lgbm__colsample_bytree=[0.25, 0.5, 1])

In [35]:
results = {"lexical": {}}

In [47]:
clfs = {
#     "naive_bayes": [naive_bayes, naive_bayes_params],
     "lr": [lr, lr_params],
#         "svm": [svm, svm_params],
#         "tree": [tree, tree_params],
#         "adaboost": [adaboost, adaboost_params],
#          "random_forest": [random_forest, random_forest_params],
#          "extra_trees": [extra_trees, extra_trees_params],
#         "lgbm": [lgbm, lgbm_params]
}

NameError: name 'lr' is not defined

In [48]:
def visualize_coefs(estimator, n_top_features, filename):
     
    coef = estimator[-1].coef_
    n_classes = coef.shape[0]
    feature_names = np.array(estimator.named_steps['vectorizer'].get_feature_names())
    print(len(feature_names))
    

    for n in range(n_classes):
        print('class', n)
        coefs = np.argsort(coef[n])
        
        if len(feature_names) > 2 * n_top_features:
            # индексы признаков, получивших cамые большие положительные коэффициенты
            pos_coefs = coefs[-n_top_features:]

            # индексы признаков, получивших самые низкие отрицательные коэффициенты
            neg_coefs = coefs[:n_top_features]

            interesting_coefs = np.hstack([neg_coefs, pos_coefs])

            plt.figure(figsize=(9, 3))
            colors = ["red" if c < 0 else "green" for c in coef[n][interesting_coefs]]
            plt.bar(np.arange(2 * n_top_features), coef[n][interesting_coefs], color=colors)
            plt.xticks(np.arange(2 * n_top_features), feature_names[interesting_coefs], rotation=90, ha="right")
        else:
            colors = ["red" if c < 0 else "green" for c in coef[n][coefs]]
            plt.bar(np.arange(len(feature_names)), coef[n][coefs], color=colors)
            plt.xticks(np.arange(len(feature_names)), feature_names[coefs], rotation=90, ha="right")

        plt.savefig(f'{filename}_class{n}.png', bbox_inches='tight')
        plt.show()

In [49]:
def visualize_feature_importances(estimator, n_top_features, filename):
    coef = estimator[-1].feature_importances_
    feature_names = np.array(estimator.named_steps['vectorizer'].get_feature_names())
    word_importances = pd.Series(coef, index=feature_names).sort_values(ascending=False)[:10]
    word_importances.plot(kind='bar')
    plt.tight_layout()
    plt.savefig(filename, bbox_inches='tight')
    plt.show()
    return feature_names

In [50]:
class PassthroughTransformer(BaseEstimator, TransformerMixin):

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        self.X = X
        return X

    def get_feature_names(self):
        return self.X.columns.tolist()

In [51]:
news_lexical_df.columns

Index(['zipf_freqs_1', 'zipf_freqs_2', 'zipf_freqs_3', 'zipf_freqs_4',
       'zipf_freqs_5', 'zipf_freqs_6', 'A1', 'A2', 'B1', 'B2', 'C1', 'C2',
       'type_token_ratio', 'abstract_nouns', 'concrete_nouns', 'lemmas'],
      dtype='object')

In [52]:
lexical_features = ['zipf_freqs_1', 'zipf_freqs_2', 'zipf_freqs_3', 'zipf_freqs_4',
       'zipf_freqs_5', 'zipf_freqs_6', 'A1', 'A2', 'B1', 'B2', 'C1', 'C2',
       'type_token_ratio', 'abstract_nouns', 'concrete_nouns']

In [53]:
column_trans = ColumnTransformer(
            [('word', vectorizer, 'lemmas'), 
            ('feature', PassthroughTransformer(), [*lexical_features])])

In [54]:
def quality(y_actual, y_pred):
    acc = accuracy_score(y_actual, y_pred)
    f1 = f1_score(y_actual, y_pred, average="macro")
    print(f"Accuracy: {acc:.4f}\nF1 macro: {f1:.4f}")
    
    cm = confusion_matrix(y_actual, y_pred)
    sns.heatmap(cm, annot=True, annot_kws={"size": 16})
    plt.ylabel("True")
    plt.xlabel("Predicted")
    plt.show()

    return acc, f1

In [55]:
for clf_name, clf_params in clfs.items():
    clf, params = clf_params
    if clf_name in tree_based:
        pipeline = Pipeline(steps=[("vectorizer", column_trans), (clf_name, clf)])
    else:
        pipeline = Pipeline(steps=[("vectorizer", column_trans), ("scaler", scaler), (clf_name, clf)])
    grid_search = GridSearchCV(pipeline, param_grid=params,
                           scoring={"F1": "f1_macro", "Accuracy": "accuracy"},
                           refit="F1", return_train_score=True, cv=skf, verbose=10) 
    grid_search.fit(X_train, y_train)
    best_f1_val = round(grid_search.best_score_, 4)
    best_params = grid_search.best_params_
    print(f"Best f1 on validation: {best_f1_val}")
    print("Best parameters:", best_params, "\n")
    estimator = grid_search.best_estimator_
    
    y_pred_test = estimator.predict(X_test)
    acc, f1 = quality(y_test, y_pred_test)
    results["lexical"][clf_name] = {"F1 macro (validation)": best_f1_val,
                                    "Best params": best_params,
                                    "Accuracy (test)": round(acc, 4),
                                    "F1 macro (test)": round(f1, 4)}
        
    if clf_name in tree_based:
        feature_names = visualize_feature_importances(estimator, 10, clf_name)
    elif clf_name in with_coefs:
        visualize_coefs(estimator, 10, clf_name)

TypeError: cannot unpack non-iterable ComplementNB object