# 1. RNN

## RNN Basics

In [None]:
import torch

In [None]:
# Toy series: a 1-D float tensor holding 1.0, 2.0, ..., 15.0.
seq = torch.arange(start=1., end=16.)

# Show the tensor's Python type, its values and its shape, one per line.
print(type(seq), seq, seq.size(), sep='\n')

In [None]:
# Each sample is a window of this many consecutive data points
seq_length = 5
# Every data point carries a single feature
input_size = 1
# Number of non-overlapping windows that fit into the series
batch_size = len(seq) // seq_length

In [None]:
# Fold the flat series into (batch, seq_length, input_size); the batch
# dimension is inferred from the number of elements.
X = seq.view(-1, seq_length, input_size)

print(X.shape)

In [None]:
import torch.nn as nn

In [None]:
# hidden_size: number of features in the hidden state
# num_layers: number of RNN layers stacked on top of each other
hidden_size, num_layers = 10, 1

In [None]:
# Plain single-layer, one-directional tanh RNN operating on
# (batch, seq, feature) tensors.
singleRNN = nn.RNN(
    input_size,
    hidden_size,
    num_layers,
    batch_first=True,
    nonlinearity='tanh',
    bidirectional=False,
    dropout=0,
)

In [None]:
# No initial hidden state is passed, so the RNN starts from zeros.
y, h = singleRNN(X)

# y: (batch_size, seq_length, hidden_size * num_directions)
# h: (num_layers * num_directions, batch_size, hidden_size)
print(y.size(), h.size(), sep='\n')

## Image Classification with RNN

In [None]:
import torchvision
import torchvision.transforms as transforms

In [None]:
# The only preprocessing step is converting each image to a tensor.
transform = transforms.Compose([transforms.ToTensor()])

# Training split, downloaded into ./mnist when it is not already there.
trainset = torchvision.datasets.MNIST(
    root='./mnist',
    train=True,
    download=True,
    transform=transform,
)
# Test split, read from the same root directory (no download requested).
testset = torchvision.datasets.MNIST(
    root='./mnist',
    train=False,
    transform=transform,
)

In [None]:
# Images per mini-batch and number of loader worker processes.
batch_size, num_workers = 1000, 0

# Training batches are reshuffled every epoch.
trainloader = torch.utils.data.DataLoader(
    dataset=trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
)
# Test batches keep the dataset order.
testloader = torch.utils.data.DataLoader(
    dataset=testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
)

