In [1]:
from dask import dataframe as ddf
from dask.distributed import Client, wait, LocalCluster
from dask import delayed, compute
from dask_ml.wrappers import ParallelPostFit

import nltk
from nltk.stem import LancasterStemmer

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

import string
import re
import numpy as np
import pandas as pd

import gc
import ctypes

from os.path import isfile
import lzma
import pickle

from keras.preprocessing.text import Tokenizer, one_hot
from keras.preprocessing.sequence import pad_sequences
from keras.layers import Normalization, Dense, LSTM, Embedding, Dropout
from keras.models import Sequential, load_model
from tensorflow.keras.callbacks import LearningRateScheduler, ModelCheckpoint
from tensorflow.keras.optimizers import Adam

import tensorflow as tf

from IPython.display import clear_output

**Порядок исследования**

Изучая LSTM, я узнал о том, что на смену LSTM уже пришли трансформеры, а на смену им Attention.<br>
Поэтому в этой работе я решил не столько изучить LSTM (а точнее рекурентные сети как таковые), сколько с ими познакомиться и покрутить в руках NLP.<br>
Но поскольку перепостить чужой код скучно, я нашел себе приключение.
1. Познакомиться, наконец, с Dask.
2. Проверить результаты работы <a href="https://arxiv.org/abs/1903.07288#:~:text=Since%20LSTMs%20and%20CNNs%20take,comes%20to%20performance%20and%20accuracies.">EFFECTS OF PADDING ON LSTMS AND CNNS</a><br>
В работе утверждается, что есть существенное влияние на точность модели от применения pre или post padding.

In [2]:
gc.enable()

Я буду, само-собой, использовать локальный кластер. Хотя нет особой проблемы подключиться к тому же GCP.

In [3]:
cluster = LocalCluster(memory_limit='24GB')
client = Client(cluster)
cluster

distributed.diskutils - INFO - Found stale lock file and directory '/home/doska/Hillel_ML_HW/Homework15/dask-worker-space/worker-r1rtddc8', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/doska/Hillel_ML_HW/Homework15/dask-worker-space/worker-gmwfbcqz', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/doska/Hillel_ML_HW/Homework15/dask-worker-space/worker-dzqdixoa', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/doska/Hillel_ML_HW/Homework15/dask-worker-space/worker-uxqft4by', purging


Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

Не знаю, свойство ли это локального кластера исключительно, но на nix системах имеется баг - не освобождается вовремя память. <br>
Функция ниже взята из официальной документации как раз для устранения этой проблемы.

In [4]:
def trim_memory() -> int:
     libc = ctypes.CDLL("libc.so.6")
     return libc.malloc_trim(0)

def clear_memory():
     temp = client.run(trim_memory)
     temp = client.run(gc.collect)
     gc.collect()

In [5]:
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /home/doska/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /home/doska/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to /home/doska/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

Это глупая строчка, на самом деле. Какой смысл использовать Dask там где датасет помещается в память? Никакого, но я просто пробую инструмент.

In [6]:
src_df = ddf.read_csv(urlpath='./data/trip-advisor-hotel-reviews.zip', compression='zip', blocksize=None).repartition(npartitions=32)
src_df = client.persist(src_df)
wait_result = wait(src_df)

In [7]:
src_df.groupby(by='Rating').count().compute()

Unnamed: 0_level_0,Review
Rating,Unnamed: 1_level_1
1,1421
2,1793
3,2184
4,6039
5,9054


In [8]:
def proceed_string(text=None):
    step_1 = re.sub("<.*?>", " ", text)
    step_2 = step_1.translate(str.maketrans("", "", string.punctuation))
    step_3 = nltk.word_tokenize(step_2)
    step_4 = [LancasterStemmer().stem(x) for x in step_3]
    return " ".join(step_4)

Здесь я раскидываю выполнение функции на всех workers. Получается быстрее.

In [9]:
X = None

if isfile("./data/X.xz"):
    with lzma.open("./data/X.xz", "rb") as m_file:
        X = pickle.load(m_file)
else:
    futures = client.map(proceed_string, src_df['Review'])
    X = client.gather(futures)

    del futures  
    with lzma.open("./data/X.xz", "wb") as m_file:
        pickle.dump(X, m_file)


y = src_df.compute()['Rating'].tolist()

del src_df, wait_result

In [10]:
clear_memory()

У Dask есть замечательная возможность объявить тяжолую фичу тяжелой. То есть не загружать её целиком в память, а пользовать по кусочкам по мере надобности.

In [11]:
y_ = [0 if x <=3 else 1 for x in y]

y_binary_mem = client.scatter(y_)
y_multi_mem = client.scatter(y)

