# BiLSTM

Import pandas + numpy and load train + test data

In [0]:
import pandas as pd
import numpy as np

# Training rows; this file is decoded as latin1.
data = pd.read_csv('Train_set.csv', encoding='latin1')

# Change the test set here: swap which of the two lines below is commented out.
test = pd.read_csv('Test_set.csv')
# test = pd.read_csv('Zero-shot.csv')

# Stack the test rows after the training rows so that vocabulary and tag
# indices are built over both. pd.concat is used because DataFrame.append
# was deprecated in pandas 1.4 and removed in pandas 2.0.
data = pd.concat([data, test])

In [0]:
# Colab line magic: select the TensorFlow 1.x runtime before TensorFlow is imported.
%tensorflow_version 1.x

In [0]:
# Fix the NumPy and TensorFlow random seeds (both set to 1).
from numpy.random import seed
from tensorflow import set_random_seed

seed(1)
set_random_seed(1)

Make a list of all words in the train and test set

In [0]:
# Distinct words in order of first appearance. Series.unique() is
# deterministic, whereas iterating a set of strings changes order between
# interpreter sessions (string hash randomisation), which would reshuffle the
# word -> index mapping on every kernel restart despite the fixed seeds.
words = data["Word"].unique().tolist()
words.append("ENDPAD")  # padding token, kept as the last entry

test_words = test["Word"].unique().tolist()
test_words.append("ENDPAD")

In [0]:
# Vocabulary sizes; each count includes the trailing "ENDPAD" entry.
n_words, test_n_words = len(words), len(test_words)

In [0]:
# Distinct tags in order of first appearance. Series.unique() gives a
# deterministic order, so the tag -> index mapping (and therefore the meaning
# of each output unit) is stable across kernel restarts; list(set(...)) is not.
tags = data["Tag"].unique().tolist()
test_tags = test["Tag"].unique().tolist()

In [0]:
# Number of distinct tags overall and in the test set alone.
n_tags, test_n_tags = len(tags), len(test_tags)

Sentence Getter

In [0]:
class SentenceGetter(object):
    """Group a token-per-row DataFrame into sentences.

    The frame must provide the columns "Sentence #", "Word", "POS" and "Tag".
    Every sentence is represented as a list of (word, pos, tag) tuples.

    Attributes:
        n_sent: number of the sentence that get_next() will look up next
            (starts at 1).
        data: the DataFrame passed to the constructor.
        empty: flag initialised to False; this class never changes it.
        grouped: Series mapping each "Sentence #" value to its tuple list.
        sentences: all tuple lists, in order of first appearance in ``data``.
    """

    def __init__(self, data):
        self.n_sent = 1
        self.data = data
        self.empty = False

        def agg_func(s):
            # One (word, pos, tag) tuple per row of the group.
            return list(zip(s["Word"].values.tolist(),
                            s["POS"].values.tolist(),
                            s["Tag"].values.tolist()))

        # sort=False keeps the groups in row order. The default (sort=True)
        # orders the string keys lexicographically ("Sentence: 10" before
        # "Sentence: 2"), which scrambles the sentences and breaks any code
        # that relies on their position, e.g. taking the last N as a test set.
        self.grouped = self.data.groupby("Sentence #", sort=False).apply(agg_func)
        self.sentences = list(self.grouped)

    def get_next(self):
        """Return the sentence keyed "Sentence: <n_sent>" and advance n_sent.

        Returns None, without advancing, when no such key exists.
        """
        try:
            s = self.grouped["Sentence: {}".format(self.n_sent)]
        except KeyError:
            # Only a missing key means "no more sentences"; other errors
            # are real problems and must not be swallowed.
            return None
        self.n_sent += 1
        return s

In [0]:
# One getter over the combined frame, one over the test rows only.
getter, test_getter = SentenceGetter(data), SentenceGetter(test)

In [0]:
# Fetch the first sentence from each getter.
sent, test_sent = getter.get_next(), test_getter.get_next()

In [0]:
# Lists of sentences, each a list of (word, pos, tag) tuples.
sentences, test_sentences = getter.sentences, test_getter.sentences

Enumerate and pad sentences

In [0]:
# Fixed sequence length handed to pad_sequences as maxlen.
max_len = 75

# Item -> integer index lookups; the index is the item's position in its list.
word2idx = dict(zip(words, range(len(words))))
tag2idx = dict(zip(tags, range(len(tags))))
test_word2idx = dict(zip(test_words, range(len(test_words))))
test_tag2idx = dict(zip(test_tags, range(len(test_tags))))

In [0]:
from keras.preprocessing.sequence import pad_sequences
# Replace every (word, pos, tag) tuple by the index of its word.
X = [[word2idx[word] for word, _pos, _tag in s] for s in sentences]
test_X = [[test_word2idx[word] for word, _pos, _tag in s] for s in test_sentences]

In [0]:
# Pad at the end with the last vocabulary index, which is where "ENDPAD" was appended.
X = pad_sequences(
    sequences=X,
    maxlen=max_len,
    padding="post",
    value=n_words - 1,
)
test_X = pad_sequences(
    sequences=test_X,
    maxlen=max_len,
    padding="post",
    value=test_n_words - 1,
)

In [0]:
# Replace every (word, pos, tag) tuple by the index of its tag.
y = [[tag2idx[tag] for _word, _pos, tag in s] for s in sentences]
test_y = [[test_tag2idx[tag] for _word, _pos, tag in s] for s in test_sentences]

