In [2]:
import torch
from torch import nn, optim
import numpy as np

In [8]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_dim, output_size, n_layers):
        super(RNN,self).__init__()
            
        self.hidden = hidden_dim
        
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
            
        self.fc = nn.Linear(hidden_dim, output_size)
            
    def forward(self, x, hidden):
        r_out, hidden = self.rnn(x, hidden)
        r_out = r_out.view(-1, self.hidden)
        output = self.fc(r_out)
        
        return output, hidden

In [16]:
# Single-layer RNN mapping one input feature to one output value per time step.
model = RNN(input_size=1, hidden_dim=32, output_size=1, n_layers=1)

seq_length = 20

# Half a period of a sine wave sampled at `seq_length` evenly spaced points,
# stored as a (seq_length, 1) column so every time step carries one feature.
time_steps = np.linspace(0, np.pi, seq_length)
data = np.sin(time_steps).reshape(seq_length, 1)

# Prepend a batch dimension of size 1 -> (1, seq_length, 1).
test_input = torch.Tensor(data).unsqueeze(0)
print('Input size: ', test_input.size())

# Smoke-test the forward pass with no initial hidden state and inspect the shapes.
test_out, test_h = model(test_input, None)
print('Output size: ', test_out.size())
print('Hidden state size: ', test_h.size())

Input size:  torch.Size([1, 20, 1])
Output size:  torch.Size([20, 1])
Hidden state size:  torch.Size([1, 1, 32])


In [15]:
# Adam is bound to the parameters of whichever `model` object exists when this
# cell runs. Re-run this cell every time `model` is re-created; otherwise the
# optimizer keeps updating the old object's parameters and the training loss
# of the new model cannot improve.
optimizer = optim.Adam(params=model.parameters(), lr=0.01)

# Mean-squared error between the predicted and the target values.
criterion = nn.MSELoss()

In [17]:
n_steps = 30
hidden = None  # no carried-over state before the first step

for step in range(n_steps):
    # Each step trains on the next half-period of the sine wave. seq_length + 1
    # samples are generated so inputs and targets can be offset by one step.
    time_steps = np.linspace(step * np.pi, (step + 1) * np.pi, seq_length + 1)
    data = np.sin(time_steps).reshape(seq_length + 1, 1)  # input_size=1

    x = data[:-1]  # every sample except the last
    y = data[1:]   # the same sequence shifted one step ahead (next-value target)

    # Convert to tensors.
    x_tensor = torch.Tensor(x).unsqueeze(0)  # add batch dimension -> (1, seq_length, 1)
    y_tensor = torch.Tensor(y)               # (seq_length, 1), same layout as the model output

    optimizer.zero_grad()

    pred, hidden = model(x_tensor, hidden)

    # Carry the hidden state into the next step but cut it out of the autograd
    # graph, so backward() never reaches into earlier steps.
    hidden = hidden.detach()

    loss = criterion(pred, y_tensor)
    loss.backward()
    optimizer.step()

    print("Training loss: ", loss.item())

Training loss:  0.5435460209846497
Training loss:  0.4270239770412445
Training loss:  0.5434108376502991
Training loss:  0.4270240366458893
Training loss:  0.5434108376502991
Training loss:  0.4270239770412445
Training loss:  0.5434108376502991
Training loss:  0.4270240366458893
Training loss:  0.5434108376502991
Training loss:  0.4270239770412445
Training loss:  0.5434108376502991
Training loss:  0.4270240366458893
Training loss:  0.5434108376502991
Training loss:  0.4270239770412445
Training loss:  0.5434108376502991
Training loss:  0.4270239770412445
Training loss:  0.5434108376502991
Training loss:  0.4270239770412445
Training loss:  0.5434108376502991
Training loss:  0.4270240366458893
Training loss:  0.5434108376502991
Training loss:  0.4270240366458893
Training loss:  0.5434108376502991
Training loss:  0.4270239770412445
Training loss:  0.5434108376502991
Training loss:  0.4270240366458893
Training loss:  0.5434108376502991
Training loss:  0.4270239770412445
Training loss:  0.54

In [5]:
class LSTM(nn.Module):
    """Character-level model: embedding -> LSTM -> dropout -> linear layer.

    Args:
        tokens: sized collection of the distinct characters (the vocabulary).
            Position ``i`` in iteration order becomes the integer id of that
            character.
        embedding_dim: size of each character embedding vector.
        hidden_dim: width of the LSTM hidden and cell states.
        output_size: accepted for interface compatibility; the output layer is
            always sized by ``len(tokens)``.
        n_layers: number of stacked LSTM layers.
        drop_prob: dropout probability applied to the LSTM output.

    Attributes:
        chars: the vocabulary passed in as ``tokens``.
        int2char / char2int: id -> character and character -> id lookups.
    """

    def __init__(self, tokens, embedding_dim, hidden_dim, output_size, n_layers, drop_prob):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        # Keep the lookups on the instance so callers can encode/decode characters.
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: i for i, ch in self.int2char.items()}
        vocab_size = len(self.chars)

        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden):
        """Score the next character from the last time step of each sequence.

        Args:
            x: integer tensor of character ids, ``(batch, seq_len)``
               (the LSTM is built with ``batch_first=True``).
            hidden: ``(h, c)`` tuple such as the one returned by ``init_hidden``.

        Returns:
            ``(output, hidden)``: ``output`` holds one row of ``len(chars)``
            scores per sequence; ``hidden`` is the updated ``(h, c)`` tuple.
        """
        embeds = self.embeddings(x)
        lstm_out, hidden = self.lstm(embeds, hidden)

        # Only the final time step is used for the prediction.
        lstm_out = lstm_out[:, -1, :]
        lstm_out = self.dropout(lstm_out)
        # The slice above is not guaranteed contiguous, which view() requires.
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
        output = self.fc(lstm_out)

        return output, hidden

    def init_hidden(self, batch_size):
        """Return zeroed ``(h, c)`` states of shape ``(n_layers, batch_size, hidden_dim)``.

        The tensors are created from an existing parameter so they share its
        dtype and device.
        """
        weights = next(self.parameters())
        state_shape = (self.n_layers, batch_size, self.hidden_dim)
        hidden = (weights.new_zeros(state_shape),
                  weights.new_zeros(state_shape))

        return hidden