In [12]:
@delayed
def train_logistic_regression(X_in, y_in):
    X_train, X_test, y_train, y_test = train_test_split(X_in, y_in, shuffle=True, random_state=42, stratify=y_in)

    clf = ParallelPostFit(estimator=LogisticRegression(max_iter=2000, random_state=42, n_jobs=1), scoring='accuracy')
    
    # Можно было бы сделать так. Тогда sklearn использовал бы dask parallel backend.

    # import joblib
    
    # clf = LogisticRegression(max_iter=2000, random_state=42, n_jobs=-1)

    # with joblib.parallel_backend('dask'):
    #     clf.fit(X_train, y_train)
    
    clf.fit(X_train, y_train)

    return accuracy_score(y_test, clf.predict(X_test)) * 100.0

Идея кода ниже следующая. Самые тяжолые задачи я закидываю на кластер с самым высоким приоритетом. <br>
Пока они будут считаться, на свободных workers будут считаться более легкие.<br>
Вобще это интересный подход - оптимизировать очередь выполнения. И эффективный.

In [13]:
def compare_processors():
    results = []
    ngram_values = [3, 2, 1]

    X_ = client.scatter(X)

    for i in ngram_values:
        count_vectorizer = CountVectorizer(stop_words='english', ngram_range=(1, i), lowercase=False)  
        X_mem = client.submit(count_vectorizer.fit_transform, X_)
        results.append(['CountVectorizer', 'binary', i, client.submit(train_logistic_regression, X_mem, y_binary_mem, priority=(i+2))])
        results.append(['CountVectorizer', 'multi', i, client.submit(train_logistic_regression, X_mem, y_multi_mem, priority=(i+3))])

        tfid_vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, i), lowercase=False)
        X_mem = client.submit(tfid_vectorizer.fit_transform, X_)
        results.append(['TfidfVectorizer', 'binary', i, client.submit(train_logistic_regression, X_mem, y_binary_mem, priority=i)])
        results.append(['TfidfVectorizer', 'multi', i, client.submit(train_logistic_regression, X_mem, y_multi_mem, priority=(i+1))])

    mb = MultiLabelBinarizer(sparse_output=False)
    X_mem = client.submit(mb.fit_transform, X_)
    results.append(['MultiLabelBinarizer', 'binary', i, client.submit(train_logistic_regression, X_mem, y_binary_mem)])
    results.append(['MultiLabelBinarizer', 'multi', i, client.submit(train_logistic_regression, X_mem, y_multi_mem)])

    results = client.gather(results)
    results = compute(results)

    return results

In [14]:
results = []
if isfile("./data/compare.xz"):
    with lzma.open("./data/compare.xz", "rb") as m_file:
        results = pickle.load(m_file)
else:
    results = compare_processors()
    clear_memory()
            
    with lzma.open("./data/compare.xz", "wb") as m_file:
        pickle.dump(results, m_file)


Dask скушал все эти невменяемые таблицы (даже без sparse_output=True) и не вспотел. По крайней мере ничего не вылетело, лимит памяти не был привышен. Очень полезное свойство!

In [15]:
results = results[0]

In [16]:
a = pd.DataFrame(columns=['Processor', 'Classification', 'n-gram', 'Score'])
a['Processor'] = [x[0] for x in results]
a['Classification'] = [x[1] for x in results]
a['n-gram'] = [x[2] for x in results]
a['Score'] = [x[3] for x in results]

a.groupby(by=['Processor', 'Classification', 'n-gram']).max()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Score
Processor,Classification,n-gram,Unnamed: 3_level_1
CountVectorizer,binary,1,88.131954
CountVectorizer,binary,2,89.283623
CountVectorizer,binary,3,89.400742
CountVectorizer,multi,1,57.622487
CountVectorizer,multi,2,60.628538
CountVectorizer,multi,3,61.214132
MultiLabelBinarizer,binary,1,73.648253
MultiLabelBinarizer,multi,1,43.607261
TfidfVectorizer,binary,1,89.361702
TfidfVectorizer,binary,2,87.292602


Довольно интересно, что TfidfVectorizer от использования ngrams существенно теряет в точности. 

In [17]:
del y_binary_mem, y_multi_mem
clear_memory()

Дальше будет стандартный пайплайн для LSTM. Токенизация, padding, embedding.

In [18]:
review_lengts = np.array([len(x) for x in [y.split() for y in X]])

pd.Series(review_lengts).describe()

count    20491.000000
mean       104.377239
std        100.665153
min          7.000000
25%         48.000000
50%         77.000000
75%        124.000000
max       1931.000000
dtype: float64

In [19]:
keras_tokinizer = Tokenizer(lower=False)
keras_tokinizer.fit_on_texts(X)

In [20]:
EMBEDDING_DIM = 16
VOCAB_SIZE = len(list(keras_tokinizer.word_index)) + 1
NUM_EPOCHS = 16
BATCH_SIZE = 64
PAD_LENGTH = int(np.array(review_lengts).mean())
INITIAL_LR = 0.01

