# PyTorch RNN experiments

In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.optim as optim

import numpy as np
import matplotlib.pyplot as plt

% pylab inline

Populating the interactive namespace from numpy and matplotlib


## simple RNN

In [2]:
class RNN(nn.Module):
    """Minimal recurrent cell: one hidden-state update plus a linear read-out.

    Each call computes::

        hidden = nonlinearity(i2h(concat(data, last_hidden)))
        output = h2o(hidden)

    The example on the PyTorch website that this class follows only chains the
    two linear maps, which makes the whole recurrence linear. A non-linearity
    is therefore applied to the hidden state by default.

    Args:
        data_size: number of features of the per-step input.
        hidden_size: number of features of the hidden state.
        output_size: number of features of the per-step output.
        nonlinearity: callable applied to the new hidden state. Defaults to
            ``torch.tanh``; pass ``None`` to get the purely linear cell.
    """

    def __init__(self, data_size, hidden_size, output_size, nonlinearity=torch.tanh):
        # Initialise nn.Module without naming the parent class explicitly.
        super().__init__()
        self.hidden_size = hidden_size
        self.nonlinearity = nonlinearity
        # The input-to-hidden map sees the input concatenated with the
        # previous hidden state, hence the summed width.
        input_size = data_size + hidden_size

        self.i2h = nn.Linear(input_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)

    def forward(self, data, last_hidden):
        """Advance the cell by one time step.

        Args:
            data: input for this step, concatenated with ``last_hidden``
                along dim 1.
            last_hidden: hidden state produced by the previous step.

        Returns:
            Tuple ``(hidden, output)``: the new hidden state and the read-out
            computed from it.
        """
        indata = torch.cat([data, last_hidden], 1)
        hidden = self.i2h(indata)
        if self.nonlinearity is not None:
            hidden = self.nonlinearity(hidden)
        output = self.h2o(hidden)

        return hidden, output


rnn = RNN(50, 20, 10)

In [3]:
# Smoke test for the RNN cell: unroll it over a few steps on fake data and
# back-propagate the summed MSE loss through all of them.
batch_size = 10
time_steps = 5

loss_fn = nn.MSELoss()

# Fake data: random inputs, an all-zero initial hidden state and all-zero targets.
batch = Variable(torch.randn(batch_size, 50))
target = Variable(torch.zeros(batch_size, 10))
hidden = Variable(torch.zeros(batch_size, 20))

# The same batch is fed at every step; only the hidden state evolves.
step_losses = []
for step in range(time_steps):
    hidden, output = rnn(batch, hidden)
    step_losses.append(loss_fn(output, target))

# Total loss is the sum of the per-step losses (accumulated from 0.0).
loss = sum(step_losses, 0.0)

# Back-propagate through every unrolled step.
loss.backward()

In [4]:
# Show the MSE loss that was summed over all time steps in the previous cell.
print(loss)

Variable containing:
 0.6244
[torch.FloatTensor of size 1]



## lstm

PyTorch provides `nn.LSTMCell`, along with other cell-level RNN modules such as `nn.GRUCell`. It also provides layer-level modules such as `nn.RNN`, `nn.LSTM` and `nn.GRU`. However, with the later social-LSTM development in mind, `LSTMCell` is used as the example here. The LSTM network below is trained to compute the XOR function over a bit sequence, i.e. the running parity of the bits seen so far.

Useful links:
   * RNN module and functions: http://pytorch.org/docs/master/nn.html#lstmcell
   * optimizers: http://pytorch.org/docs/master/optim.html
   * loss: http://pytorch.org/docs/master/nn.html#crossentropyloss

In [29]:
class LSTM(nn.Module):
    """An ``nn.LSTMCell`` unrolled step by step, with a linear read-out of ``h``."""

    def __init__(self, data_size, hidden_size, output_size):
        super().__init__()
        # Recurrent core: every call yields the updated (h, c) pair.
        self.lstm = nn.LSTMCell(data_size, hidden_size)
        # Linear map taking h [batch_size, hidden_size] to
        # o [batch_size, output_size].
        self.h2o = nn.Linear(hidden_size, output_size)

    def forward(self, indata, h, c, time_steps):
        """
        Run the cell over the first `time_steps` steps of `indata` and
        collect one output per step.

        Inputs:
        - indata: [batch_size, time_steps, data_size]
        - h: initial hidden state, [batch_size, hidden_size]
        - c: initial cell state, [batch_size, hidden_size]
        - time_steps: number of steps to unroll, which bounds the depth of
          back-propagation through time (BPTT).

        Returns:
        - tensor of shape [batch_size, time_steps, output_size]
        """
        state = (h, c)
        per_step = []
        for t in range(time_steps):
            state = self.lstm(indata[:, t, :], state)
            # state[0] is the hidden state h produced by this step.
            per_step.append(self.h2o(state[0]))
        # Join the per-step outputs along a new time axis (dim=1).
        return torch.stack(per_step, dim=1)
        