In [None]:
# LSTM.__init__ has no default arguments, so every hyperparameter must be given.
# NOTE(review): embedding_dim and drop_prob are placeholder values; tune as needed.
model = LSTM(
    chars,
    embedding_dim=32,
    hidden_dim=32,
    output_size=len(chars),
    n_layers=2,
    drop_prob=0.5,
)

# The loss and optimizer defined for the sine-wave RNN are bound to that model's
# parameters, so they must be rebuilt for this one.
# NOTE(review): cross-entropy assumes the targets are integer character ids;
# the learning rate is a starting point, not a tuned value.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Hold out the first `val_split` fraction of the data for validation. The split
# does not change between epochs, so it is computed once; int() is required
# because a float cannot be used as a slice index.
val_idx = int(len(data) * val_split)
train_data, val_data = data[val_idx:], data[:val_idx]

for i in range(epochs):

    hidden = model.init_hidden(batch_size)

    model.train()

    for x, y in get_batches(train_data, batch_size, seq_length):
        # The model starts with an nn.Embedding layer, which takes integer
        # character ids rather than one-hot vectors.
        # NOTE(review): assumes get_batches yields integer-encoded arrays and
        # that `y` has the layout `criterion` expects for one prediction per
        # sequence (the model only scores the last time step) -- confirm.
        inputs, labels = torch.from_numpy(x).long(), torch.from_numpy(y)

        # Keep the state values but drop their history so backward() does not
        # reach into previous batches.
        hidden = tuple(each.detach() for each in hidden)

        optimizer.zero_grad()

        output, hidden = model(inputs, hidden)

        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

    # Validate once per epoch, after the training batches. Running this inside
    # the batch loop left the model in eval mode for every later training batch.
    val_hidden = model.init_hidden(batch_size)
    val_losses = []
    model.eval()

    with torch.no_grad():  # no gradients are needed for evaluation
        for x, y in get_batches(val_data, batch_size, seq_length):
            inputs, labels = torch.from_numpy(x).long(), torch.from_numpy(y)

            output, val_hidden = model(inputs, val_hidden)

            val_loss = criterion(output, labels)
            val_losses.append(val_loss.item())

    if val_losses:
        print("Validation loss: ", np.mean(val_losses))

In [None]:
def predict(model, char, h, topk=None):
    """Sample the next character given the current one and the hidden state.

    Args:
        model: network exposing ``chars`` (its vocabulary) and callable as
            ``model(inputs, hidden) -> (scores, hidden)`` with ``scores`` of
            shape ``(1, len(model.chars))``.
        char: the current character; must be in ``model.chars``.
        h: tuple of hidden-state tensors.
        topk: sample only among the ``topk`` most likely characters. ``None``
            (the default) samples from the whole vocabulary.

    Returns:
        ``(next_char, h)``: the sampled character and the updated hidden state.

    Raises:
        KeyError: if ``char`` is not in the vocabulary.
    """
    model.eval()

    # Derive the lookups from the vocabulary so this works with any model that
    # exposes `chars`.
    int2char = dict(enumerate(model.chars))
    char2int = {c: i for i, c in int2char.items()}

    # Detach the state from any earlier graph.
    h = tuple(each.detach() for each in h)

    # One sequence containing one character id, on the same device as the state.
    inputs = torch.tensor([[char2int[char]]], dtype=torch.long, device=h[0].device)

    with torch.no_grad():
        out, h = model(inputs, h)
        prob = torch.softmax(out, dim=1)

        vocab_size = prob.size(1)
        k = vocab_size if topk is None else min(topk, vocab_size)
        p, ch = prob.topk(k)

    # reshape(-1) keeps a 1-D array even when k == 1; a 0-d array would be
    # interpreted by np.random.choice as a population size.
    ch = ch.cpu().numpy().reshape(-1)
    p = p.cpu().numpy().reshape(-1).astype(np.float64)

    # Renormalise because the kept probabilities no longer sum to one.
    ch = int(np.random.choice(ch, p=p / p.sum()))

    return int2char[ch], h

In [None]:
def sample(model, size, prime, topk=None):
    """Generate text by repeatedly feeding the model its own predictions.

    The model is first run over every character of ``prime`` to build up its
    hidden state; the prediction made after the last prime character is kept,
    and ``size`` further characters are then generated one at a time.

    Args:
        model: network exposing ``init_hidden(batch_size)`` and usable with
            ``predict``.
        size: number of characters to generate after the first predicted one.
        prime: non-empty seed string.
        topk: forwarded to ``predict``.

    Returns:
        ``prime`` followed by ``size + 1`` generated characters.

    Raises:
        ValueError: if ``prime`` is empty, since there is nothing to predict from.
    """
    if not prime:
        raise ValueError("prime must contain at least one character")

    chars = list(prime)
    h = model.init_hidden(1)

    # Warm up the hidden state on the seed text; only the last prediction is kept.
    for seed_char in prime:
        next_char, h = predict(model, seed_char, h, topk)
    chars.append(next_char)

    # Feed each generated character back in, threading the hidden state through.
    for _ in range(size):
        next_char, h = predict(model, chars[-1], h, topk)
        chars.append(next_char)

    return "".join(chars)