In [1]:
import numpy as np
import torch 
from torch import nn
import torch.nn.functional as F

In [2]:
with open("anna.txt", 'r') as f:
    text = f.read()

In [3]:
text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

In [4]:
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch : ii for ii, ch in int2char.items()}

encoded = np.array([char2int[ch] for ch in text])
encoded[:100]

array([17, 22, 72, 15, 57, 74, 81, 10, 76, 44, 44, 44,  3, 72, 15, 15, 64,
       10, 53, 72, 34, 60, 33, 60, 74, 26, 10, 72, 81, 74, 10, 72, 33, 33,
       10, 72, 33, 60, 27, 74, 56, 10, 74, 59, 74, 81, 64, 10, 14, 63, 22,
       72, 15, 15, 64, 10, 53, 72, 34, 60, 33, 64, 10, 60, 26, 10, 14, 63,
       22, 72, 15, 15, 64, 10, 60, 63, 10, 60, 57, 26, 10, 67, 78, 63, 44,
       78, 72, 64, 31, 44, 44, 21, 59, 74, 81, 64, 57, 22, 60, 63])

In [5]:
print (int2char)

{0: 'F', 1: '/', 2: 'W', 3: 'H', 4: 'O', 5: 'Q', 6: '!', 7: 'M', 8: ':', 9: 'L', 10: ' ', 11: 'z', 12: 'K', 13: '%', 14: 'u', 15: 'p', 16: '*', 17: 'C', 18: 'b', 19: '$', 20: 'G', 21: 'E', 22: 'h', 23: 'N', 24: '_', 25: 'U', 26: 's', 27: 'k', 28: 'I', 29: '-', 30: 'Z', 31: '.', 32: '6', 33: 'l', 34: 'm', 35: 'd', 36: 'j', 37: '0', 38: '8', 39: ')', 40: 'c', 41: 'D', 42: 'x', 43: '2', 44: '\n', 45: 'Y', 46: 'A', 47: 'S', 48: 'R', 49: 'V', 50: '7', 51: '?', 52: '`', 53: 'f', 54: 'J', 55: 'q', 56: ';', 57: 't', 58: '3', 59: 'v', 60: 'i', 61: '4', 62: '&', 63: 'n', 64: 'y', 65: '9', 66: 'X', 67: 'o', 68: 'g', 69: 'T', 70: ',', 71: '(', 72: 'a', 73: 'B', 74: 'e', 75: '5', 76: '1', 77: 'P', 78: 'w', 79: "'", 80: '"', 81: 'r', 82: '@'}


In [6]:
def one_hot_encode(arr, n_labels):
    one_hot = np.zeros((arr.shape[0]*arr.shape[1], n_labels), dtype=np.float32)
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1
    one_hot = one_hot.reshape((arr.shape[0],arr.shape[1], n_labels))
    return one_hot

In [7]:
test_seq = np.array([[3, 5, 1]])
print (one_hot_encode(test_seq, 6))

[[[0. 0. 0. 1. 0. 0.]
  [0. 0. 0. 0. 0. 1.]
  [0. 1. 0. 0. 0. 0.]]]


In [8]:
def get_batches(arr, batch_sz, seq_length):
    batch_sz_total = batch_sz * seq_length
    n_batches = len(arr) // batch_sz_total
    arr = arr[:n_batches*batch_sz_total]
    arr = arr.reshape((batch_sz,-1))
    for n in range(0, arr.shape[1], seq_length):
        x = arr[: ,n:n+seq_length]
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n + seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [9]:
batches = get_batches(encoded, 8, 50)
x, y = next(batches)
# printing out the first 10 items in a sequence
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[17 22 72 15 57 74 81 10 76 44]
 [26 67 63 10 57 22 72 57 10 72]
 [74 63 35 10 67 81 10 72 10 53]
 [26 10 57 22 74 10 40 22 60 74]
 [10 26 72 78 10 22 74 81 10 57]
 [40 14 26 26 60 67 63 10 72 63]
 [10 46 63 63 72 10 22 72 35 10]
 [ 4 18 33 67 63 26 27 64 31 10]]

y
 [[22 72 15 57 74 81 10 76 44 44]
 [67 63 10 57 22 72 57 10 72 57]
 [63 35 10 67 81 10 72 10 53 67]
 [10 57 22 74 10 40 22 60 74 53]
 [26 72 78 10 22 74 81 10 57 74]
 [14 26 26 60 67 63 10 72 63 35]
 [46 63 63 72 10 22 72 35 10 26]
 [18 33 67 63 26 27 64 31 10 80]]


In [10]:
from torch.nn import LSTM
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [11]:
class CharRNN(nn.Module):
    def __init__(self,tokens,n_hiddens=256,n_layers=2,drop_prob=0.5,lr=0.001):
        super().__init__()
        self.drop_prob=drop_prob
        self.n_layers=n_layers
        self.n_hidden=n_hidden
        self.lr=lr
        self.chars=tokens
        self.int2char= dict(enumerate(self.chars))
        self.char2int={ch: ii for ii, ch in self.int2char.items()}
        
        self.lstm = LSTM(input_size = len(self.chars),
                         hidden_size = n_hiddens ,
                         num_layers = n_layers, 
                         batch_first = True,
                         dropout = drop_prob)
        self.fc = nn.Linear(n_hiddens, len(self.chars))
    
    def forward(self, x, hidden):
        out , hidden = self.lstm(x,hidden)
        out = out.contiguous().view(-1, self.n_hidden)
        out = self.fc(out)
        return out, hidden
    
    def init_hidden(self, batch_size):
