In [None]:
import pandas as pd
from tqdm.notebook import tqdm

from tg.grammar_ru.features import PyMorphyFeaturizer

In [None]:
from tg.grammar_ru import Loc

# Corpus archives to read; each name is resolved against Loc.corpus_path below.
# TODO: consider adding more corpora.
CORPUS_NAMES = ["books.base.zip", "pub.base.zip", "lenta.base.zip"]

CORPUS_LIST = [Loc.corpus_path / archive_name for archive_name in CORPUS_NAMES]

In [None]:
from yo_fluq_ds import *
from tg.grammar_ru import Separator
from tg.grammar_ru.corpus.corpus_reader import CorpusReader

In [None]:
# Turn every frame of the selected corpora into text with the viewer,
# showing a progress bar while the frames are consumed.
viewer = Separator.Viewer()

texts = list(
    CorpusReader
    .read_frames_from_several_corpora(CORPUS_LIST)
    .feed(fluq.with_progress_bar(console=None))
    .select(viewer.to_text)
)

In [None]:
import jsonlines

# Cache the rendered texts on disk, one JSON value per line.
with jsonlines.open('texts.jsonl', 'w') as writer:
    writer.write_all(texts)

In [None]:
import jsonlines

# Read the cached texts back from disk into a plain list.
with jsonlines.open('texts.jsonl') as reader:
    texts2 = list(reader)

In [None]:
import itertools

# Flatten the texts into one list of lines (every text is split on line breaks).
sents = [line for text in texts2 for line in text.splitlines()]

In [None]:
import random

# Shuffle in place with a fixed seed so that the sents[:100000] sample used to build the
# bundle is the same after every kernel restart; 228 matches the random_state that
# train_test_split uses further down. A private Random instance leaves the global
# random state untouched.
random.Random(228).shuffle(sents)

In [None]:
# Number of lines collected from all texts.
len(sents)

In [None]:
from tg.common import DataBundle

# Load a previously saved bundle from the working directory.
# NOTE(review): 'mytest_db.zip' is only written further down by db.save_as_zip, so this
# presumably fails on a fresh checkout; `db` is also reassigned by Separator.build_bundle
# in a later cell, so run either this cell or that one, not both.
db = DataBundle.load('mytest_db.zip')

In [None]:
from tg.grammar_ru.features import PyMorphyFeaturizer

In [None]:
from tg.grammar_ru.features import PyMorphyFeaturizer

# Build a bundle from the first 100000 entries of the shuffled `sents` list,
# with PyMorphyFeaturizer as the only featurizer.
db = Separator.build_bundle(sents[:100000], [PyMorphyFeaturizer()])

In [None]:
# Attach the pymorphy feature columns to the word table, matching on 'word_id'.
# Only columns that db.src does not have yet are joined: DataFrame.join raises on
# overlapping column names, which happened when this cell was re-run or when `db` was
# loaded from a bundle that had been saved after the join.
new_pymorphy_columns = [col for col in db.pymorphy.columns if col not in db.src.columns]
if new_pymorphy_columns:
    db.src = db.src.join(db.pymorphy[new_pymorphy_columns], on='word_id')
db.src.head()

In [None]:
# Summary of the pymorphy rows whose POS tag is 'ADJS'
# (presumably short-form adjectives — confirm against the tagset).
db.pymorphy[db.pymorphy.POS == 'ADJS'].info()

In [None]:
# Summary of the pymorphy rows whose POS tag is 'ADJF'
# (presumably full-form adjectives — confirm against the tagset).
db.pymorphy[db.pymorphy.POS == 'ADJF'].info()

In [None]:
# Preview of the 'adjectives' frame of the bundle.
# NOTE(review): that frame is only registered further down (db.data_frames['adjectives']),
# so this presumably works only when `db` was loaded from an already saved bundle.
db.adjectives.head()

In [None]:
import numpy as np

