In [9]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import check_random_state
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tensorflow.keras.preprocessing.text import Tokenizer

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Bare expression so the notebook displays which backend was selected.
device.type

'cuda'

In [28]:
# hyperparameters

# Maximum vocabulary (dictionary) size.
max_vocab_size = 50000

# Every sample is a window of `sentence_len` tokens: the first `train_len`
# tokens are the model input and the last `pred_len` token is the target
# (i.e. 19 words are used to predict the 20th).
sentence_len, pred_len = 20, 1
train_len = sentence_len - pred_len

# Optimisation settings.
epochs, batch_size = 10, 64
# NOTE(review): this is fed to optim.Adam further down, whose documented
# default step size is 1e-3 -- confirm that 0.1 is intentional.
lr = 0.1


In [11]:
# data retrieval

# Each line of the file is treated as one training sentence.
with open('processed_texts.csv', 'r', encoding='UTF-8') as file:
    train_data = [line.strip('\n') for line in file]

print('Number of training sentences: ', len(train_data))

# Max size of the dictionary; reuse the hyperparameter instead of repeating the literal.
max_words = max_vocab_size
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(train_data)
sequences = tokenizer.texts_to_sequences(train_data)

# Flatten the per-sentence token lists into one long token stream so a sliding
# window can be applied to it (windows may therefore span sentence boundaries).
text = [token for sentence in sequences for token in sentence]
# Number of distinct words seen by the tokenizer. This counts every word in
# word_index, so it can be larger than max_words.
vocab_size = len(tokenizer.word_index)

# Sliding window to generate train data. The "+ 1" keeps the final full window
# (the one ending on the last token); a stream shorter than sentence_len
# yields no windows at all.
seq = [text[start:start + sentence_len]
       for start in range(len(text) - sentence_len + 1)]

# Reverse dictionary to decode tokenized sequences back to words.
reverse_word_map = {index: word for word, index in tokenizer.word_index.items()}

# Each row in seq is a sentence_len-token window: the first train_len tokens
# are the input and the last token is the word to predict.
X = np.array([window[:train_len] for window in seq])
y = np.array([window[-1] for window in seq])

print("vocab size:", vocab_size)

Number of training sentences:  2477
vocab size: 62237


In [29]:
def data_loader(X, y, test_size=.3, train_size=None):
    """Split ``X``/``y`` into train and test sets and wrap both in DataLoaders.

    Args:
        X: numpy array of input samples (converted with ``torch.from_numpy``).
        y: numpy array of targets, aligned with ``X`` along the first axis.
        test_size: forwarded to ``train_test_split``.
        train_size: forwarded to ``train_test_split``.

    Returns:
        ``(train_loader, test_loader)``. Both use the module-level
        ``batch_size`` and hold int64 tensors placed on the module-level
        ``device``.
    """
    # Fixed seed so the train/test split is the same on every run.
    random_state = check_random_state(0)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, train_size=train_size, random_state=random_state)

    def _to_dataset(features, targets):
        # One conversion path for inputs and targets: int64 tensors on `device`.
        # `.long().to(device)` replaces the deprecated
        # `.type(torch.cuda.LongTensor)` typed-tensor API.
        return torch.utils.data.TensorDataset(
            torch.from_numpy(features).long().to(device),
            torch.from_numpy(targets).long().to(device),
        )

    train = _to_dataset(X_train, y_train)
    test = _to_dataset(X_test, y_test)

    # Reshuffle the training samples every epoch; keep the test order fixed.
    train_loader = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader


# Build the loaders with data_loader's default split (test_size=.3).
train_loader, test_loader = data_loader(X, y)