#         weight = next(self.parameters()).data
#         h = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
#              weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        h = (torch.zeros(self.n_layers, batch_size, self.n_hidden, device=device),
            torch.zeros(self.n_layers, batch_size, self.n_hidden, device=device))
        return h

In [12]:
a=torch.zeros(1,1,1, 10)
print (a.shape)

torch.Size([1, 1, 1, 10])


In [13]:
def train(net, data, epochs= 10, batch_size = 10, 
          seq_length = 50, lr = 0.001, clip = 5, 
          val_frac = 0.1, print_every = 10):
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr= lr)
    criterion = nn.CrossEntropyLoss()
    
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    
    net.to(device)
    counter = 0
    n_chars = len(net.chars)
    
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            inputs, targets = inputs.to(device), targets.to(device)
            
            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])
            
            net.zero_grad()
            output, h = net(inputs, h)
            loss = criterion(output, targets.view(batch_size*seq_length))
            loss.backward()
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()
            
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])
                    
                    inputs, targets = x, y
                    inputs, targets = inputs.to(device), targets.to(device)

                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(batch_size*seq_length))
                
                    val_losses.append(val_loss.item())
                
                net.train() # reset to train mode after iterationg through validation data
                
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

In [14]:
# define and print the net
n_hidden=512
n_layers=2

net = CharRNN(chars, n_hidden, n_layers)
print(net)

CharRNN(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)


In [21]:
batch_size = 1024
seq_length = 100
n_epochs = 20 # start smaller if you are just testing initial behavior

# train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.02, print_every=10)

Epoch: 1/20... Step: 10... Loss: 2.0822... Val Loss: 2.0070
Epoch: 2/20... Step: 20... Loss: 1.7177... Val Loss: 1.6776
Epoch: 2/20... Step: 30... Loss: 1.5399... Val Loss: 1.5364
Epoch: 3/20... Step: 40... Loss: 1.4430... Val Loss: 1.4724
Epoch: 3/20... Step: 50... Loss: 1.3792... Val Loss: 1.4318
Epoch: 4/20... Step: 60... Loss: 1.3466... Val Loss: 1.4009
Epoch: 5/20... Step: 70... Loss: 1.3138... Val Loss: 1.3761
Epoch: 5/20... Step: 80... Loss: 1.2916... Val Loss: 1.3612
Epoch: 6/20... Step: 90... Loss: 1.2659... Val Loss: 1.3513
Epoch: 6/20... Step: 100... Loss: 1.2435... Val Loss: 1.3593
Epoch: 7/20... Step: 110... Loss: 1.2426... Val Loss: 1.3407
Epoch: 8/20... Step: 120... Loss: 1.2755... Val Loss: 1.3267
Epoch: 8/20... Step: 130... Loss: 1.2117... Val Loss: 1.3162
Epoch: 9/20... Step: 140... Loss: 1.2019... Val Loss: 1.3105
Epoch: 9/20... Step: 150... Loss: 1.1980... Val Loss: 1.3085
Epoch: 10/20... Step: 160... Loss: 1.1909... Val Loss: 1.3096
Epoch: 10/20... Step: 170... Los

In [16]:
# 7 : 1.62
print (device, type(device))

cuda <class 'torch.device'>


In [17]:
def predict(net, char, h=None, top_k=None):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        
        # tensor inputs
        x = np.array([[net.char2int[char]]])
        x = one_hot_encode(x, len(net.chars))
        inputs = torch.from_numpy(x)
        
        inputs = inputs.to(device)
        
        # detach hidden state from history
        h = tuple([each.data for each in h])
        # get the output of the model
        out, h = net(inputs, h)

        # get the character probabilities
        p = F.softmax(out, dim=1).data
        if(device == torch.device("cuda")):
            p = p.cpu() # move to cpu
        
        # get top characters
        if top_k is None:
            top_ch = np.arange(len(net.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
        
        # return the encoded value of the predicted char and the hidden state
        return net.int2char[char], h

In [18]:
def sample(net, size, prime='The', top_k=None):
        
    net.to(device)
    
    net.eval() # eval mode
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [22]:
print(sample(net, 1000, prime='Anna', top_k=5))

Anna, as they
will appreciate in the first minute. And have you been
unable to come."

"Oh, yes, time in that is or taking off his soul."

Alexey Alexandrovitch was standing at the carden finger. The more they would say to their mouth, and to arrange the standal and what
he was not, happy from the study of his she forgotten, was there and shakon and hurriedly
into the balance of the
more strangers. Besides to the peasant, the children were satisfied, and turning harvesting ages or without him about the
forest over her strange, and at once. Alexey
Alexandrovitch had not an intellegent frost to see him to her, and almost all that strange file all of her side of
something, and had never seen her sister, the most chalk of the prince, the most passages of
the meaning
of the memory of the classion, and was, as though he had never had to say something.

"I'm afraid! All right, about it!" she said, "that's there are the club to her, but I am very well, to touch her father, anyway," he said, an