# Number of positions taken on each side of an adjective when building its context
# window (offsets run from -WINDOW_SIZE to +WINDOW_SIZE inclusive).
WINDOW_SIZE = 7

In [None]:


# Morphological columns copied from every word inside a context window.
features = [
    'POS', 'gender', 'number', 'case', 'animacy', 'aspect',
    'transitivity', 'person', 'tense', 'mood', 'voice', 'involvement',
]


def get_offset_word_feats(word_row, offset) -> dict:
    """Collect the morphological features of one window position.

    Args:
        word_row: mapping-like row that can be indexed by every name in ``features``.
        offset: position relative to the central word; used as the key suffix.

    Returns:
        A dict with one ``'<feature>_<offset>'`` entry per feature, followed by an
        ``'OFFSET_<offset>'`` marker whose value equals its own key.
    """
    marker = f'OFFSET_{offset}'
    collected = {f'{name}_{offset}': word_row[name] for name in features}
    collected[marker] = marker
    return collected


def get_empty_feats(offset) -> dict:
    """Build the placeholder entry for a window position that has no word.

    Produces the same keys as ``get_offset_word_feats``; every feature value is NaN,
    and the ``'OFFSET_<offset>'`` marker is still filled in.
    """
    marker = f'OFFSET_{offset}'
    placeholder = {f'{name}_{offset}': np.nan for name in features}
    placeholder[marker] = marker
    return placeholder


# One record per adjective occurrence: the features of every word within WINDOW_SIZE
# positions of it. A position whose index label is not part of the same sentence
# contributes the NaN placeholder instead.
adj_dataset = []

for sent_id, sentence_df in tqdm(db.src.groupby('sentence_id')):
    adjective_mask = sentence_df.POS.eq('ADJS') | sentence_df.POS.eq('ADJF')
    for idx in sentence_df.index[adjective_mask]:
        adj_window_data = {}
        for offset in range(-WINDOW_SIZE, WINDOW_SIZE + 1):
            neighbour = idx + offset
            if neighbour in sentence_df.index:
                window_part = get_offset_word_feats(sentence_df.loc[neighbour], offset)
            else:
                window_part = get_empty_feats(offset)
            adj_window_data.update(window_part)

        adj_dataset.append(adj_window_data)
                

In [None]:
# One row per adjective occurrence; the columns are the '<feature>_<offset>' and
# 'OFFSET_<offset>' keys produced while building the windows.
adj_df = pd.DataFrame.from_records(adj_dataset)
adj_df.head()

In [None]:
# Replace every NaN with the literal string 'missing'. NaNs come at least from window
# positions that fall outside the sentence.
adj_df = adj_df.fillna('missing')
adj_df.info()

In [None]:
# Convert every column to the pandas 'category' dtype and report deep memory usage.
adj_df = adj_df.astype('category')
adj_df.info(memory_usage='deep')

In [None]:
# Register the window table in the bundle under the name 'adjectives';
# the next line reads it back through attribute access.
db.data_frames['adjectives'] = adj_df
db.adjectives

In [None]:
# Persist the bundle under the same file name that DataBundle.load reads in an
# earlier cell.
db.save_as_zip('mytest_db.zip')

In [None]:
# Columns of the central word (offset 0) that become the prediction labels.
adj_center_predict_features = ['gender_0', 'number_0', 'case_0', 'animacy_0']
# Columns of the central word that are kept among the model inputs.
adj_center_expect_features = ['POS_0']

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the adjective rows for evaluation; the fixed random_state keeps the
# split stable between runs.
train_df, test_df = train_test_split(db.adjectives, test_size=0.2, random_state=228)

In [None]:
# Row counts of the train and test parts.
len(train_df), len(test_df)