In [30]:
def train(model, loss_f, optimizer, data_loader):
    """Run one training epoch and return its average loss and accuracy.

    Args:
        model: ``nn.Module`` whose output has one score per class along dim 1.
        loss_f: loss callable ``loss_f(pred, target)``. Assumed to return the
            mean over the batch (the default reduction of
            ``nn.CrossEntropyLoss``).
        optimizer: optimizer stepping ``model``'s parameters.
        data_loader: iterable of ``(X_batch, y_batch)`` tensor pairs.

    Returns:
        ``(loss, accu)`` as plain Python floats: the per-sample mean loss and
        the accuracy in percent. Both are NaN when the loader is empty.
    """
    model.train()

    # Send every batch to wherever the model's parameters live.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    correct = 0
    loss_sum = 0.0
    num_samples = 0
    for X_batch, y_batch in data_loader:
        if target_device is not None:
            X_batch = X_batch.to(target_device)
            y_batch = y_batch.to(target_device)

        optimizer.zero_grad()
        pred = model(X_batch)
        loss = loss_f(pred, y_batch)
        loss.backward()
        optimizer.step()

        # Count the samples actually seen: the last batch may be smaller than
        # the nominal batch size.
        n = y_batch.size(0)
        predicted = pred.detach().argmax(dim=1)
        # .item() keeps the running totals as Python numbers, so the results
        # can later be stored in a numpy array even when running on CUDA.
        correct += (predicted == y_batch).sum().item()
        # The batch loss is a mean; weight it by n to get a per-sample average.
        loss_sum += loss.item() * n
        num_samples += n

    if num_samples == 0:
        return float("nan"), float("nan")

    # average loss and accuracy per epoch
    return loss_sum / num_samples, correct * 100.0 / num_samples


def test(model, loss_f, data_loader):
    model.eval()
    correct = 0
    batch_losses = 0
    num_batches = len(data_loader) 
    i = 0
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            pred = model(X_batch)
            loss = loss_f(pred, y_batch)
            
            # Total correct predictions
            predicted = torch.max(pred.data, 1)[1] 
            correct += (predicted == y_batch).sum()
            batch_losses += loss.item()
            i+=1
            print("{}\tof\t{}\tbatches".format(i, num_batches))
    
    # average loss and accuracies per epoch
    loss = batch_losses  / float(batch_size * num_batches)
    accu = correct * 100 / float(batch_size * num_batches)
    
    return loss, accu


def fit(model, train_loader, test_loader, optimizer, loss_f):
    """Train ``model`` for ``epochs`` epochs, evaluating after each one.

    Args:
        model: network passed through to ``train`` and ``test``.
        train_loader: loader used for the training pass of every epoch.
        test_loader: loader used for the evaluation pass of every epoch.
        optimizer: optimizer passed through to ``train``.
        loss_f: loss callable passed through to ``train`` and ``test``.

    Returns:
        ``(train_eval, test_eval)``: two numpy arrays with one row per epoch
        and the columns ``(loss, accuracy)``.
    """
    train_eval = []
    test_eval = []

    # `epochs` is the module-level hyperparameter.
    for epoch in range(epochs):

        train_loss, train_accu = train(model, loss_f, optimizer, train_loader)
        # float() also unwraps 0-d tensors (including CUDA ones), so the
        # history can always be converted with np.array below.
        train_loss, train_accu = float(train_loss), float(train_accu)
        train_eval.append((train_loss, train_accu))

        # Was `test_loder`, an undefined name that raised NameError.
        test_loss, test_accu = test(model, loss_f, test_loader)
        test_loss, test_accu = float(test_loss), float(test_accu)
        test_eval.append((test_loss, test_accu))

        print('Epoch: {}'.format(epoch + 1))
        print('Train:  Loss: {:.6f}   Accuracy: {:.2f}%  '.format(train_loss, train_accu))
        print('Test:   Loss: {:.6f}   Accuracy: {:.2f}%\n'.format(test_loss , test_accu ))

    return np.array(train_eval), np.array(test_eval)


def evaluate(model, test_loader, train_eval=None, test_eval=None):
    """Print the accuracy on ``test_loader`` and optionally plot the history.

    Args:
        model: ``nn.Module`` whose output has one score per class along dim 1.
        test_loader: iterable of ``(X_batch, y_batch)`` tensor pairs.
        train_eval: optional array-like with one ``(loss, accuracy)`` row per
            epoch for the training set.
        test_eval: optional array-like with one ``(loss, accuracy)`` row per
            epoch for the test set.

    Returns:
        None. The accuracy is printed; the loss and accuracy curves are drawn
        only when both ``train_eval`` and ``test_eval`` are given.
    """
    model.eval()

    # Send every batch to wherever the model's parameters live.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    correct = 0
    num_samples = 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            if target_device is not None:
                X_batch = X_batch.to(target_device)
                y_batch = y_batch.to(target_device)
            # Inputs are passed on unchanged: the previous `.float()` cast
            # turned integer inputs into floats, and the `Variable` wrapper it
            # went through is not imported in this notebook.
            pred = model(X_batch)
            predicted = pred.argmax(dim=1)
            correct += (predicted == y_batch).sum().item()
            # Count real samples; the last batch may be smaller.
            num_samples += y_batch.size(0)

    test_accu = correct * 100.0 / num_samples if num_samples else 0.0
    print("Test accuracy: {:.3f}% ".format( test_accu ))

    if train_eval is None or test_eval is None:
        return

    # Accept lists of tuples as well as arrays.
    train_eval = np.asarray(train_eval)
    test_eval = np.asarray(test_eval)

    fig, (ax_loss, ax_accu) = plt.subplots(1, 2, figsize=(12, 4))

    # Column 0 holds the losses, column 1 the accuracies.
    ax_loss.plot(train_eval[:, 0], label="train")
    ax_loss.plot(test_eval[:, 0], label="test")
    ax_loss.set_title("evaluation of losses")
    ax_loss.set_xlabel("epoch")
    ax_loss.legend()

    ax_accu.plot(train_eval[:, 1], label="train")
    ax_accu.plot(test_eval[:, 1], label="test")
    ax_accu.set_title("evaluation of accuracy")
    ax_accu.set_xlabel("epoch")
    ax_accu.legend()

    plt.show()


