<a href="https://colab.research.google.com/github/utkarsh0702/Pytorch/blob/master/PyTorch6_Character_wise_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
with open('Anna Karenina.txt', 'r') as f:
  text=f.read()
text[:100]

'Chapter 1\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything '

In [3]:
#Tokenization
chars= tuple(set(text))
int2char= dict(enumerate(chars))
char2int= {ch:i for i, ch in int2char.items()}

#Encode the text
encoded = np.array([char2int[ch] for ch in text])
encoded[:100]

array([74, 29, 32,  5, 73, 68, 30, 55, 60,  7, 26, 32,  5,  5,  2, 55, 34,
       32, 28, 23, 66, 23, 68, 25, 55, 32, 30, 68, 55, 32, 66, 66, 55, 32,
       66, 23,  8, 68, 12, 55, 68, 46, 68, 30,  2, 55, 21,  9, 29, 32,  5,
        5,  2, 55, 34, 32, 28, 23, 66,  2, 55, 23, 25, 55, 21,  9, 29, 32,
        5,  5,  2, 55, 23,  9, 55, 23, 73, 25, 55, 41, 16,  9, 55, 16, 32,
        2, 65,  7,  7, 20, 46, 68, 30,  2, 73, 29, 23,  9,  6, 55])

In [4]:
#Data Preprocessing
def one_hot_encode(arr,n_labels):
  one_hot= np.zeros((np.multiply(*arr.shape), n_labels), dtype= np.float32)
  one_hot[np.arange(one_hot.shape[0]), arr.flatten()]=1
  one_hot= one_hot.reshape((*arr.shape, n_labels))
  return one_hot

#Checking one hot encoder
test_seq= np.array([[3,5,1]])
one_hot= one_hot_encode(test_seq, 8)
print(one_hot)

[[[0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0.]]]


In [5]:
#defining the batchs and sequences
def get_batches(arr, batch_size, seq_length):
  batch_size_total= batch_size*seq_length
  n_batches= len(arr)//batch_size_total

  arr= arr[:n_batches*batch_size_total]
  arr= arr.reshape((batch_size,-1))

  for n in range(0, arr.shape[1],seq_length):
    x= arr[:,n:n+seq_length]
    y= np.zeros_like(x)
    try:
      y[:,:-1],y[:,-1]= x[:,1:], arr[:,n+seq_length]
    except IndexError:
      y[:,:-1],y[:,-1]= x[:,1:], arr[:,0]
    
    yield x,y

batches= get_batches(encoded, 8,50)
x,y= next(batches)

print('X\n', x[:10,:10])
print('Y\n', y[:10,:10])

X
 [[74 29 32  5 73 68 30 55 60  7]
 [76 55 76 23 25 39 41  9 25 41]
 [16  9 55 29 41 21 25 68 55 25]
 [ 2 41  9 68 55 16 32 25 55 33]
 [66 68 55 25 28 23 66 68 71 55]
 [68 55 32  9 76 55 76 68 32 73]
 [55 29 32 76 55 23 28 32  6 23]
 [ 9 65 55 62 42 30 68 55 73 29]]
Y
 [[29 32  5 73 68 30 55 60  7 26]
 [55 76 23 25 39 41  9 25 41 66]
 [ 9 55 29 41 21 25 68 55 25 29]
 [41  9 68 55 16 32 25 55 33 21]
 [68 55 25 28 23 66 68 71 55 32]
 [55 32  9 76 55 76 68 32 73 29]
 [29 32 76 55 23 28 32  6 23  9]
 [65 55 62 42 30 68 55 73 29 68]]


In [0]:
#Defining the model
device= torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class CharRNN(nn.Module):
  def __init__(self, tokens, n_hidden=255, n_layers=2, drop_prob=0.5, lr=0.001):
    super().__init__()
    self.n_hidden= n_hidden
    self.n_layers= n_layers
    self.drop_prob= drop_prob
    self.lr= lr

    #Creating character dictonary
    self.chars= tokens
    self.int2char= dict(enumerate(self.chars))
    self.char2int= {ch:i for i, ch in self.int2char.items()}

    #Layers
    self.lstm= nn.LSTM(len(self.chars), n_hidden, n_layers, dropout= drop_prob, batch_first= True)
    self.dropout= nn.Dropout(drop_prob)
    self.fc= nn.Linear(n_hidden, len(self.chars))
  
  def forward(self, x, hidden):
    r_output, hidden= self.lstm(x, hidden)
    out= self.dropout(r_output)
    out= out.view(-1, self.n_hidden)
    out= self.fc(out)
    return out, hidden
  
  def init_hidden(self, batch_size):
    weight= next(self.parameters()).data
    hidden=(weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(), weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
    return hidden

In [0]:
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
  net.train()
  opt= torch.optim.Adam(net.parameters(), lr=lr)
  loss_function= nn.CrossEntropyLoss()
  net.to(device)
  val_idx= int(len(data)*(1-val_frac))
  data, val_data= data[:val_idx], data[val_idx:]
  counter=0
  n_chars=len(net.chars)

  for e in range(epochs):
    counter=0
    h= net.init_hidden(batch_size)
    for x,y in get_batches(data, batch_size, seq_length):
      counter+=1
      x= one_hot_encode(x, n_chars)
      inputs, targets= torch.from_numpy(x), torch.from_numpy(y)
      inputs, targets= inputs.to(device), targets.to(device)
      h= tuple([each.data for each in h])
      net.zero_grad()
      output, h= net(inputs, h)

      loss= loss_function(output, targets.view(batch_size*seq_length))
      loss.backward()
      nn.utils.clip_grad_norm_(net.parameters(), clip)
      opt.step()
      if counter%print_every==0:
        val_h= net.init_hidden(batch_size)
        val_losses=[]
        net.eval()
        for x,y in get_batches(val_data, batch_size, seq_length):
          x= one_hot_encode(x, n_chars)
          inputs, targets= torch.from_numpy(x), torch.from_numpy(y)
          inputs, targets= inputs.to(device), targets.to(device)
          val_h= tuple([each.data for each in val_h])
          output, val_h= net(inputs, val_h)
          val_loss= loss_function(output, targets.view(batch_size*seq_length))
          val_losses.append(val_loss.item())
        net.train()
        print("Epochs: {}/{}.....".format(e+1, epochs), "Steps: {}.....".format(counter), "Loss: {}.....".format(loss.item()), "Val Loss: {}......".format(np.mean(val_losses)))


In [30]:
#Define and print the model
n_hidden=512
n_layers=2
net= CharRNN(chars, n_hidden, n_layers)
print(net)

batch_size= 128
seq_length= 100
n_epochs=20
train(net, encoded, epochs= n_epochs, batch_size= batch_size, seq_length= seq_length,lr=0.001, clip=5, val_frac=0.1, print_every=10 )

CharRNN(
  (lstm): LSTM(78, 512, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=512, out_features=78, bias=True)
)