In [None]:
# Model inputs: every context column (offset other than 0) plus the central-word columns
# that are explicitly kept. The offset is the text after the last underscore, so offsets
# such as 10 or -10 are not mistaken for the centre when a wider window is used
# (checking only the last character would have dropped them).
input_features = [
    col for col in db.adjectives.columns
    if col.rsplit('_', 1)[-1] != '0' or col in adj_center_expect_features
]
# Labels: the central-word columns to predict, in the column order of the table.
label_features = [col for col in db.adjectives.columns if col in adj_center_predict_features]

In [None]:
# Print the value distribution of every label column.
for label_feat in label_features:
    print(label_feat, db.adjectives[label_feat].value_counts())

In [None]:
from sklearn.preprocessing import OneHotEncoder

# One-hot encoder for the label columns, producing a dense array; it is fitted on the
# training part only.
ohe = OneHotEncoder(sparse_output=False, handle_unknown='infrequent_if_exist')
ohe.fit(train_df[label_features])

In [None]:
# Encode the labels of the full table and discard the result (assigned to `_`);
# presumably a smoke check that the fitted encoder accepts every row.
_ = ohe.transform(db.adjectives[label_features])

In [None]:
# Show the label column names.
label_features

In [None]:
# Adjective endings grouped into three declension tables.
NEW = {
    'ая', 'ого', 'ое', 'ой', 'ом', 'ому',
    'ую', 'ые', 'ый', 'ым', 'ыми', 'ых',
}
# NOTE: 'ою' was dropped.

GOOD = {
    'ая', 'его', 'ее', 'ей', 'ем', 'ему',
    'ие', 'ий', 'им', 'ими', 'их', 'ую', 'яя', 'юю',
    'ого', 'ое', 'ой', 'ому', 'ом',
}  # e.g. 'легкий'

BIG = {
    'ая', 'ие', 'им', 'ими', 'их', 'ого',
    'ое', 'ой', 'ом', 'ому', 'ую',
    'ые', 'ым', 'ыми', 'ых',
}  # e.g. 'золотой'
# NOTE: 'ою' was dropped.

NEW_list = sorted(NEW)
GOOD_list = sorted(GOOD)
BIG_list = sorted(BIG)
# The concatenation deliberately keeps endings shared between tables (repeats are intended).
ALL_ENDS_list = [*NEW_list, *GOOD_list, *BIG_list]
POSSIBLE_ENDINGS = set(ALL_ENDS_list)
# For a repeated ending, the position of its last occurrence wins.
endings_nums = {ending: position for position, ending in enumerate(ALL_ENDS_list)}

# Per-table numbering: every table continues where the previous one stopped.
NEW_num_by_end = {ending: number for number, ending in enumerate(NEW_list)}
GOOD_num_by_end = {
    ending: number
    for number, ending in enumerate(GOOD_list, start=len(NEW_num_by_end))
}
BIG_num_by_end = {
    ending: number
    for number, ending in enumerate(BIG_list, start=len(NEW_num_by_end) + len(GOOD_num_by_end))
}

# Lookup keyed by (declension name, ending).
nums_by_decl_and_end = {
    (declension, ending): number
    for declension, table in (
        ('new', NEW_num_by_end),
        ('good', GOOD_num_by_end),
        ('big', BIG_num_by_end),
    )
    for ending, number in table.items()
}

In [None]:
import catboost

# Classifier trained with the 'MultiLogloss' objective; its target is the one-hot label
# matrix passed when the pools are built. F1 is tracked as an additional metric and
# training is requested on GPU.
model = catboost.CatBoostClassifier(
    objective='MultiLogloss',
    iterations=1000,
    custom_metric='F1',
    task_type='GPU'
)

In [None]:
# Wrap both parts into CatBoost pools: every input column is declared categorical and the
# labels are the one-hot encoded label columns.
train_data = catboost.Pool(data=train_df[input_features], label=ohe.transform(train_df[label_features]), cat_features=input_features)
test_data = catboost.Pool(data=test_df[input_features], label=ohe.transform(test_df[label_features]), cat_features=input_features)

In [None]:
# Train on the training pool and evaluate on the held-out pool
# (verbose=50 presumably prints progress every 50 iterations).
model.fit(
    train_data,
    eval_set=test_data,
    verbose=50,
)

