In [1]:
import torch
from torch import nn

from torchtext.legacy import data
from torchtext.vocab import Vectors

import spacy
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score

import time
import matplotlib.pyplot as plt

In [2]:
# Data files: CSV paths are relative to the notebook's working directory.
train_file = "../../data/train.csv"
val_file = "../../data/validation.csv"
test_file = "../../data/test.csv"

# Pretrained GloVe vectors; the file name is handed to torchtext's `Vectors` in load_data.
w2v_file = "glove.840B.300d.txt"

# Hyperparameters (module-level globals read by the model and data-loading cells).
embed_size = 300   # embedding width used by the model; presumably matches the 300d GloVe file — confirm
hidden_size = 10   # width of the single hidden layer
output_size = 4    # number of output classes
max_epochs = 30    # NOTE(review): not read by any visible cell; the training cell defines its own N_EPOCHS
lr = 0.5           # NOTE(review): not read by any visible cell; the training cell hard-codes its learning rate
batch_size = 128   # batch size used by every iterator built in load_data

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [29]:
# CSV readers: load the raw splits into pandas DataFrames (torchtext conversion happens in load_data)
def get_data_train(train_file, val_file):
    """Read the training and validation CSVs and stack them into one DataFrame.

    The first row of each file is skipped and the columns are named
    ``quadrant`` and ``lyrics``. Row indices from the two files are kept
    as-is, so the combined frame's index is not unique.

    Args:
        train_file: path to the training CSV.
        val_file: path to the validation CSV.

    Returns:
        A DataFrame holding the training rows followed by the validation rows.
    """
    column_names = ["quadrant", "lyrics"]
    frames = [
        pd.read_csv(path, header=None, names=column_names, skiprows=1)
        for path in (train_file, val_file)
    ]
    return pd.concat(frames)

def get_data_test(test_file):
    """Read the test CSV into a DataFrame with ``quadrant`` and ``lyrics`` columns.

    The first row of the file is skipped rather than parsed as column names.

    Args:
        test_file: path to the test CSV.

    Returns:
        A DataFrame with one row per line of the file after the first.
    """
    return pd.read_csv(
        test_file,
        names=["quadrant", "lyrics"],
        header=None,
        skiprows=1,
    )

In [30]:
# loads data
# set up iterators for datasets
# create vocabulary and word embeddings
def load_data(w2v_file, train_file, test_file, val_file):
    """Build torchtext datasets, batch iterators, vocabulary and embeddings.

    The training and validation CSVs are read together (via get_data_train)
    and then re-split 90/10 into train/validation; the test CSV is loaded
    separately. Batch sizes come from the module-level ``batch_size``.

    Args:
        w2v_file: pretrained vector file passed to ``Vectors``.
        train_file: path to the training CSV.
        test_file: path to the test CSV.
        val_file: path to the validation CSV.

    Returns:
        ``(train_iter, val_iter, test_iter, (train_data, val_data, test_data),
        word_embeddings, vocab)`` where ``word_embeddings`` is the vector
        matrix attached to the vocabulary built from the training split.
    """
    nlp = spacy.load("en_core_web_sm")

    def tokenize(sentence):
        # spaCy tokens, dropping tokens that are exactly a single space
        return [token.text for token in nlp.tokenizer(sentence) if token.text != " "]

    def lyrics_length(example):
        # bucketing key: number of tokens in the lyrics field
        return len(example.lyrics)

    # fields: lyrics are tokenised and lower-cased, labels are used as given
    text_field = data.Field(sequential=True, tokenize=tokenize, lower=True)
    label_field = data.Field(sequential=False, use_vocab=False)
    fields = [("quadrant", label_field), ("lyrics", text_field)]

    def to_dataset(frame):
        # one Example per DataFrame row, in column order (quadrant, lyrics)
        examples = [data.Example.fromlist(row, fields) for row in frame.values.tolist()]
        return data.Dataset(examples, fields)

    # 1. load data from files
    combined_data = to_dataset(get_data_train(train_file, val_file))
    test_data = to_dataset(get_data_test(test_file))

    # carve the validation split out of the combined train+validation data
    train_data, val_data = combined_data.split(split_ratio=0.9)

    # 2. iterators: shuffled for training, fixed order for evaluation
    train_iter = data.BucketIterator(
        dataset=train_data,
        batch_size=batch_size,
        sort_key=lyrics_length,
        repeat=False,
        shuffle=True,
    )
    val_iter, test_iter = data.BucketIterator.splits(
        datasets=(val_data, test_data),
        batch_sizes=(batch_size, batch_size),
        sort_key=lyrics_length,
        repeat=False,
        shuffle=False,
    )

    print("Loaded {} training examples".format(len(train_data)))
    print("Loaded {} test examples".format(len(test_data)))
    print("Loaded {} validation examples".format(len(val_data)))

    # 3. vocabulary (from the training split only) and its pretrained vectors
    text_field.build_vocab(train_data, vectors=Vectors(w2v_file))
    vocab = text_field.vocab
    word_embeddings = vocab.vectors

    splits = (train_data, val_data, test_data)
    return train_iter, val_iter, test_iter, splits, word_embeddings, vocab

In [None]:
# Load everything once. The embedding matrix is bound as `word_embeddings`,
# the name the model-construction cell reads (the previous spelling
# `word_emeddings` left that name undefined on a fresh kernel).
train_iter, val_iter, test_iter, datasets, word_embeddings, vocab = load_data(w2v_file, train_file, test_file, val_file)

Loaded 12685 training examples
Loaded 3273 test examples
Loaded 1409 validation examples


