In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import f1_score

In [2]:
# get data
import pandas as pd

train = pd.read_csv('train.csv')
valid = pd.read_csv('valid.csv')
test = pd.read_csv('test.csv')

In [3]:
# tokenizer
from tokenizers import Tokenizer
from tokenizers import ByteLevelBPETokenizer
from tokenizers.pre_tokenizers import Whitespace

tokenizer = ByteLevelBPETokenizer()
tokenizer.pre_tokenizer = Whitespace()
tokenizer.enable_padding(pad_id=0, pad_token='<pad>')

In [4]:
texts_path = 'texts.txt'

with open(texts_path, 'w') as f:
    for text in list(train['text'].values):
        f.write("%s\n" % text)

In [5]:
tokenizer.train(
    files=[texts_path],
    vocab_size=5_000,
    min_frequency=2,
    special_tokens=['<pad>', '<unk>']
    )

In [6]:
class CustomDataset(Dataset):

    def __init__(self, tokens, labels, max_len):
        self.tokens = tokens
        self.labels = labels
        self.max_len = max_len


    def __len__(self):
        return len(self.tokens)


    def __getitem__(self, idx):
        label = self.labels[idx]
        label = torch.tensor(label)
        tokens = self.tokens[idx]
        out = torch.zeros(self.max_len, dtype=torch.long)
        out[:len(tokens)] = torch.tensor(tokens, dtype=torch.long)[:self.max_len]
        return out, label

In [7]:
max_len = 64
BATCH_SIZE = 16

In [8]:
train_labels = list(train['label'])
train_tokens = [tokenizer.encode(str(text)).ids for text in list(train['text'])]
train_dataset = CustomDataset(train_tokens, train_labels, max_len)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [9]:
test_labels = list(test['label'])
test_tokens = [tokenizer.encode(str(text)).ids for text in list(test['text'])]
test_dataset = CustomDataset(test_tokens, test_labels, max_len)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [10]:
class LSTM_classifier(nn.Module):


    def __init__(self, hidden_dim=128, vocab_size=5000, embedding_dim=300, linear_dim=128, dropout=0.3, n_classes=2):
        super().__init__()
        self.embedding_layer = nn.Embedding(vocab_size, embedding_dim)
        self.lstm_layer = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.dropout_layer = nn.Dropout(dropout)        
        self.fc_layer = nn.Linear(hidden_dim * 2, linear_dim)
        self.batchnorm = nn.BatchNorm1d(linear_dim)
        self.relu = nn.ReLU()
        self.out_layer = nn.Linear(linear_dim, n_classes)


    def forward(self, inputs):
        batch_size = inputs.size(0)
        embeddings = self.embedding_layer(inputs)
        lstm_out, (ht, ct) = self.lstm_layer(embeddings)
        out = ht.transpose(0, 1)
        out = out.reshape(batch_size, -1)
        out = self.fc_layer(out)
        out = self.batchnorm(out)
        out = self.relu(out)
        out = self.dropout_layer(out)
        out = self.out_layer(out)
        out = torch.squeeze(out, 1)
        out = torch.sigmoid(out)
        return out

In [11]:
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

In [12]:
def eval_nn(model, data_loader):
    predicted = []
    labels = []
    model.eval()
    with torch.no_grad():
        for data in data_loader:
            x, y = data
            x = x.to(device)
            outputs = model(x)
            _, predict = torch.max(outputs.data, 1)
            predict = predict.cpu().detach().numpy().tolist()
            predicted += predict
            labels += y
        score = f1_score(labels, predicted, average='binary')
    return score

In [13]:
def train_nn(model, optimizer, loss_function, train_loader, test_loader, device, epochs=20):
    best_score = 0
    for epoch in range(epochs):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            predict = model(inputs)
            loss = loss_function(predict, labels)
            loss.backward()
            optimizer.step()
        score = eval_nn(model, test_loader)
        print(epoch, 'valid:', score)
        if score > best_score:
            torch.save(model.state_dict(), 'lstm.pt')
            best_score = score
    return best_score

In [14]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [15]:
model = LSTM_classifier(hidden_dim=256, vocab_size=5000, embedding_dim=300, linear_dim=128, dropout=0.1)   

In [16]:
model.apply(init_weights);

In [17]:
model.to(device);

In [18]:
optimizer = optim.AdamW(model.parameters())

In [19]:
loss_function = nn.CrossEntropyLoss().to(device)

In [20]:
train_nn(model, optimizer, loss_function, train_loader, test_loader, device, epochs=20)

0 valid: 0.5928917609046851
1 valid: 0.6776859504132231
2 valid: 0.7710184552289816
3 valid: 0.7767918088737201
4 valid: 0.7840112201963534
5 valid: 0.7737430167597765
6 valid: 0.7612809315866085
7 valid: 0.7653589933382681
8 valid: 0.7385826771653544
9 valid: 0.7675753228120515
10 valid: 0.771631205673759
11 valid: 0.7884872824631861
12 valid: 0.7671840354767183
13 valid: 0.7829131652661064
14 valid: 0.7801516195727084
15 valid: 0.7709978463747308
16 valid: 0.7720861172976986
17 valid: 0.7810218978102189
18 valid: 0.7833219412166781
19 valid: 0.7702127659574468


0.7884872824631861

In [21]:
model = LSTM_classifier(hidden_dim=256, vocab_size=5000, embedding_dim=300, linear_dim=128, dropout=0.1)
model.load_state_dict(torch.load('lstm.pt'));

In [22]:
score = eval_nn(model, test_loader)

In [23]:
score

0.7884872824631861