In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import itertools
from typing import List

In [None]:
# Hyper-parameters and data-loading limits shared by the cells below.
seq_len = 30  # sequence (time-step) dimension used when the one-hot text is reshaped into batches
batch_size = 128  # number of sequences per batch in the reshaped tensors
train_test_split = 0.9  # fraction passed to get_train_test_split; the leading part of the text becomes the training set
nr_articles = 350  # how many articles should be loaded from the dataset
nr_source_lines = 130000  # how many lines of source code should be loaded (python source code dataset)

#### Some helper methods

In [None]:
def get_one_hot_encoded(char):
    """Return a 1-D float tensor of length ``len(chars)`` with a 1 at ``chars2idx[char]``.

    Relies on the notebook-level ``chars`` and ``chars2idx`` vocabulary objects.
    Raises ``KeyError`` when ``char`` is not a key of ``chars2idx``.
    """
    vocab_size = len(chars)
    encoding = torch.zeros(vocab_size)
    hot_position = chars2idx[char]
    encoding[hot_position] = 1
    return encoding

def one_hot_encoded_to_char(one_hot):
    """Translate a one-hot vector back into its character via ``idx2chars``.

    The index of the first element equal to 1 is looked up; an ``IndexError``
    is raised when no element equals 1.
    """
    hot_positions = torch.nonzero(one_hot == 1)
    first_hot = hot_positions[0][0]
    return idx2chars[int(first_hot)]

In [None]:
def get_train_test_split(data: str, train_test_split: float, print_summary=True):
    """Split ``data`` into a leading training part and a trailing test part.

    Args:
        data: The full text to split.
        train_test_split: Fraction (between 0 and 1) of ``data`` that goes
            into the training part; the remainder becomes the test part.
        print_summary: When true, additionally print a summary line that
            mentions the notebook-level ``nr_articles`` setting.

    Returns:
        A ``(train_text, test_text)`` tuple of strings whose concatenation is ``data``.

    Raises:
        ValueError: If ``train_test_split`` lies outside ``[0, 1]``.
    """
    if not 0 <= train_test_split <= 1:
        raise ValueError(f'train_test_split must be within [0, 1], got {train_test_split}')
    # Split the text that was passed in; the previous version silently sliced
    # the global ``in_text`` and ignored the ``data`` argument.
    split_idx = int(train_test_split * len(data))
    train_text = data[:split_idx]
    test_text = data[split_idx:]
    print(f'Train size: {len(train_text)} characters')
    print(f'Test size: {len(test_text)} characters')
    if print_summary:
        print(f'Read {nr_articles} articles with a total of {len(data)} characters.')
    return train_text, test_text

#### German news dataset

In [None]:
# Read the first `nr_articles` lines of the CSV. Every line is split on ';',
# the first field is dropped and the remaining fields of all lines are
# concatenated (without separator) into one long string.
with open('data/Ten_Thousand_German_News_Articles/train.csv') as in_file:
    article_lines = in_file.readlines()[:nr_articles]
in_text = "".join(
    field
    for line in article_lines
    for field in line.split(';')[1:]
)
train_text, test_text = get_train_test_split(in_text, train_test_split)

#### English news dataset (Huffington Post)

In [None]:
# Load the first `nr_articles` rows and concatenate their 'content' column into
# one string. `on_bad_lines='warn'` skips malformed rows with a warning; it
# replaces `error_bad_lines=False`, which was deprecated in pandas 1.3 and
# removed in pandas 2.0.
news_data = pd.read_csv('data/english_news/articles1.csv', engine='python', on_bad_lines='warn', encoding='utf-8', nrows=nr_articles)
in_text = news_data['content'].str.cat(sep=' ')
train_text, test_text = get_train_test_split(in_text, train_test_split)

#### Python source code

In [None]:
# Concatenate the first `nr_source_lines` lines of the source-code dump.
# islice stops reading once enough lines were consumed, and joining the lines
# directly avoids exploding every line into a list of single characters
# (which is what `list += str` did before).
with open('data/python_code/python.txt', encoding='utf-8') as in_file:
    in_text = "".join(itertools.islice(in_file, nr_source_lines))
train_text, test_text = get_train_test_split(in_text, train_test_split, print_summary=False)