In [None]:
# First 50 rows of the prettified feature-importance table.
model.get_feature_importance(prettified=True)[:50]

In [None]:
# Inspect the best_score_ attribute of the fitted model.
model.best_score_

In [None]:
import pathlib
import joblib

# Output files (relative to the working directory) for the trained model and the fitted
# one-hot encoder.
MODEL_PATH = pathlib.Path('catboost_adjectives.joblib')
OHE_ADJECTIVES_PATH = pathlib.Path('ohe_adjectives.joblib')

In [None]:
# Persist the trained model and the fitted label encoder. The model path is passed as a
# plain string, the same way it is handed to load_model when the model is read back.
model.save_model(str(MODEL_PATH))
joblib.dump(ohe, OHE_ADJECTIVES_PATH)

In [None]:
import pathlib
import joblib
import pymorphy2
import numpy as np
import pandas as pd
import catboost

# Morphological columns copied from every word inside a context window.
features = [
    'POS', 'gender', 'number', 'case', 'animacy', 'aspect',
    'transitivity', 'person', 'tense', 'mood', 'voice', 'involvement',
]


def get_offset_word_feats(word_row, offset: int) -> dict:
    """Return the features of one window position, keyed as ``'<feature>_<offset>'``.

    ``word_row`` must be indexable by every name in ``features``. The result also
    carries an ``'OFFSET_<offset>'`` entry whose value equals its own key.
    """
    offset_key = f'OFFSET_{offset}'
    row_feats = {f'{feat}_{offset}': word_row[feat] for feat in features}
    row_feats[offset_key] = offset_key
    return row_feats


def get_empty_feats(offset: int) -> dict:
    """Return the placeholder for a window position without a word.

    The keys are the same as in ``get_offset_word_feats``; every feature value is NaN.
    """
    offset_key = f'OFFSET_{offset}'
    blank_feats = dict.fromkeys((f'{feat}_{offset}' for feat in features), np.nan)
    blank_feats[offset_key] = offset_key
    return blank_feats


def inflect_with_labels(morph, word: str, labels: list) -> str:
    """Put ``word`` into the form described by ``labels``.

    Args:
        morph: analyzer whose ``parse(word)`` returns a list of parses; the first parse
            is used.
        word: the word to re-inflect.
        labels: grammeme labels; the placeholders ``'missing'`` and ``None`` are ignored.

    Returns:
        The inflected word, or ``word`` unchanged when the analyzer cannot produce a
        form with the requested grammemes.
    """
    grammemes = set(labels) - {'missing', None}
    parsed = morph.parse(word)[0]
    inflected = parsed.inflect(grammemes)
    # inflect() yields None when no matching form exists; keep the original word
    # instead of failing on `.word`.
    if inflected is None:
        return word
    return inflected.word
    