In [26]:
class TextGenModel(nn.Module):
    """Next-word predictor: embedding -> stacked LSTM -> two dense layers.

    Args:
        num_tokens: number of distinct word ids; defaults to the module-level
            ``vocab_size``. The embedding table gets ``num_tokens + 1`` rows
            so that ids ``0..num_tokens`` are all valid inputs.
        embedding_dim: size of each word embedding.
        hidden_size: LSTM hidden size and width of the first dense layer.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability applied after the first dense layer.

    ``forward`` takes a ``(batch, seq_len)`` integer tensor of token ids and
    returns ``(batch, num_tokens)`` log-probabilities.
    """

    def __init__(self, num_tokens=None, embedding_dim=50, hidden_size=100,
                 num_layers=2, dropout=0.1):
        super(TextGenModel, self).__init__()
        if num_tokens is None:
            # Fall back to the notebook-level vocabulary size.
            num_tokens = vocab_size
        self.dropout_p = dropout

        # padding_idx was `train_len` (a sequence length, not a token id),
        # which pinned the embedding of a regular word id to zero.
        # Index 0 is used instead: Keras' Tokenizer documents 0 as a reserved
        # index that is never assigned to a word.
        self.embed = nn.Embedding(num_embeddings=num_tokens + 1,
                                  embedding_dim=embedding_dim,
                                  padding_idx=0)
        # batch_first=True: the embedding output is (batch, seq, feature).
        # Without it the LSTM would treat the batch axis as time.
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)
        self.dense1 = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        # NOTE(review): the output layer has num_tokens classes (ids
        # 0..num_tokens-1) while the embedding accepts ids up to num_tokens --
        # confirm that no target can ever equal num_tokens.
        self.dense2 = nn.Linear(in_features=hidden_size, out_features=num_tokens)

    def forward(self, x):
        x = self.embed(x)
        lstm_output, _ = self.lstm(x)  # _ = (h, c)
        x = lstm_output[:, -1, :]  # output of the last time step
        x = F.relu(self.dense1(x))
        # Tie dropout to the module mode so model.eval() really disables it.
        x = F.dropout(x, p=self.dropout_p, training=self.training)
        # Explicit class axis; an implicit dim is deprecated.
        x = F.log_softmax(self.dense2(x), dim=1)
        return x


# Build the network and place it on the selected device.
model = TextGenModel()
model = model.to(device)

# Loss first, then the optimizer over the model's parameters; `lr` is the
# learning-rate hyperparameter defined near the top of the notebook.
loss_f = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(params=model.parameters(), lr=lr)

In [31]:
# Keep the per-epoch (loss, accuracy) history instead of discarding it, so it
# can be passed to evaluate() for plotting.
train_eval, test_eval = fit(model, train_loader, test_loader, optimizer, loss_f)

KeyboardInterrupt: 

In [43]:
# updating output (unrelated code)

# `time` was used below without being imported anywhere in the notebook.
import time

import IPython
# Import display explicitly instead of relying on the name being injected
# into the notebook namespace.
from IPython.display import Pretty, display

# display_id=True returns a handle whose update() rewrites the output in place.
out = display(Pretty('Starting'), display_id=True)
time.sleep(1)

# Cycle through "Going.", "Going..", "Going..." a few times.
for i in range(8):
    out.update(Pretty('Going' + '.' * (i % 3 + 1)))
    time.sleep(0.5)

out.update(Pretty('Done.'))

Done.