In [3]:
def xor(num_samples, num_bits=12):
    """Generate random bit sequences together with their running XOR (parity).

    Args:
        num_samples: number of sequences to generate.
        num_bits: length of every sequence (defaults to 12).

    Returns:
        data: integer array of shape [num_samples, num_bits, 1] holding 0/1 bits.
        output: int64 array of shape [num_samples, num_bits]; ``output[s, t]``
            is the XOR of ``data[s, 0, 0] ... data[s, t, 0]``, i.e. 1 when an
            odd number of ones has been seen up to and including step t.
    """
    # Draw every bit independently so that all 2**num_bits sequences are
    # possible. The upper bound of randint is exclusive, hence 2 (not 1).
    # The samples are i.i.d., so no extra shuffling is needed.
    bits = np.random.randint(0, 2, size=(num_samples, num_bits))
    data = bits.reshape(num_samples, num_bits, 1)

    # Running parity = cumulative count of ones modulo 2. The dtype is spelled
    # out as int64 (the np.int alias no longer exists in current NumPy) so that
    # torch.from_numpy yields a LongTensor of class indices on every platform.
    output = (np.cumsum(bits, axis=1) % 2).astype(np.int64)
    return data, output

In [84]:
# Model and training hyper-parameters.
data_size = 1
hidden_size = 64 # use 64 lstm cells
output_size = 2

net = LSTM(data_size, hidden_size, output_size)

time_steps = 12
batch_size = 48

# optim
optimizer = optim.Adam(net.parameters(), lr = 0.1)

# loss function
# inputs: input(N,C), target(N,) with int values from 0 to C-1
loss_fn = nn.CrossEntropyLoss()

# train network
for e in range(100):

    # randomly generate a batch of data
    # data: [batch_size, times_step, data_size]
    # target: [batch_size, times_step]
    data, target = xor(batch_size)
    data = Variable(torch.from_numpy(data).float())
    # CrossEntropyLoss needs integer class indices as a LongTensor; the cast
    # makes this independent of the platform's default NumPy integer width.
    target = Variable(torch.from_numpy(target).long())

    # in every epoch, we need to reassign these Variables.
    # hidden: hidden state in LSTM, cell: cell state in LSTM.
    hidden = Variable(torch.zeros(batch_size, hidden_size))
    cell = Variable(torch.zeros(batch_size, hidden_size))

    output = net(data, hidden, cell, time_steps)

    # nn.CrossEntropyLoss wants a 2-dimensional (N, C) input, so fold the batch
    # and time axes together instead of looping over the samples in Python.
    # The flattened loss is the mean over batch_size*time_steps entries;
    # multiplying by batch_size gives the sum of the per-sample mean losses,
    # because every sample contributes exactly time_steps entries.
    loss = loss_fn(output.view(-1, output_size), target.view(-1)) * batch_size

    # torch.max returns max values and indices along a specified dimension.
    _, prediction = torch.max(output, -1)
    acc = prediction.eq(target).float().sum() / (batch_size*time_steps)

    if (e+1)%10 == 0:
        # ravel()[0] extracts the scalar whether the reduction produced a
        # one-element array (old PyTorch) or a 0-dim array (PyTorch >= 0.4),
        # where a plain [0] would raise IndexError.
        loss_value = loss.data.numpy().ravel()[0]
        acc_value = acc.data.numpy().ravel()[0]
        print("epoch: {}, loss: {:4f}, acc: {:4f}.".format(e+1, loss_value, acc_value))

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

epoch: 10, loss: 33.288563, acc: 0.520833.
epoch: 20, loss: 33.064865, acc: 0.517361.
epoch: 30, loss: 32.018353, acc: 0.565972.
epoch: 40, loss: 27.318220, acc: 0.626736.
epoch: 50, loss: 3.587850, acc: 1.000000.
epoch: 60, loss: 0.384672, acc: 1.000000.
epoch: 70, loss: 0.091834, acc: 1.000000.
epoch: 80, loss: 0.043020, acc: 1.000000.
epoch: 90, loss: 0.032155, acc: 1.000000.
epoch: 100, loss: 0.026772, acc: 1.000000.


In [88]:
# test phase: evaluate the trained network on a freshly generated batch
batch_size = 100
data, target = xor(batch_size)
data = Variable(torch.from_numpy(data).float())
# Cast explicitly so the comparison with the LongTensor of predicted indices
# does not depend on the platform's default NumPy integer width.
target = Variable(torch.from_numpy(target).long())

# Start from all-zero hidden and cell states sized for the test batch.
# hidden: hidden state in LSTM, cell: cell state in LSTM.
hidden = Variable(torch.zeros(batch_size, hidden_size))
cell = Variable(torch.zeros(batch_size, hidden_size))

output = net(data, hidden, cell, time_steps)

# The predicted class is the index of the largest output along the last dim.
_, prediction = torch.max(output, -1)
test_acc = prediction.eq(target).float().sum() / (batch_size*time_steps)
# ravel()[0] works for both a one-element array (old PyTorch) and a 0-dim
# array (PyTorch >= 0.4), where a plain [0] would raise IndexError.
print("test accuracy: {}".format(test_acc.data.numpy().ravel()[0]))

test accuracy: 1.0