In [None]:
def _make_input_target_pair(text, seq_len):
    """Return ``(inputs, targets)`` where ``targets`` is ``inputs`` shifted one character ahead.

    The text is first cut to the largest multiple of ``seq_len``; both results
    are one character shorter than that so every input has a following target.
    """
    usable_len = len(text) - (len(text) % seq_len)
    if usable_len == 0:
        # Shorter than one sequence: text[:-1] would wrongly keep almost the
        # whole text as input while the target would be empty.
        return "", ""
    return text[:usable_len - 1], text[1:usable_len]


def _one_hot_encode_text(text, char_to_idx, vocab_size):
    """One-hot encode ``text`` into a float tensor of shape ``(len(text), vocab_size)``."""
    encoded = torch.zeros((len(text), vocab_size))
    if text:
        positions = torch.tensor([char_to_idx[char] for char in text], dtype=torch.long)
        # A single vectorised assignment replaces one tensor write per character.
        encoded[torch.arange(len(text)), positions] = 1
    return encoded


train_text_in, train_text_out = _make_input_target_pair(train_text, seq_len)
test_text_in, test_text_out = _make_input_target_pair(test_text, seq_len)
print(f'len of train_text_in {len(train_text_in)}')
print(f'len of train_text_out {len(train_text_out)}')
print(f'len of test_text_in {len(test_text_in)}')
print(f'len of test_text_out {len(test_text_out)}')

# Sort the vocabulary: iteration order of a set of strings differs between
# interpreter runs, which would make the char <-> index mapping irreproducible.
chars = sorted(set(train_text + test_text))
nr_chars = len(chars)
print(f'nr. of unique chars: {nr_chars}')
idx2chars = dict(enumerate(chars))
chars2idx = {char: idx for idx, char in idx2chars.items()}

train_text_encoded_in = _one_hot_encode_text(train_text_in, chars2idx, nr_chars)
train_text_encoded_out = _one_hot_encode_text(train_text_out, chars2idx, nr_chars)
test_text_encoded_in = _one_hot_encode_text(test_text_in, chars2idx, nr_chars)
test_text_encoded_out = _one_hot_encode_text(test_text_out, chars2idx, nr_chars)
print(train_text_encoded_in.shape)
print(train_text_encoded_out.shape)
print(test_text_encoded_in.shape)
print(test_text_encoded_out.shape)

In [None]:
# Preview the first 50 characters of the training and the test text.
print(train_text[:50], test_text[:50], sep='\n')

In [None]:
# Sanity check: display the one-hot vector for 'A' (raises KeyError if 'A' is not a key of chars2idx).
get_one_hot_encoded('A')

In [None]:
# Shapes of the encoded input tensors for the train and test text.
print(train_text_encoded_in.shape, test_text_encoded_in.shape, sep='\n')

In [None]:
# Cut the flat one-hot tensors down to a whole number of batches and reshape
# them to (nr_batches, batch_size, seq_len, nr_chars).
# NOTE: the flat tensors are overwritten by their 4-D versions, so this cell is
# not safe to re-run on its own; re-run the encoding cell first.
chars_per_batch = batch_size * seq_len
batch_shape_tail = (batch_size, seq_len, nr_chars)

nr_batches_train = train_text_encoded_in.shape[0] // chars_per_batch
nr_batches_test = test_text_encoded_in.shape[0] // chars_per_batch
nr_samples_train = nr_batches_train * chars_per_batch
nr_samples_test = nr_batches_test * chars_per_batch

train_text_encoded_in = train_text_encoded_in[:nr_samples_train].reshape((nr_batches_train, *batch_shape_tail))
train_text_encoded_out = train_text_encoded_out[:nr_samples_train].reshape((nr_batches_train, *batch_shape_tail))
test_text_encoded_in = test_text_encoded_in[:nr_samples_test].reshape((nr_batches_test, *batch_shape_tail))
test_text_encoded_out = test_text_encoded_out[:nr_samples_test].reshape((nr_batches_test, *batch_shape_tail))

# Decode the very first character of each tensor; every *_out character
# should be the one that follows the matching *_in character in the text.
print(one_hot_encoded_to_char(train_text_encoded_in[0, 0, 0]))
print(one_hot_encoded_to_char(train_text_encoded_out[0, 0, 0]))
print(one_hot_encoded_to_char(test_text_encoded_in[0, 0, 0]))
print(one_hot_encoded_to_char(test_text_encoded_out[0, 0, 0]))