In [None]:
class ImageRNN(nn.Module):
    """Classify images by feeding them to an RNN one row at a time.

    Each image is treated as a sequence of ``seq_length`` rows with
    ``input_size`` values per row. The RNN outputs of all time steps are
    concatenated and mapped to ``num_classes`` logits by a linear layer.

    Args:
        batch_size: nominal batch size; stored for reference only, the
            forward pass works with whatever batch it receives.
        seq_length: number of time steps (image rows).
        input_size: number of features per time step (values per row).
        hidden_size: size of the RNN hidden state.
        num_layers: number of stacked RNN layers.
        num_classes: number of output classes.
    """

    def __init__(self, batch_size, seq_length, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.batch_size = batch_size
        self.seq_length = seq_length
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_classes = num_classes

        self.rnn = nn.RNN(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        # The classifier sees the hidden outputs of every time step, flattened.
        self.fc = nn.Linear(self.hidden_size * self.seq_length, self.num_classes)

    def forward(self, x, h0=None):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: image batch whose elements can be viewed as
                (batch, seq_length, input_size), e.g. (batch, 1, 28, 28).
            h0: optional initial hidden state of shape
                (num_layers, batch, hidden_size); the RNN starts from zeros
                when it is omitted.
        """
        # Use the configured dimensions instead of a hard-coded 28x28 so the
        # model works for any seq_length / input_size it was built with.
        # (batch, channel, height, width) --> (batch, rows as seq_length, values per row as features)
        x = x.view(-1, self.seq_length, self.input_size)
        out, _ = self.rnn(x, h0)    # (batch, seq_length, hidden_size)
        out = out.reshape(-1, self.seq_length * self.hidden_size)    # (batch, seq_length * hidden_size)
        outputs = self.fc(out)    # (batch, num_classes)
        return outputs

In [None]:
import torch.optim as optim

In [None]:
# Each 28x28 image is read as 28 time steps of 28 features.
seq_length = input_size = 28
# RNN capacity: hidden state width and number of stacked layers.
hidden_size, num_layers = 50, 1
# Number of target classes.
num_classes = 10

In [None]:
# Prefer the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Build the classifier and move its parameters to the selected device.
model = ImageRNN(
    batch_size=batch_size,
    seq_length=seq_length,
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_classes=num_classes,
)
model = model.to(device)
# Multi-class classification loss on raw logits.
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters with learning rate 0.001.
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
epochs = 10

model.train()
for epoch in range(epochs):
    # Per-epoch accumulators: sum of batch losses and count of correct predictions.
    train_loss = 0
    train_correct = 0

    for x, y in trainloader:
        x, y = x.to(device), y.to(device)
        # Size the initial hidden state from the batch actually received rather
        # than the configured batch_size, so a smaller final batch cannot cause
        # a shape mismatch; allocate it directly on the target device.
        h0 = torch.zeros(num_layers, x.size(0), hidden_size, device=device)    # (num_layers * num_directions, batch, hidden_size)

        optimizer.zero_grad()
        outputs = model(x, h0)
        loss = criterion(outputs, y)

        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        # Predicted class = index of the largest logit.
        _, predicted = outputs.max(1)
        train_correct += predicted.eq(y).sum().item()

    # Mean of the per-batch losses, and accuracy over the whole training set.
    train_loss = train_loss / len(trainloader)
    train_acc = train_correct / len(trainset)

    print('[%2d] TRAIN loss: %.4f, acc: %.4f' % (epoch + 1, train_loss, train_acc))

In [None]:
# Accumulators: sum of batch losses, count of correct predictions, and the
# predicted label of every test sample in dataset order.
test_loss = 0
test_correct = 0
test_preds = []

model.eval()
with torch.no_grad():    # no gradients are needed for evaluation
    for x, y in testloader:
        x, y = x.to(device), y.to(device)
        # Size the initial hidden state from the batch actually received rather
        # than the configured batch_size, so a smaller final batch cannot cause
        # a shape mismatch; allocate it directly on the target device.
        h0 = torch.zeros(num_layers, x.size(0), hidden_size, device=device)

        outputs = model(x, h0)
        loss = criterion(outputs, y)

        test_loss += loss.item()
        # Predicted class = index of the largest logit.
        _, predicted = outputs.max(1)
        test_correct += predicted.eq(y).sum().item()

        test_preds.extend(predicted.tolist())

print('TEST loss: %.4f, acc: %.4f' % (test_loss/len(testloader), test_correct/len(testset)))

## Stacked RNN

In [None]:
# Toy input layout: 3 sequences of 5 steps with 1 feature each.
batch_size, seq_length, input_size = 3, 5, 1
# Four stacked RNN layers, each with a 10-dimensional hidden state.
hidden_size, num_layers = 10, 4

In [None]:
# Multi-layer RNN: every layer consumes the output sequence of the layer below.
stackedRNN = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)

In [None]:
# Fold the flat series into (batch, seq_length, input_size); the batch
# dimension is inferred from the number of elements.
X = seq.view(-1, seq_length, input_size)

In [None]:
# Run the batch through the stacked RNN (initial hidden state defaults to zeros).
# y holds the top layer's output at every time step; h_n holds the final
# hidden state of each layer.
y, h_n = stackedRNN(X)

## Bi-directional RNN

In [None]:
# Same stacked configuration, but every layer also reads the sequence in
# reverse, so there are two directions per layer.
biRNN = nn.RNN(
    input_size,
    hidden_size,
    num_layers,
    bidirectional=True,
    batch_first=True,
)

In [None]:
# Run the batch through the bidirectional RNN (initial hidden state defaults to zeros).
# y concatenates the forward and backward outputs along the last dimension;
# h_n stacks the final hidden state of every (layer, direction) pair.
y, h_n = biRNN(X)

In [None]:
# y:   (batch_size, seq_length, hidden_size * num_directions)
# h_n: (num_layers * num_directions, batch_size, hidden_size)
print(y.size(), h_n.size(), sep='\n')

In [None]:
# Split the concatenated feature axis into (direction, hidden_size); the
# direction count is inferred from the tensor size.
y_bi = y.view(batch_size, seq_length, -1, hidden_size)

print(y_bi.shape)

In [None]:
# The second-to-last axis of y_bi separates the directions:
# index 0 is the forward pass, index 1 the backward pass.
y_forward = y_bi[..., 0, :]
y_backward = y_bi[..., 1, :]

print(y_forward.shape)
print(y_backward.shape)

In [None]:
# Separate the fused (layer * direction) axis of h_n into two axes; the
# direction count is inferred from the tensor size.
h_n_bi = h_n.view(num_layers, -1, batch_size, hidden_size)

print(h_n_bi.shape)

In [None]:
# h_n_bi is laid out as (num_layers, num_directions, batch_size, hidden_size).
# The direction lives on axis 1 (axis 2 is the batch), so select along axis 1:
# index 0 is the forward direction, index 1 the backward direction.
h_n_forward = h_n_bi[:,0,:,:]
h_n_backward = h_n_bi[:,1,:,:]

print(h_n_forward.size())    # (num_layers, batch_size, hidden_size)
print(h_n_backward.size())    # (num_layers, batch_size, hidden_size)

## LSTM


In [None]:
# Stacked, one-directional LSTM without dropout, operating on
# (batch, seq, feature) tensors.
lstm = nn.LSTM(
    input_size,
    hidden_size,
    num_layers,
    batch_first=True,
    bidirectional=False,
    dropout=0,
)

In [None]:
# nn.LSTM returns (output, (h_n, c_n)). Unpack the state tuple so that h_n is
# the final hidden state tensor and c_n the final cell state, instead of
# binding the whole tuple to the name h_n.
y, (h_n, c_n) = lstm(X)

In [None]:
# Expected: (batch_size, seq_length, hidden_size * num_directions)
print(y.shape)

## Character Prediction with RNN

In [None]:
# Vocabulary: the distinct characters of "hello world"; a character's
# position in this list is its class index.
char_set = list('dehlorw ')

# One input feature and one output class per vocabulary entry.
input_size = output_size = len(char_set)
hidden_size = 16

In [None]:
# Input sequence as indices into char_set.
x = [[2, 1, 3, 3, 4, 7, 6, 4, 5, 3]] # hello worl
# One-hot encode every index: a row of len(char_set) zeros with a single 1
# at the character's position.
x_onehot = [
    [[int(col == idx) for col in range(len(char_set))] for idx in sample]
    for sample in x
]

# Target sequence: the input shifted by one character (next-character labels).
y = [[1, 3, 3, 4, 7, 6, 4, 5, 3, 0]] # ello world

# Float inputs for the RNN, integer class labels for the loss.
X = torch.FloatTensor(x_onehot)
Y = torch.LongTensor(y)

In [None]:
class simpleRNN(torch.nn.Module):
    """Single-layer RNN followed by a linear layer applied at every time step.

    Args:
        input_dim: number of features per time step.
        hidden_dim: size of the RNN hidden state.
        output_dim: number of values produced per time step.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        # batch_first=True: tensors are laid out as (batch, seq, feature).
        self.rnn = torch.nn.RNN(input_size=input_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = torch.nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Return the linear projection of the RNN output at every time step."""
        step_outputs, _ = self.rnn(x)
        return self.fc(step_outputs)

In [None]:
# Character model: one-hot characters in, per-step scores over char_set out.
model = simpleRNN(input_dim=input_size, hidden_dim=hidden_size, output_dim=output_size)
# Cross-entropy between the per-step scores and the target character indices.
criterion = torch.nn.CrossEntropyLoss()
# Adam over all model parameters with learning rate 0.1.
optimizer = optim.Adam(model.parameters(), lr=0.1)

In [None]:
import numpy as np

In [None]:
epochs = 5

model.train()
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = model(X)    # (batch, seq_length, output_size)
    # Flatten batch and time so every step is one classification sample.
    # The last dimension of the logits is the number of classes (output_size),
    # which only coincidentally equals input_size here.
    loss = criterion(outputs.view(-1, output_size), Y.view(-1))

    loss.backward()
    optimizer.step()

    # Decode the most likely character at each step; detach() takes the values
    # out of the autograd graph before converting to NumPy.
    predicted = outputs.detach().numpy().argmax(axis=2)
    prediction = ''.join([char_set[c] for c in np.squeeze(predicted)])
    print('[%2d] TRAIN loss: %.4f, pred: %s' % (epoch + 1, loss.item(), prediction))

## Gender Classification with RNN

In [None]:
# Vocabulary: the distinct letters of the example names; a letter's position
# in this list is its one-hot index.
char_set = list('adehinoprsw')

input_size = len(char_set)
hidden_size = 22
# A single score per name.
output_size = 1

In [None]:
# Names as indices into char_set.
x = [
    [0, 5, 1, 8, 2, 10],  # andrew
    [9, 6, 7, 3, 4, 0],   # sophia
]

# One-hot encode every index: a row of len(char_set) zeros with a single 1
# at the letter's position.
x_onehot = [
    [[int(col == idx) for col in range(len(char_set))] for idx in name]
    for name in x
]

# One label per name: 0 = Male, 1 = Female.
y = [[0], [1]]

# Float inputs for the RNN and float targets for the regression-style loss.
X = torch.FloatTensor(x_onehot)
Y = torch.FloatTensor(y)

In [None]:
class simpleRNN(torch.nn.Module):
    """Single-layer RNN followed by a linear layer applied at every time step.

    Args:
        input_dim: number of features per time step.
        hidden_dim: size of the RNN hidden state.
        output_dim: number of values produced per time step.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # batch_first=True: tensors are laid out as (batch, seq, feature).
        self.rnn = torch.nn.RNN(input_size=input_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = torch.nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Return the linear projection of the RNN output at every time step."""
        step_outputs, _ = self.rnn(x)
        return self.fc(step_outputs)

In [None]:
# Name model: one-hot letters in, one score per time step out.
model = simpleRNN(input_dim=input_size, hidden_dim=hidden_size, output_dim=output_size)
# Mean squared error between the score and the 0/1 label.
criterion = torch.nn.MSELoss()
# Adam over all model parameters with learning rate 0.1.
optimizer = optim.Adam(model.parameters(), lr=0.1)

In [None]:
epochs = 20

model.train()
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = model(X)    # (batch, seq_length, output_size)
    # Score each name from the output at the final time step only.
    # squeeze(-1) removes just the size-1 output dimension, so a batch holding
    # a single name stays 1-D instead of collapsing to a scalar.
    scores = outputs[:, -1, :].squeeze(-1)
    loss = criterion(scores, Y.view(-1))

    loss.backward()
    optimizer.step()

    # Threshold the score at 0.5 to turn it into a label.
    predicted = ["Male" if score < 0.5 else "Female" for score in scores.tolist()]
    print('[%2d] TRAIN loss: %.4f, pred: %s' % (epoch + 1, loss.item(), predicted))