In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset

In [2]:
from torch.utils.data import DataLoader

In [3]:
import numpy as np

In [4]:
text = ['hey how are you', 'good i am fine', 'have a nice day']

In [5]:
chars = set(''.join(text))

In [6]:
int2char = dict(enumerate(chars))

In [7]:
int2char

{0: 'r',
 1: 'g',
 2: 'i',
 3: 'y',
 4: 'o',
 5: ' ',
 6: 'h',
 7: 'e',
 8: 'f',
 9: 'n',
 10: 'w',
 11: 'a',
 12: 'u',
 13: 'v',
 14: 'm',
 15: 'c',
 16: 'd'}

In [8]:
char2int = {v:k for k,v in int2char.items()}

In [9]:
char2int

{' ': 5,
 'a': 11,
 'c': 15,
 'd': 16,
 'e': 7,
 'f': 8,
 'g': 1,
 'h': 6,
 'i': 2,
 'm': 14,
 'n': 9,
 'o': 4,
 'r': 0,
 'u': 12,
 'v': 13,
 'w': 10,
 'y': 3}

Maxlen

In [10]:
maxlen = len(max(text, key=len))

In [11]:
maxlen

15

Padding

In [12]:
for i in range(len(text)):
    while len(text[i])<maxlen:
        text[i] += " "

input data:
* the last input character should be excluded as it does not need to be fetch into the model

output data:
* one time-step ahead of the input data as this will be the "correct answer" 

In [13]:
input_seq = []
target_seq = []

for i in range(len(text)):
    input_seq.append(text[i][:-1])
    target_seq.append(text[i][1:])

In [14]:
print(input_seq)

['hey how are yo', 'good i am fine', 'have a nice da']


In [15]:
print(target_seq)

['ey how are you', 'ood i am fine ', 'ave a nice day']


Sentence Embeddings

In [16]:
for i in range(len(text)):
    input_seq[i] = [char2int[char] for char in input_seq[i]]
    target_seq[i] = [char2int[char] for char in target_seq[i]]

conversion into one hot vectors

* dict_size: the number of unique characters in the corpus
    * this will determine the one-hot vector size as each character will have an assigned index in that vector
* seq_len: the length of the sequences that we are feeding into the model
    * standardised the length of our vectors
* batch_size: number of sentences that will be feed to the model in one go

In [17]:
dict_size = len(char2int)
seq_len = maxlen-1
batch_size = len(text)

def one_hot_encoder(sequence, dict_size, seq_len, batch_size):
    features = np.zeros((batch_size, seq_len, dict_size), dtype=np.float32)

    for i in range(batch_size):
        for u in range(seq_len):
            features[i, u, sequence[i][u]] = 1
    return features

In [18]:
input_seq = one_hot_encoder(input_seq, dict_size, seq_len, batch_size)

In [19]:
input_seq.shape

(3, 14, 17)

In [20]:
input_seq = torch.from_numpy(input_seq)
target_seq = torch.Tensor(target_seq)

to GPU

In [22]:
is_cuda = torch.cuda.is_available()

if is_cuda:
    device = torch.device("cuda")
    print('GPU is available')
else:
    device = torch.device("cpu")
    print("GPU not available")

GPU not available


Model

In [25]:
class Model(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super(Model, self).__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        batch_size = x.size(0)
        hidden = self.init_hidden(batch_size)
        out, hidden = self.rnn(x, hidden)
        out = out.contiguous().view(-1, self.hidden_dim)
        out = self.fc(out)

        return out, hidden
    
    def init_hidden(self, batch_size):
        hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)
        return hidden

In [26]:
model = Model(input_size=dict_size, output_size=dict_size, hidden_dim=12, n_layers=1)
model = model.to(device)

In [27]:
epochs = 100
lr = 0.01

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr)

In [28]:
input_seq = input_seq.to(device)
for epoch in range(1, epochs+1):
    optimizer.zero_grad()
    output, hidden = model(input_seq)
    output = output.to(device)
    target_seq = target_seq.to(device)
    loss = criterion(output, target_seq.view(-1).long())
    loss.backward()
    optimizer.step()

    if epoch%10==0:
        print(f"Epoch {epoch}/{epochs}.............Loss: {loss.item():.4f}")

Epoch 10/100.............Loss: 2.4802
Epoch 20/100.............Loss: 2.0834
Epoch 30/100.............Loss: 1.6323
Epoch 40/100.............Loss: 1.2232
Epoch 50/100.............Loss: 0.8812
Epoch 60/100.............Loss: 0.6016
Epoch 70/100.............Loss: 0.4078
Epoch 80/100.............Loss: 0.2807
Epoch 90/100.............Loss: 0.2016
Epoch 100/100.............Loss: 0.1550


In [29]:
def predict(model, character):
    character = np.array([[char2int[c] for c in character]])
    character = one_hot_encoder(character, dict_size, character.shape[1],1)
    character = torch.from_numpy(character)
    character = character.to(device)

    out, hidden = model(character)

    prob = F.softmax(out[-1], dim=0).data
    char_ind = torch.max(prob, dim=0)[1].item()
    return int2char[char_ind], hidden

In [30]:
def sample(model, out_len, start='hey'):
    model.eval()
    start = start.lower()
    chars = [ch for ch in start]
    size = out_len - len(chars)

    for i in range(size):
        char, h = predict(model, chars)
        chars.append(char)

    return "".join(chars)

In [32]:
sample(model, 15,'hey')

'hey how are you'

In [None]:
class MyDataset(Dataset):
    def __init__(self, input, seq_len):
        self.input = input
        self.seq_len = seq_len
    def __getitem__(self, item):
        return input[item:item + self.seq_len], input[item + self.seq_len]
    def __len__(self):
        return len(self.input) - self.seq_len

In [None]:
input = np.arange(1,8).reshape(-1, 1)

In [None]:
input = torch.tensor(input, dtype=torch.float)

In [None]:
ds = MyDataset(input, 3)

In [None]:
dl = DataLoader(ds, batch_size=2)
for inp, label in dl:
    print(inp.numpy())

[[[1.]
  [2.]
  [3.]]

 [[2.]
  [3.]
  [4.]]]
[[[3.]
  [4.]
  [5.]]

 [[4.]
  [5.]
  [6.]]]