In [None]:
# Shapes of the encoded input tensors for the train and test text.
print(train_text_encoded_in.shape, test_text_encoded_in.shape, sep='\n')

**LSTM input shape:** because the LSTM is created with `batch_first=True`, its input and output tensors have the shape (batch, seq, feature).

Shape: (batch, seq, feature)

In [None]:
class LSTMLanguageModel(nn.Module):
    """Character-level language model: a single LSTM followed by a linear read-out.

    Inputs are one-hot encoded characters of shape ``(batch, seq, vocab_size)``
    (the LSTM is built with ``batch_first=True``); the output holds one
    unnormalised score per vocabulary entry for every time step.
    """

    def __init__(self, hidden_dim, vocab_size, batch_size):
        """Build the network.

        Args:
            hidden_dim: Size of the LSTM hidden and cell state.
            vocab_size: Number of distinct characters (input and output feature size).
            batch_size: Default batch size used by ``init_hidden``.
        """
        super(LSTMLanguageModel, self).__init__()
        # Kept for backward compatibility; init_hidden uses the device the
        # parameters actually live on instead of this construction-time guess.
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.hidden_dim = hidden_dim
        self.nb_lstm_layers = 1
        self.batch_size = batch_size
        # The LSTM takes one-hot encoded characters as inputs, and outputs
        # hidden states with dimensionality hidden_dim.
        self.lstm = nn.LSTM(input_size=vocab_size, hidden_size=hidden_dim,
                            num_layers=self.nb_lstm_layers, batch_first=True)

        # The linear layer that maps from hidden state space to character space
        self.hidden2char = nn.Linear(hidden_dim, vocab_size, bias=True)

    def forward(self, sequence, hidden=None):
        """Run the model over ``sequence``.

        Args:
            sequence: Tensor of shape ``(batch, seq, vocab_size)``.
            hidden: Optional ``(h_0, c_0)`` tuple, e.g. from ``init_hidden`` or
                a previous call. ``None`` lets the LSTM start from zeros.

        Returns:
            ``(char_pred, hidden)``: scores of shape ``(batch, seq, vocab_size)``
            and the final ``(h_n, c_n)`` state of the LSTM.
        """
        lstm_out, hidden = self.lstm(sequence, hidden)
        char_pred = self.hidden2char(lstm_out)
        return char_pred, hidden

    def init_hidden(self, batch_size=None):
        """Create a random initial ``(h_0, c_0)`` state.

        The model does not store the state; pass the returned tuple to
        ``forward`` to use it.

        Args:
            batch_size: Batch size of the state; defaults to ``self.batch_size``.

        Returns:
            Tuple of two tensors of shape ``(nb_layers, batch_size, hidden_dim)``
            placed on the same device as the model parameters.
        """
        if batch_size is None:
            batch_size = self.batch_size
        # Follow the parameters so the state matches the model even when the
        # model was moved (or not moved) after construction.
        param_device = next(self.parameters()).device
        state_shape = (self.nb_lstm_layers, batch_size, self.hidden_dim)
        hidden_a = torch.randn(state_shape, device=param_device)
        hidden_b = torch.randn(state_shape, device=param_device)
        return (hidden_a, hidden_b)

In [None]:
def generate(model, prompt="import numpy ", nr_of_chars_to_generate=1000):
    """Print ``prompt`` followed by greedily generated characters.

    The model is fed a sliding window of the last ``seq_len`` one-hot encoded
    characters. After each step the most probable next character is printed
    and appended to the window while the oldest character is dropped.

    Uses the notebook-level ``seq_len``, ``chars``, ``chars2idx`` and
    ``idx2chars``.

    Args:
        model: Module returning ``(predictions, hidden)`` for an input of shape
            ``(1, seq_len, len(chars))``.
        prompt: Seed text. Only its last ``seq_len`` characters are fed to the
            model; shorter prompts are right-aligned and left-padded with
            all-zero vectors.
        nr_of_chars_to_generate: Number of characters to generate.

    Raises:
        KeyError: If ``prompt`` contains a character missing from ``chars2idx``.
    """
    # Use the device the model lives on instead of a global defined in a later cell.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    vocab_size = len(chars)
    context = prompt[-seq_len:]  # longer prompts used to overflow the window
    window = torch.zeros((1, seq_len, vocab_size), device=model_device)
    for pos, char in enumerate(context, start=seq_len - len(context)):
        window[0, pos, chars2idx[char]] = 1

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            print(prompt, end='')
            for _ in range(nr_of_chars_to_generate):
                pred, _hidden = model(window)
                next_idx = pred[0, -1].argmax().item()
                print(idx2chars[next_idx], end='')
                next_one_hot = torch.zeros((1, 1, vocab_size), device=model_device)
                next_one_hot[0, 0, next_idx] = 1
                # Build a new window explicitly rather than relying on two
                # names aliasing the same tensor.
                window = torch.cat((window[:, 1:], next_one_hot), dim=1)
    finally:
        # Restore whichever mode the caller had the model in.
        model.train(was_training)