In [21]:
X_nn = keras_tokinizer.texts_to_sequences(X)

In [22]:
y_binary = np.array(y_)
y_multi = np.array(pd.get_dummies(y).values)

In [23]:
def lr_exp_decay(epoch, lr):
    k = 0.1
    return INITIAL_LR * np.exp(-k * epoch)

def save_callback(model_name='model'):

    model_fn = './data/models/' + model_name + '-best_model.hdf5'

    checkpoint = ModelCheckpoint(filepath=model_fn,
                                 monitor='val_loss',
                                 verbose=1,
                                 save_best_only=True,
                                 mode='min'
                                 )
    return checkpoint

early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, mode='min', verbose=1)

adam_optimizer = Adam(learning_rate=INITIAL_LR)

In [28]:
def train_model(padding_in='pre', truncating_in='pre', model_name='model', clf_type='binary'):
    history_fn = "./data/models/" + model_name + "-history.csv"
    model_fn = './data/models/' + model_name + '-best_model.hdf5'
    
    history = {}
    model = None

    if isfile(history_fn) and isfile(model_fn):
        history = pd.read_csv(filepath_or_buffer = history_fn).to_dict()
        model = load_model(model_fn)
        return model, history

    X_ = pad_sequences(sequences=X_nn, maxlen=PAD_LENGTH, padding=padding_in, truncating=truncating_in)
    
    if clf_type == 'binary':
        model = Sequential(
            [
                Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM, mask_zero=True),
                LSTM(32),
                Dense(64, activation='relu'),
                Dropout(0.5),
                Dense(1, activation='sigmoid')
            ]
        )

        model.compile(optimizer=adam_optimizer, loss='binary_crossentropy', metrics=['accuracy'])

        Y_ = y_binary
        
    elif clf_type == 'multi':
        model = Sequential(
            [
                Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM, mask_zero=True),
                LSTM(32),
                Dense(64, activation='relu'),
                Dropout(0.5),
                Dense(5, activation='softmax')
            ]
        )

        model.compile(optimizer=adam_optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

        Y_ = y_multi
    else:
        return None, None

    history = model.fit(
        x=X_, 
        y=Y_,
        validation_split=0.2,
        batch_size=BATCH_SIZE,
        epochs=NUM_EPOCHS,
        callbacks=[early_stop, save_callback(model_name), LearningRateScheduler(lr_exp_decay, verbose=1)],
        verbose=0
    )

    pd.DataFrame.from_dict(history.history).to_csv(history_fn, index=False)

    return model, history.history



In [25]:
histories = {}
models = {}

In [29]:
paddings = ['pre', 'post']
truncatings = ['pre', 'post']
clf_types = ['binary', 'multi']

for current_clf_type in clf_types:
    for current_padding in paddings:
        for current_truncating in truncatings:
            current_model_name = current_clf_type + '_' + current_padding + '_' + current_truncating
            print("==================== {} ====================".format(current_model_name))
            
            a, b = train_model(padding_in=current_padding, truncating_in=current_truncating, model_name=current_model_name, clf_type=current_clf_type)

            models[current_model_name] = a
            histories[current_model_name] = b

            clear_output(wait=True)



In [30]:
for key in histories.keys():
    print("{}:  \t{:.5f}".format(key, max(histories[key]['val_accuracy'].values())))

binary_pre_pre:  	0.87314
binary_pre_post:  	0.89217
binary_post_pre:  	0.88143
binary_post_post:  	0.87875
multi_pre_pre:  	0.57258
multi_pre_post:  	0.59210
multi_post_pre:  	0.56648
multi_post_post:  	0.60015


Вот! Умные люди оказались правы - разница есть и ничего себе какая! <br>
Любопытно, что для мультиклассовой и бинарной классификации результаты не однаковые. <br>
Тут я могу вынести несколько суждений:

1. Наиболее значимое в отзыве идет в начале предложения. Вероятно мы (люди) имеем свойство излагать суть в начале, а потом её разъясняем. 
2. Для определения нюансов оценки вероятно важна именно центральная часть написанного.

Нужно будет глубже капнуть на эту тему у психологов - в будущем точно пригодится.

**Заключение**

NLP очень большая и сложная тема. Не могу сказать, что я с ней познакомился в этой работе. Так, немного в замочную скважину посмотрел.<br>
Но тем не менее, очивидным является следующее. NLP очень контексто зависим. При первом же рассмотрении становится понятным, что обрабатывать текст в разных сообществах придется сильно по-разному. Тут, понятное дело, от задачи нужно исходить. Но в голове этот момент держать нужно.

Dask хорош. Как и всякий сложный инструмент его нужно уметь использовать, но точно его стоит изучить детально.