In [1]:
import numpy as np
import numpy.random as random
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import keras

Using TensorFlow backend.


In [2]:
!pip install pymorphy2
!pip install pymorphy2-dicts
!pip install DAWG

Collecting pymorphy2
  Using cached https://files.pythonhosted.org/packages/a3/33/fff9675c68b5f6c63ec8c6e6ff57827dda28a1fa5b2c2d727dffff92dd47/pymorphy2-0.8-py2.py3-none-any.whl
Collecting dawg-python>=0.7 (from pymorphy2)
  Using cached https://files.pythonhosted.org/packages/6a/84/ff1ce2071d4c650ec85745766c0047ccc3b5036f1d03559fd46bb38b5eeb/DAWG_Python-0.7.2-py2.py3-none-any.whl
Collecting pymorphy2-dicts<3.0,>=2.4 (from pymorphy2)
  Using cached https://files.pythonhosted.org/packages/02/51/2465fd4f72328ab50877b54777764d928da8cb15b74e2680fc1bd8cb3173/pymorphy2_dicts-2.4.393442.3710985-py2.py3-none-any.whl
Collecting docopt>=0.6 (from pymorphy2)
Installing collected packages: dawg-python, pymorphy2-dicts, docopt, pymorphy2
[31mCould not install packages due to an EnvironmentError: [Errno 13] Permission denied: '/usr/local/lib/python2.7/dist-packages/DAWG_Python-0.7.2.dist-info'
Consider using the `--user` option or check the permissions.
[0m
Collecting pymorphy2-dicts
  Using cache

In [3]:
import pymorphy2
# Shared morphological analyzer instance; mutate_word() uses it to look up word lexemes.
morph = pymorphy2.MorphAnalyzer()

In [4]:
# GPREFIX is prepended to every data/model file name opened in this notebook.
# Google Colab variant (mount Drive and point GPREFIX at it), kept commented out:
#from google.colab import drive
#drive.mount('/gdrive')
#GPREFIX = '/gdrive/My Drive/antivulg/'
# Active value: file names are used as given, i.e. relative to the working directory.
GPREFIX = ''

In [5]:
import string

# Character classes. Element 0 of punct_marks, english and digits is the symbol that
# every member of that tuple is collapsed to ('.', 'ENG' and 'DIG' respectively).
punct_marks = tuple('.,!?;"\'')
english = tuple(['ENG']) + tuple(string.ascii_lowercase)
digits = tuple(['DIG']) + tuple(string.digits)
russian = tuple('абвгдеёжзийклмнопрстуфхцчшщъыьэюя')
# Symbols with their own vocabulary entry; also the pool do_char_repl() draws replacements from.
real_alphabet = russian + tuple(' -.*')
# Full vocabulary: the real symbols plus the padding token and the two class tokens.
alphabet = real_alphabet + tuple(['NULL', 'DIG', 'ENG'])

# Vocabulary symbol -> integer index, and the index of the padding token.
alpha2idx = {c: idx for idx, c in enumerate(alphabet)}
null_idx = alpha2idx['NULL']

# reduction maps a raw input character to the vocabulary symbol that represents it.
reduction = {c: c for c in alphabet}
for lst in punct_marks, english, digits:
    for c in lst:
        reduction[c] = lst[0]
        # Register the upper-case form as well; for digits and punctuation
        # upper() is the identity, so this just repeats the assignment above.
        reduction[c.upper()] = lst[0]
for c in russian:
    # Upper-case Russian letters are folded onto their lower-case form.
    reduction[c.upper()] = c

def enc(c):
    """Return the vocabulary index for character `c`.

    The character is first mapped through `reduction`; characters that have no
    entry there are treated as '*'.
    """
    symbol = reduction.get(c, '*')
    return alpha2idx[symbol]

In [6]:
import re

# Word list built from the two text files, in file order.
book = []
for name in 'vim.txt', 'gul.txt':
    with open(GPREFIX + name) as f:
        # findall(r'\w+') returns the same words as split(r'\W+') but without the
        # empty strings split() produces when the text starts or ends with a
        # non-word character (e.g. a trailing newline).
        book.extend(re.findall(r'\w+', f.read()))

# supervised.txt alternates lines: a text line (even index) followed by a line of
# integer labels, one digit per character (odd index).
supervised = []
with open(GPREFIX + 'supervised.txt') as f:
    for idx, line in enumerate(f):
        # Strip only the line terminator: slicing off the last character would
        # drop real data when the final line of the file has no trailing newline.
        line = line.rstrip('\n')
        if idx % 2 == 0:
            text = line
        else:
            vulg = np.array(list(map(int, line)))
            supervised.append((text, vulg))

# Parallel word-level lists: each word of every text and the label slice covering
# the same character span.
supervised_text = []
supervised_vulg = []
for text, vulg in supervised:
    for occ in re.finditer(r'\w+', text):
        left, right = occ.span()
        supervised_text.append(text[left:right])
        supervised_vulg.append(vulg[left:right])

# Split the labelled word list: label 0 goes to norm_words, every other label to vulg_words.
words = pd.read_csv(GPREFIX + 'train_words.csv')
is_norm = words['label'] == 0
norm_words = words['word'][is_norm].tolist()
vulg_words = words['word'][~is_norm].tolist()

In [7]:
def mutate_word(word):
    """Return a random form of `word`, drawn uniformly from the lexeme of its first parse."""
    lexeme = morph.parse(word)[0].lexeme
    pick = random.randint(len(lexeme))
    return lexeme[pick].word

def do_char_repl(word, prob):
    """Return `word` with each character independently replaced with probability `prob`.

    A replaced character is drawn at random from `real_alphabet`; the length of
    the word never changes.
    """
    return ''.join(
        random.choice(real_alphabet) if random.uniform() < prob else c
        for c in word
    )

def do_char_swap(word):
    """Return `word` with some adjacent character pairs transposed.

    At every position that has a successor, the pair is swapped with probability
    1 / len(word); a swapped pair is skipped over as a whole.
    """
    size = len(word)
    chars = []
    pos = 0
    while pos < size:
        has_next = pos + 1 < size
        # The random draw only happens when there is a neighbour to swap with.
        if has_next and random.uniform() < 1 / size:
            chars += [word[pos + 1], word[pos]]
            pos += 2
            continue
        chars.append(word[pos])
        pos += 1
    return ''.join(chars)

def get_punct(full_stop_prob=0.9, max_len=3):
    """Return a random separator.

    With probability `full_stop_prob` the result is a run of 1..max_len full
    stops; otherwise it is a single '-'.
    """
    if random.uniform() >= full_stop_prob:
        return '-'
    dots = random.randint(1, max_len + 1)
    return '.' * dots

def generate_trash():
    """Build a random junk token.

    The token consists of 1-5 groups; each group is one character drawn from a
    randomly chosen class (English letter, digit, Russian letter or punctuation
    mark) repeated 1-3 times.
    """
    # Character pool per mode; any mode not listed here ('.') uses punct_marks.
    pools = {'ENG': english[1:], 'DIG': digits[1:], 'RUS': russian}
    pieces = []
    for _ in range(random.randint(1, 6)):
        mode = random.choice(['ENG', 'DIG', 'RUS', '.'])
        c = random.choice(pools.get(mode, punct_marks))
        pieces.extend([c] * random.randint(1, 4))
    return ''.join(pieces)

def generate_text(min_words=20, max_words=40, do_mutate_word=True, book_prob=0.5, norm_prob=0.1,
                  vulg_prob=0.1, trash_prob=0.1, space_prob=0.75, punct_prob=0.2, char_repl_prob=0.05):
    """Generate one synthetic training sample.

    Words are drawn from `norm_words`, `vulg_words`, generate_trash() or, sequentially,
    from a randomly chosen source corpus (`book` or `supervised_text`). Each word is
    optionally inflected, passed through do_char_repl(), and followed by a random separator.

    Args:
        min_words, max_words: inclusive bounds of the target token count. Separators
            are stored in the same token list as words, so they count towards it.
        do_mutate_word: if true, every word is replaced via mutate_word().
        book_prob: probability of using `book` (rather than `supervised_text`) as source.
        norm_prob, vulg_prob, trash_prob: per-word probabilities of drawing from
            `norm_words`, `vulg_words` or generate_trash(); the remainder comes
            from the source corpus.
        space_prob, punct_prob: per-word probabilities of appending a space or the
            result of get_punct(); otherwise no separator is appended.
        char_repl_prob: per-character replacement probability for do_char_repl().

    Returns:
        (text, is_vulg): the generated string and a numpy array holding one label
        per character of it.

    Raises:
        ValueError: if at least one token is requested and the chosen source is empty.
    """
    num_words = random.randint(min_words, max_words + 1)
    if random.uniform() < book_prob:
        source = book
    else:
        source = supervised_text
    if num_words > 0 and len(source) == 0:
        raise ValueError('source corpus is empty')
    # Start where num_words consecutive source words are available when possible;
    # a source shorter than that starts at 0 instead of making randint() fail.
    source_ptr = random.randint(max(1, len(source) - num_words + 1))
    text = []
    is_vulg = []

    def append(token, cur_vulg):
        """Append `token` and its per-character labels; skip tokens whose labels cannot be aligned."""
        if isinstance(cur_vulg, bool):
            is_vulg.extend([1 if cur_vulg else 0] * len(token))
        else:
            if len(token) != len(cur_vulg):
                # The word length changed (e.g. after mutate_word). Uniform labels
                # can be stretched to the new length; mixed labels cannot, so the
                # token is dropped.
                if sum(cur_vulg) == 0:
                    cur_vulg = np.zeros(len(token))
                elif sum(cur_vulg) == len(cur_vulg):
                    cur_vulg = np.ones(len(token))
                else:
                    return
            is_vulg.extend(cur_vulg)
        text.append(token)

    while len(text) < num_words:
        word_outcome = random.uniform()
        if word_outcome < norm_prob:
            word = random.choice(norm_words)
            cur_vulg = False
        elif word_outcome < norm_prob + vulg_prob:
            word = random.choice(vulg_words)
            cur_vulg = True
        elif word_outcome < norm_prob + vulg_prob + trash_prob:
            word = generate_trash()
            cur_vulg = False
        else:
            # Wrap around: an iteration can consume a source word without adding a
            # token (dropped word, no separator), so the pointer may otherwise run
            # past the end of the source.
            src_idx = source_ptr % len(source)
            word = source[src_idx]
            if source is book:
                cur_vulg = False
            else:
                cur_vulg = supervised_vulg[src_idx]
            source_ptr += 1

        if do_mutate_word:
            word = mutate_word(word)
        word = do_char_repl(word, char_repl_prob)
        #word = do_char_swap(word)
        append(word, cur_vulg)

        sep_outcome = random.uniform()
        if sep_outcome < space_prob:
            append(' ', False)
        elif sep_outcome < space_prob + punct_prob:
            append(get_punct(), False)

    return ''.join(text), np.array(is_vulg)

In [8]:
# Label value encode_batch() uses to pad y; custom_loss_function() gives these positions zero weight.
Y_NULL = 2

def encode_text(text):
    """Encode `text` as a numpy array of vocabulary indices, one per character."""
    return np.array([enc(c) for c in text])

def encode_batch(batch, with_y=True):
    """Encode a batch of samples into padded index arrays.

    Args:
        batch: sequence of samples; each sample is either a text or a list/tuple
            whose first element is the text and whose second element holds its labels.
        with_y: if true, also build the label array from element 1 of every sample.

    Returns:
        X, or (X, y) when `with_y` is true. Both arrays have one row per sample and
        as many columns as the longest text; X is padded with `null_idx`, y with `Y_NULL`.
    """

    def text_of(sample):
        return sample[0] if isinstance(sample, (list, tuple)) else sample

    texts = [text_of(sample) for sample in batch]
    max_len = max(len(t) for t in texts)
    X = np.full([len(batch), max_len], null_idx)
    y = np.full_like(X, Y_NULL) if with_y else None
    for row, (sample, t) in enumerate(zip(batch, texts)):
        n = len(t)
        X[row, :n] = encode_text(t)
        if with_y:
            y[row, :n] = sample[1]
    return (X, y) if with_y else X

In [9]:
def custom_loss_function(y_true, y_pred):
    """Binary cross-entropy that ignores positions labelled Y_NULL.

    Positions where `y_true` equals Y_NULL get weight 0. The weighted loss is then
    averaged over all positions, the zero-weight ones included.
    """
    EPS = 1e-10  # keeps the logarithms finite at predictions of exactly 0 or 1
    is_labelled = tf.not_equal(y_true, Y_NULL)
    weight = tf.cast(is_labelled, dtype=tf.float32)
    pos_term = y_true * tf.log(y_pred + EPS)
    neg_term = (1 - y_true) * tf.log(1 - y_pred + EPS)
    loss = -weight * (pos_term + neg_term)
    return tf.reduce_mean(loss)

In [10]:
def build_model(lr=1e-2, emb_size=8, hid_size=256, kernel_size=15):
    """Build and compile the per-character tagging model.

    The input sequence of vocabulary indices is padded on both sides with
    `null_idx`, embedded, and fed to two parallel branches (a bidirectional LSTM
    and a 1-D convolution). Both branch outputs are cropped back to the original
    length, concatenated, batch-normalised and passed through two per-timestep
    dense layers; the last one is a single sigmoid unit whose trailing axis is
    squeezed away.

    Args:
        lr: learning rate of the Adam optimizer.
        emb_size: embedding dimension; the convolution uses 2 * emb_size filters.
        hid_size: units of the hidden dense layer; each LSTM direction gets hid_size // 2.
        kernel_size: convolution kernel width; also determines the padding size.

    Returns:
        A Keras model compiled with Adam and custom_loss_function.

    NOTE(review): for kernel_size 1 or 2, pad_size is 0 and the crop
    `x[:, pad_size:-pad_size]` becomes `x[:, 0:0]`, i.e. an empty sequence.
    """
    # Variable-length sequence of vocabulary indices.
    input_layer = keras.layers.Input(shape=[None])
    
    # Pad pad_size NULL symbols on each side of the sequence axis.
    pad_size = (kernel_size - 1) // 2
    pad_input_layer = keras.layers.Lambda(lambda x: tf.pad(x, paddings=[[0, 0], [pad_size, pad_size]],
                                                           constant_values=null_idx))(input_layer)
    
    pad_embedding_layer = keras.layers.Embedding(input_dim=len(alphabet),
                                                 output_dim=emb_size)(pad_input_layer)
    
    # CuDNN-backed variant of the LSTM branch, kept commented out:
    #pad_lstm_layer = keras.layers.Bidirectional(keras.layers.CuDNNLSTM(units=hid_size // 2,
    #                                            return_sequences=True))(pad_embedding_layer)
    pad_lstm_layer = keras.layers.Bidirectional(keras.layers.LSTM(units=hid_size // 2,
                                                return_sequences=True))(pad_embedding_layer)
    # Drop the padded positions again.
    lstm_layer = keras.layers.Lambda(lambda x: x[:, pad_size:-pad_size])(pad_lstm_layer)
    
    pad_conv_layer = keras.layers.Conv1D(filters=2 * emb_size, kernel_size=kernel_size,
                                         padding='same')(pad_embedding_layer)
    # Drop the padded positions again.
    conv_layer = keras.layers.Lambda(lambda x: x[:, pad_size:-pad_size])(pad_conv_layer)
    
    # Join both branches along the feature axis.
    concat_layer = keras.layers.Concatenate(axis=-1)([lstm_layer, conv_layer])
    batch_layer = keras.layers.BatchNormalization()(concat_layer)
    
    dense_layer = keras.layers.TimeDistributed(
        keras.layers.Dense(units=hid_size, activation='elu'))(batch_layer)
    
    # One sigmoid output per position; the squeeze removes the trailing size-1 axis.
    output_layer = keras.layers.TimeDistributed(
        keras.layers.Dense(units=1, activation='sigmoid'))(dense_layer)
    squeezed_output_layer = keras.layers.Lambda(lambda x: tf.squeeze(
        x, axis=-1))(output_layer)
    
    model = keras.models.Model(inputs=[input_layer], outputs=[squeezed_output_layer])
    model.compile(optimizer=keras.optimizers.Adam(lr=lr), loss=custom_loss_function, metrics=[])
    return model

def generator(batch_size, **kwargs):
    """Endlessly yield encoded (X, y) batches of `batch_size` freshly generated texts.

    Keyword arguments are forwarded unchanged to generate_text().
    """
    while True:
        samples = []
        for _ in range(batch_size):
            samples.append(generate_text(**kwargs))
        yield encode_batch(samples)

In [75]:
# First training stage: a fresh model trained on generated data,
# 50 epochs of 100 batches with 32 synthetic texts each.
model = build_model()
model.fit_generator(generator(batch_size=32), steps_per_epoch=100, epochs=50)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x7f9cfadcf3c8>

In [76]:
# Second training stage: recompile the same model with a new Adam optimizer at a
# lower learning rate (1e-3 instead of build_model's default 1e-2) and train for 20 more epochs.
model.compile(optimizer=keras.optimizers.Adam(lr=1e-3), loss=custom_loss_function, metrics=[])
model.fit_generator(generator(batch_size=32), steps_per_epoch=100, epochs=20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7f9cf8e4fe10>

In [77]:
from keras.models import load_model

# Persist the trained model.
model.save(GPREFIX + 'model_final.h5')
# Reload variant, kept commented out. NOTE(review): it reads 'model.h5', not the
# 'model_final.h5' written above - confirm which file is intended.
#model = load_model(GPREFIX + 'model.h5',
#                   custom_objects={'tf': tf, 'custom_loss_function': custom_loss_function})

In [78]:
# validation.txt alternates lines: a text line (even index) followed by a line of
# integer labels, one digit per character (odd index).
val = []
with open(GPREFIX + 'validation.txt') as f:
    for idx, line in enumerate(f):
        # Strip only the line terminator: slicing off the last character would
        # drop real data when the final line of the file has no trailing newline.
        line = line.rstrip('\n')
        if idx % 2 == 0:
            text = line
        else:
            vulg = np.array(list(map(int, line)))
            val.append((text, vulg))

In [79]:
# Encode the validation pairs into padded arrays and predict a score for every position.
val_enc, val_true = encode_batch(val)
val_pred = model.predict(val_enc)

In [97]:
from sklearn.metrics import f1_score

# Score at or above which a character (or a token's mean score) counts as vulgar.
THRESHOLD = 0.75
# Builtin int replaces the np.int alias, which newer NumPy releases no longer provide.
val_cls = (val_pred >= THRESHOLD).astype(int)

# Character-level score over the real (non-padding) positions only.
print('Char-level F1:', f1_score(val_true[val_true != Y_NULL], val_cls[val_true != Y_NULL]))

# Token-level score: a word is positive when the mean of its character labels is
# at least 0.5 (truth) or the mean of its character scores reaches THRESHOLD (prediction).
tok_true = []
tok_cls = []
for (sent, true), pred in zip(val, val_pred):
    for occ in re.finditer(r'\w+', sent):
        left, right = occ.span()
        tok_true.append(1 if true[left:right].mean() >= 0.5 else 0)
        tok_cls.append(1 if pred[left:right].mean() >= THRESHOLD else 0)

print('Token-level F1:', f1_score(tok_true, tok_cls))

Char-level F1: 0.8194703468854905
Token-level F1: 0.8421052631578948


In [98]:
from IPython.display import HTML, display_html

# Take the text of every 10th validation sample (starting at index 1) and score it without labels.
test = [e[0] for e in val[1::10]]
test_enc = encode_batch(test, with_y=False)
test_pred = model.predict(test_enc)

def get_raw_html(text, pred):
    """Render `text` as HTML with every character shaded according to its score.

    Args:
        text: the string to render.
        pred: per-character scores fed to the "Reds" colormap. They are paired
            with the characters by zip(), so surplus trailing scores are ignored.

    Returns:
        A string of <span> elements, one per character of `text`.
    """
    import html  # local import keeps this cell independent of the import cell

    cmap = plt.get_cmap("Reds")

    def get_color_hex(p):
        # bytes=True yields 0-255 channel values; alpha is dropped.
        rgba = cmap(p, bytes=True)
        return '#%02X%02X%02X' % rgba[:3]

    raw_html = []
    template = '<span style="background-color: {color_hex}">{token}</span>'
    for c, p in zip(text, pred):
        # Escape the character so '<', '>', '&' and quotes in the text are shown
        # literally instead of being interpreted as markup.
        raw_html.append(template.format(color_hex=get_color_hex(p), token=html.escape(c)))
    return ''.join(raw_html)

# Show each selected text with its per-character predictions as background colours.
for text, pred in zip(test, test_pred):
    display_html(HTML(get_raw_html(text, pred)))