100%|█████████████████████████████▉| 2196016/2196017 [03:50<00:00, 9530.59it/s]


In [37]:
# fastText model
class fastTextNN(nn.Module):
    """fastText-style classifier: averaged word embeddings -> two linear layers.

    Layer sizes come from the module-level ``embed_size``, ``hidden_size`` and
    ``output_size``. The embedding table is initialised from pretrained
    vectors and frozen.

    The forward pass returns log-probabilities (``LogSoftmax`` over the class
    dimension), which is what ``torch.nn.NLLLoss`` expects. ``argmax`` over
    the output gives the same predicted class as it would on probabilities.
    """

    def __init__(self, vocab_size, word_embeddings):
        """
        Args:
            vocab_size: number of rows in the embedding table.
            word_embeddings: pretrained embedding matrix used as the frozen
                embedding weights (assumed to be ``(vocab_size, embed_size)``
                — not checked here).
        """
        super(fastTextNN, self).__init__()

        # embedding layer, overwritten with the pretrained vectors and frozen
        self.embeddings = nn.Embedding(vocab_size, embed_size)
        self.embeddings.weight = nn.Parameter(word_embeddings, requires_grad=False)
        # hidden layer & output layer
        self.fc1 = nn.Linear(embed_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        # Log-softmax over the class dimension. The attribute keeps its
        # original name so existing references to `model.softmax` still work.
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of token-id sequences.

        Args:
            x: integer tensor of token ids; the first two axes are swapped
               after the embedding lookup, so the input is treated as
               ``(seq_len, batch)``.

        Returns:
            Tensor of shape ``(batch, output_size)`` holding log-probabilities.
        """
        # (seq_len, batch, embed) -> (batch, seq_len, embed)
        embedded = self.embeddings(x).permute(1, 0, 2)
        # average over the sequence axis (every position contributes equally)
        hidden = self.fc1(embedded.mean(1))
        logits = self.fc2(hidden)

        return self.softmax(logits)

In [None]:
# Build the classifier from the vocabulary size and the pretrained vectors.
# NOTE(review): `word_embeddings` must be the name bound by the load_data(...) call cell — confirm the spelling matches.
model = fastTextNN(len(vocab), word_embeddings)
# Move the parameters to the selected device; as the last expression, this also displays the module.
model.to(device)

In [None]:
def train_func(data, data_):
    """Run one training epoch and step the learning-rate scheduler.

    Uses the module-level ``model``, ``optimizer``, ``criterion``,
    ``scheduler`` and ``device``.

    Args:
        data: iterable of batches exposing ``.lyrics`` (model input) and
            ``.quadrant`` (class labels) attributes, matching the field
            names declared in load_data.
        data_: the dataset the batches were drawn from; only ``len(data_)``
            is used, as the number of examples to average over.

    Returns:
        ``(mean_loss, accuracy)`` as Python floats, both averaged per example.
    """
    model.train()

    train_loss = 0.0
    train_acc = 0
    for batch in data:
        # Read fields by name: tuple-unpacking a torchtext Batch does not
        # reliably yield (lyrics, label) for these field definitions.
        lyrics, label = batch.lyrics.to(device), batch.quadrant.to(device)

        optimizer.zero_grad()
        output = model(lyrics)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        # `loss` is a batch mean (assuming the criterion's default reduction),
        # so weight it by the batch size before averaging over all examples.
        train_loss += loss.item() * label.size(0)
        train_acc += (output.argmax(1) == label).sum().item()

    # Adjust the learning rate once per epoch
    scheduler.step()

    return train_loss / len(data_), train_acc / len(data_)

def test(data, data_):
    loss = 0
    acc = 0
    for (lyrics, label) in data:
        label, lyrics = label.to(device), lyrics.to(device)
        with torch.no_grad():
            output = model(lyrics)
            loss = criterion(output, label)
            loss += loss.item()
            acc += (output.argmax(1) == label).sum().item()

    return loss / len(data_), acc / len(data_)

In [None]:
# Training configuration for this run.
N_EPOCHS = 5
min_valid_loss = float('inf')

# NLLLoss expects the model to output log-probabilities.
criterion = torch.nn.NLLLoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=4.0)
# Multiply the learning rate by 0.9 after every epoch.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

# Per-epoch history, kept as plain floats so it can be plotted directly.
train_losses, valid_losses = [], []
train_accs, valid_accs = [], []

for epoch in range(N_EPOCHS):

    start_time = time.time()
    train_loss, train_acc = train_func(train_iter, datasets[0])
    valid_loss, valid_acc = test(val_iter, datasets[1])

    # Whole minutes and leftover seconds as integers.
    mins, secs = divmod(int(time.time() - start_time), 60)

    print('Epoch: %d' %(epoch + 1), " | time in %d minutes, %d seconds" %(mins, secs))
    print(f'\tLoss: {train_loss:.4f}(train)\t|\tAcc: {train_acc * 100:.1f}%(train)')
    print(f'\tLoss: {valid_loss:.4f}(valid)\t|\tAcc: {valid_acc * 100:.1f}%(valid)')

    # float() keeps the history free of tensors whatever the helpers return.
    train_losses.append(float(train_loss))
    train_accs.append(float(train_acc))
    valid_losses.append(float(valid_loss))
    valid_accs.append(float(valid_acc))
    

# plots

In [None]:
# Final evaluation on the held-out test split (index 2 of `datasets`).
print('Checking the results of test dataset...')
test_loss, test_acc = test(test_iter, datasets[2])
print(f'\tLoss: {test_loss:.4f}(test)\t|\tAcc: {test_acc * 100:.1f}%(test)')