In [0]:
# Pad the tag sequences at the end with the index of the "O" tag.
y = pad_sequences(
    sequences=y,
    maxlen=max_len,
    padding="post",
    value=tag2idx["O"],
)
test_y = pad_sequences(
    sequences=test_y,
    maxlen=max_len,
    padding="post",
    value=test_tag2idx["O"],
)

Transform to categorical

In [0]:
from keras.utils import to_categorical

In [0]:
# One-hot encode each padded tag sequence over n_tags classes.
# NOTE(review): test_y holds test_tag2idx indices but is encoded with n_tags
# classes; confirm this is intended before using test_y downstream.
y = [to_categorical(tag_ids, num_classes=n_tags) for tag_ids in y]
test_y = [to_categorical(tag_ids, num_classes=n_tags) for tag_ids in test_y]

Define Train and Test set

In [0]:
# Positional split: the final len(test_X) sequences are the test portion,
# everything before them is the training portion.
X_tr, X_te = X[:len(X) - len(test_X)], X[len(X) - len(test_X):]
y_tr, y_te = y[:len(X) - len(test_X)], y[len(X) - len(test_X):]

In [0]:
# Optional: uncomment the relevant block below to shuffle the train or test set

# train set
# import random
# random.seed(23)
# c = list(zip(X_tr, y_tr))
# random.shuffle(c)
# X_tr, y_tr = zip(*c)

# test set
# import random
# random.seed(23)
# c = list(zip(X_te, y_te))
# random.shuffle(c)
# X_te, y_te = zip(*c)

The model

In [0]:
from keras.models import Model, Input
from keras.layers import LSTM, Embedding, Dense, TimeDistributed, Dropout, Bidirectional

In [0]:
# Token indices -> 50-d embeddings -> dropout -> BiLSTM -> per-token softmax.
input = Input(shape=(max_len,))
embedded = Embedding(input_dim=n_words, output_dim=50, input_length=max_len)(input)
embedded = Dropout(0.1)(embedded)
encoded = Bidirectional(
    LSTM(units=100, return_sequences=True, recurrent_dropout=0.1)
)(embedded)
# Softmax output layer applied at every time step.
out = TimeDistributed(Dense(n_tags, activation="softmax"))(encoded)

In [0]:
# Tie the input tensor and the softmax output together into a Keras model.
model = Model(inputs=input, outputs=out)

In [0]:
# Categorical cross-entropy over the one-hot tags, optimised with RMSprop.
model.compile(
    loss="categorical_crossentropy",
    optimizer="rmsprop",
    metrics=["accuracy"],
)

In [0]:
# Train for 5 epochs, holding out 10% of the training data for validation.
history = model.fit(
    x=np.array(X_tr),
    y=np.array(y_tr),
    batch_size=32,
    epochs=5,
    validation_split=0.1,
    verbose=1,
)

In [0]:
# Per-token class probabilities for the held-out test sequences.
test_pred = model.predict(x=np.array(X_te), verbose=1)

In [0]:
# Index -> tag lookup used for decoding. It must invert tag2idx: both the
# model's output classes and y_te were encoded with tag2idx, so inverting
# test_tag2idx (a separately enumerated mapping) would yield wrong labels or
# a KeyError.
idx2tag = {i: w for w, i in tag2idx.items()}

def pred2label(pred, index_to_tag=None):
    """Decode per-token score vectors into tag strings.

    Args:
        pred: iterable of sequences; each sequence is an iterable of score
            vectors (probabilities or one-hot rows), one per token.
        index_to_tag: optional mapping from class index to tag string.
            Defaults to the module-level ``idx2tag``.

    Returns:
        A list with one list of tag strings per sequence. The tag for a token
        is the one at the argmax of its score vector, with any "PAD"
        substring replaced by "O".
    """
    if index_to_tag is None:
        index_to_tag = idx2tag
    return [
        [index_to_tag[int(np.argmax(step))].replace("PAD", "O") for step in seq]
        for seq in pred
    ]
    
# Decode the model output and the one-hot gold tags into tag strings.
pred_labels, test_labels = pred2label(test_pred), pred2label(y_te)

Evaluation

In [0]:
!pip install seqeval

In [0]:
from nervaluate import Evaluator
from seqeval.metrics import precision_score, recall_score, f1_score, classification_report

In [0]:
# seqeval report computed from the decoded gold and predicted tag sequences.
print(classification_report(y_true=test_labels, y_pred=pred_labels))

In [0]:
# Entity types to score, taken from the gold and predicted labels. With
# tags=[''] no entity type matches, so nothing would be scored.
# Assumes prefixed tags such as "B-geo"/"I-geo" (type = text after the
# two-character prefix) with "O" for non-entities - TODO confirm against the data.
entity_types = sorted({
    label[2:]
    for seq in test_labels + pred_labels
    for label in seq
    if label != "O"
})
evaluator = Evaluator(test_labels, pred_labels, tags=entity_types, loader='list')

In [0]:
# Keep only the first two return values (overall and per-tag results) so the
# unpacking also works with nervaluate releases whose evaluate() returns
# additional values after them.
results, results_per_tag = evaluator.evaluate()[:2]

In [0]:
# Display `results`, the first value returned by evaluator.evaluate().
results