In [None]:
# --- model, loss and optimizer -------------------------------------------
hidden_dim = 768
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = LSTMLanguageModel(hidden_dim=hidden_dim, vocab_size=len(chars), batch_size=batch_size)
loss_fn = nn.CrossEntropyLoss()
print(model)
print(device)
# Move the model before constructing the optimizer, as the torch.optim
# documentation recommends.
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# --- bookkeeping ----------------------------------------------------------
loss_mem = []  # mean loss of every training batch, across all epochs
loss_per_epoch = []  # average of the batch losses of each epoch
generate_text_after_epochs = [1, 3, 5, 8, 10, 12, 15, 20, 25, 30, 35, 40]
nr_epochs = 40
print_loss_every_n_batches = 50
nr_train_batches = len(train_text_encoded_in)

# --- training -------------------------------------------------------------
for epoch in range(1, nr_epochs+1):
    model.train()  # generate() may have left the model in eval mode
    print(f'epoch {epoch}...')
    running_loss = 0.0
    losses_cur_epoch = []
    batches = zip(train_text_encoded_in, train_text_encoded_out)
    for i, (sequence_in, sequence_out) in enumerate(batches, start=1):
        optimizer.zero_grad()
        sequence_in = sequence_in.to(device)
        # CrossEntropyLoss expects class indices, not one-hot targets.
        targets = sequence_out.to(device).argmax(dim=2)
        char_predictions, hidden = model(sequence_in)
        # One vectorised call over all (sequence, time-step) positions. Every
        # sequence has the same length, so this mean equals the average of the
        # per-sequence losses that were previously summed in a Python loop.
        mean_loss = loss_fn(char_predictions.reshape(-1, char_predictions.shape[-1]), targets.reshape(-1))
        # Scale back to the former "sum over the batch" so gradient magnitudes
        # (and therefore the optimisation trajectory) stay as they were.
        loss = mean_loss * batch_size
        loss.backward()
        optimizer.step()

        cur_loss = mean_loss.item()
        loss_mem.append(cur_loss)
        losses_cur_epoch.append(cur_loss)
        running_loss += cur_loss
        # Report after every n-th batch, so the running sum really covers n
        # batches (the old check fired at i = n-1 but still divided by n).
        if i % print_loss_every_n_batches == 0:
            progress = 100 * i / nr_train_batches
            print(f'[{epoch}, {progress:.2f}%] loss: {running_loss / print_loss_every_n_batches}')
            running_loss = 0.0
    loss_per_epoch.append(np.average(losses_cur_epoch))
    if epoch in generate_text_after_epochs:
        print(f'\n\n text generation after epoch {epoch} \n')
        generate(model)
        print('\n\n\n')

In [None]:
# Smooth the per-batch training loss by averaging consecutive, non-overlapping
# windows of `window_size` batches and plot the result.
window_size = 50  # the bigger the window size, the smoother the loss curve
nr_windows = len(loss_mem) // window_size
loss_mem_np = np.asarray(loss_mem[:nr_windows * window_size], dtype=float)
rest = np.asarray(loss_mem[nr_windows * window_size:], dtype=float)

loss_mov_avg = loss_mem_np.reshape(nr_windows, window_size).mean(axis=1)
if rest.size > 0:
    # Only add the partial last window when it exists; averaging an empty
    # remainder would append NaN and emit a RuntimeWarning.
    loss_mov_avg = np.concatenate((loss_mov_avg, [rest.mean()]))
plt.figure(figsize=(10,6))
plt.plot(loss_mov_avg)
plt.xlabel(f'window of {window_size} batches')
plt.ylabel('average training loss')
plt.title('Training loss');
print(f'losses per epoch: {loss_per_epoch}')

In [None]:
# Sample text from the model once more, after the training cell above has run.
generate(model)