class AdjectivesSuggestionsGenerator:
    """Suggests re-inflected forms for the adjectives found in a text.

    For every adjective a context window of morphological features is built, the trained
    model predicts the labels of the central word, and the adjective is inflected
    accordingly.
    """

    def __init__(self, model_path: pathlib.Path, one_hot_encoding_path: pathlib.Path, window_size: int = 7):
        """Load the morphological analyzer, the trained model and the label encoder.

        Args:
            model_path: file previously written with ``save_model``.
            one_hot_encoding_path: joblib dump of the fitted one-hot label encoder.
            window_size: number of positions taken on each side of an adjective;
                should equal the value used to build the training windows.
        """
        self._morph = pymorphy2.MorphAnalyzer(lang='ru')
        self._model = catboost.CatBoostClassifier().load_model(str(model_path))
        # NOTE(review): joblib.load unpickles the file — only load encoders from a
        # trusted location.
        self._one_hot_encoding = joblib.load(one_hot_encoding_path)
        self._window_size = window_size

    def get_adjectives_suggestions(self, text: str) -> pd.DataFrame | None:
        """Return the word table of ``text`` with a ``'suggestion'`` column.

        The column is filled for adjectives (POS 'ADJS' or 'ADJF') only. Returns
        ``None`` when the text contains no adjectives.

        Relies on the notebook-level ``input_features`` list for the model's column set.
        """
        text_db = Separator.build_bundle(text, [PyMorphyFeaturizer()])
        text_df = text_db.src.join(text_db.pymorphy, on='word_id')

        adj_window_datas = []
        indices = []
        offsets = range(-self._window_size, self._window_size + 1)

        # Windows are limited to the adjective's own sentence, which is how the training
        # records are built (groupby('sentence_id')); otherwise the model would receive
        # context from neighbouring sentences that it never saw during training.
        for _, sentence_df in text_df.groupby('sentence_id'):
            is_adjective = sentence_df.POS.eq('ADJS') | sentence_df.POS.eq('ADJF')
            for idx in sentence_df.index[is_adjective]:
                adj_window_data = {}
                for offset in offsets:
                    neighbour = idx + offset
                    if neighbour in sentence_df.index:
                        adj_window_data.update(get_offset_word_feats(sentence_df.loc[neighbour], offset))
                    else:
                        adj_window_data.update(get_empty_feats(offset))

                indices.append(idx)
                adj_window_datas.append(adj_window_data)

        if not adj_window_datas:
            return None

        # Same preparation as for training: select the input columns and replace NaN
        # with the 'missing' category.
        inp = pd.DataFrame(adj_window_datas)[input_features]
        inp = inp.fillna('missing')
        pool_inp = catboost.Pool(inp, cat_features=input_features)
        predictions_raw = self._model.predict(pool_inp)

        # `indices` and the prediction rows are in the same order, so they can be zipped.
        words = text_df.word.loc[indices]
        predicted_labels = self._one_hot_encoding.inverse_transform(predictions_raw)
        inflected_words = [inflect_with_labels(self._morph, word, labels) for word, labels in zip(words, predicted_labels)]
        text_db.src.loc[indices, 'suggestion'] = inflected_words
        return text_db.src

In [None]:
# Build the generator from the saved artefacts, with the same window size that was used
# to create the training windows.
asg = AdjectivesSuggestionsGenerator(MODEL_PATH, OHE_ADJECTIVES_PATH, window_size=WINDOW_SIZE)

In [None]:
def apply_suggestion(word: str, suggestion: str):
    """Return ``suggestion`` with the capitalisation of ``word``.

    Args:
        word: the original word.
        suggestion: the proposed replacement, or a missing value (NaN/None) when there
            is none.

    Returns:
        ``word`` itself when there is no suggestion; otherwise the suggestion, title-cased
        for title-case words, upper-cased for all-caps words, and with its first letter
        capitalised for other words that start with a capital (e.g. hyphenated words,
        for which ``str.istitle`` is False).
    """
    if pd.isna(suggestion):
        return word
    if word.istitle():
        return suggestion.title()
    if word.isupper():
        return suggestion.upper()
    if word[:1].isupper():
        return suggestion[:1].upper() + suggestion[1:]
    return suggestion

In [None]:
%%time

# test_text = 'Синий машина ехала по скоростная трассе и врезалась в старенькая столбы на ужасная улице' * 100
# Short sample text used to try the generator.
test_text = 'цвета морской волны и цвет морской паруса'

# Word table of the sample with a 'suggestion' column (None when no adjective is found).
df_with_suggestions = asg.get_adjectives_suggestions(test_text)

In [None]:
# Overwrite 'word' with the suggestion wherever one exists, then render the table back to
# text. NOTE: this changes df_with_suggestions in place.
df_with_suggestions['word'] = df_with_suggestions[['word', 'suggestion']].apply(lambda x: apply_suggestion(*x), axis=1)
Separator.Viewer().to_text(df_with_suggestions)