In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.autograd import Variable

In [2]:
with open('anna.txt', 'r') as f:
    text = f.read()

In [3]:
chars = set(text)
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

In [4]:
encoded = np.array([char2int[ch] for ch in text])

In [49]:
def get_batches(arr, n_seqs, n_steps):
    
    batch_size = n_seqs * n_steps
    n_batches = len(arr)//batch_size
    arr = arr[:n_batches * batch_size]
    arr = arr.reshape((n_steps, -1))
    
    for n in range(0, arr.shape[1], n_seqs):
        x = arr[:, n:n+n_seqs]
        y = np.zeros_like(x)
        y[:, :-1], y[:, -1] = x[:, 1:], x[:, 0]
        yield torch.from_numpy(x), torch.from_numpy(y)

In [112]:
class CharRNN(nn.Module):
    def __init__(self, labels, embed_dim=50, n_steps=100, 
                              n_hidden=256, n_layers=2,
                              dropout=0.5, lr=0.001):
        super().__init__()
        self.dropout = dropout
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        
        self.chars = labels
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        self.embed = nn.Embedding(len(self.chars), embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embed_dim, n_hidden, n_layers, dropout=dropout)
        self.fc = nn.Linear(n_hidden, len(self.chars))
        
        self.init_weights()
        
        self.opt = torch.optim.Adam(self.parameters(), lr=lr)
        self.criterion = nn.CrossEntropyLoss()
        
    def forward(self, x, hc):
        x = self.embed(x)
        x = self.dropout(x)
        x, (h, c) = self.lstm(x, hc)
        x = self.dropout(x)
        
        # Stack up LSTM outputs
        x = x.view(x.size()[0]*x.size()[1], self.n_hidden)
        
        x = self.fc(x)
        
        return x, (h, c)
    
    def sample(self, size, prime='The'):
        self.eval()
        chars = [ch for ch in prime]
        h = self.init_hidden(1)
        for ch in prime:
            h = tuple([Variable(each.data) for each in h])
            x = np.array([[char2int[ch]]])
            inputs = Variable(torch.from_numpy(x))
            out, h = self.forward(inputs, h)

        p = F.softmax(out).data.numpy().squeeze()
        char = np.random.choice(np.arange(len(self.chars)), p=p)
        chars.append(self.int2char[char])

        for ii in range(size):
            h = tuple([Variable(each.data) for each in h])

            x = np.array([[char2int[chars[-1]]]])
            inputs = Variable(torch.from_numpy(x))
            out, h = self.forward(inputs, h)

            p = F.softmax(out).data.numpy().squeeze()
            char = np.random.choice(np.arange(len(self.chars)), p=p)
            chars.append(self.int2char[char])

        return ''.join(chars)
    
    def init_weights(self):
        initrange = 0.1
        self.embed.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.fill_(0)
        self.fc.weight.data.uniform_(-initrange, initrange)
        
    def init_hidden(self, n_seqs):
        # Create two new tensors with sizes n_layers x n_seqs x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        return (Variable(weight.new(self.n_layers, n_seqs, self.n_hidden).zero_()),
                Variable(weight.new(self.n_layers, n_seqs, self.n_hidden).zero_()))
        

In [115]:
def train(net, epochs, cuda=False, print_every=10):
    net.train()
    if cuda:
        net.cuda()
    counter = 0
    for e in range(epochs):
        h = net.init_hidden(n_seqs)
        for x, y in get_batches(encoded, n_seqs, n_steps):
            counter += 0
            
            inputs, targets = Variable(x), Variable(y)
            if cuda:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([Variable(each.data) for each in h])

            net.zero_grad()
            
            output, h = net.forward(inputs, h)
            loss = net.criterion(output, targets.view(n_seqs*n_steps))

            loss.backward()
            net.opt.step()
            
            if counter % print_every == 0:
                print("Epoch: {}/{}".format(e+1, epochs),
                      "Loss: {:.4f}".format(loss.data[0]))
    return net

In [116]:
net = CharRNN(chars)
train(net, 5)

Epoch: 1/5 Loss: 4.4093
Epoch: 1/5 Loss: 4.3762
Epoch: 1/5 Loss: 4.3369
Epoch: 1/5 Loss: 4.2919
Epoch: 1/5 Loss: 4.2138
Epoch: 1/5 Loss: 4.0801
Epoch: 1/5 Loss: 3.8209
Epoch: 1/5 Loss: 3.5027
Epoch: 1/5 Loss: 3.5417
Epoch: 1/5 Loss: 3.4775
Epoch: 1/5 Loss: 3.3658
Epoch: 1/5 Loss: 3.3107
Epoch: 1/5 Loss: 3.3307
Epoch: 1/5 Loss: 3.3153
Epoch: 1/5 Loss: 3.2786
Epoch: 1/5 Loss: 3.2761
Epoch: 1/5 Loss: 3.2722
Epoch: 1/5 Loss: 3.2627
Epoch: 1/5 Loss: 3.2466
Epoch: 1/5 Loss: 3.2674
Epoch: 1/5 Loss: 3.2350
Epoch: 1/5 Loss: 3.2283
Epoch: 1/5 Loss: 3.2015
Epoch: 1/5 Loss: 3.2185
Epoch: 1/5 Loss: 3.2245
Epoch: 1/5 Loss: 3.2110
Epoch: 1/5 Loss: 3.1948
Epoch: 1/5 Loss: 3.1992
Epoch: 1/5 Loss: 3.1970
Epoch: 1/5 Loss: 3.1600
Epoch: 1/5 Loss: 3.1618
Epoch: 1/5 Loss: 3.1953
Epoch: 1/5 Loss: 3.1905
Epoch: 1/5 Loss: 3.1731
Epoch: 1/5 Loss: 3.1881
Epoch: 1/5 Loss: 3.2116
Epoch: 1/5 Loss: 3.1939
Epoch: 1/5 Loss: 3.1999
Epoch: 1/5 Loss: 3.1924
Epoch: 1/5 Loss: 3.1508
Epoch: 1/5 Loss: 3.1562
Epoch: 1/5 Loss:

Epoch: 2/5 Loss: 2.6313
Epoch: 2/5 Loss: 2.6144
Epoch: 2/5 Loss: 2.6280
Epoch: 2/5 Loss: 2.6490
Epoch: 2/5 Loss: 2.6275
Epoch: 2/5 Loss: 2.6495
Epoch: 2/5 Loss: 2.6255
Epoch: 2/5 Loss: 2.5962
Epoch: 2/5 Loss: 2.6020
Epoch: 2/5 Loss: 2.6198
Epoch: 2/5 Loss: 2.6237
Epoch: 2/5 Loss: 2.6258
Epoch: 2/5 Loss: 2.6106
Epoch: 2/5 Loss: 2.6111
Epoch: 2/5 Loss: 2.6114
Epoch: 2/5 Loss: 2.6448
Epoch: 2/5 Loss: 2.6165
Epoch: 2/5 Loss: 2.6277
Epoch: 2/5 Loss: 2.6117
Epoch: 2/5 Loss: 2.6256
Epoch: 2/5 Loss: 2.6180
Epoch: 2/5 Loss: 2.5867
Epoch: 2/5 Loss: 2.6007
Epoch: 2/5 Loss: 2.6015
Epoch: 2/5 Loss: 2.5976
Epoch: 2/5 Loss: 2.6048
Epoch: 2/5 Loss: 2.5863
Epoch: 2/5 Loss: 2.6225
Epoch: 2/5 Loss: 2.6276
Epoch: 2/5 Loss: 2.5987
Epoch: 2/5 Loss: 2.6073
Epoch: 2/5 Loss: 2.6230
Epoch: 2/5 Loss: 2.5725
Epoch: 2/5 Loss: 2.5964
Epoch: 2/5 Loss: 2.5840
Epoch: 2/5 Loss: 2.6268
Epoch: 2/5 Loss: 2.6223
Epoch: 2/5 Loss: 2.5916
Epoch: 2/5 Loss: 2.6230
Epoch: 2/5 Loss: 2.5900
Epoch: 2/5 Loss: 2.6120
Epoch: 2/5 Loss:

KeyboardInterrupt: 

In [111]:
net.sample(100)

'Thelrd A buni s trethar oyis tery ids lercauce f, s d te ond ony areny wk ve louves theuois an Vfe"s inc'

In [39]:
def repackage_hidden(h):
    """Wraps hidden states in new Variables, to detach them from their history."""
    if type(h) == Variable:
        return Variable(h.data)
    else:
        return tuple(repackage_hidden(v) for v in h)

In [102]:
def sample(net, size, prime='The'):
    net.eval()
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        h = tuple([Variable(each.data) for each in h])
        x = np.array([[char2int[ch]]])
        inputs = Variable(torch.from_numpy(x))
        out, h = net.forward(inputs, h)
    
    p = F.softmax(out).data
    chars.append(int2char[p.max(1)[1].numpy()[0,0]])
    
    for ii in range(size):
        h = tuple([Variable(each.data) for each in h])
        
        x = np.array([[char2int[chars[-1]]]])
        inputs = Variable(torch.from_numpy(x))
        out, h = net.forward(inputs, h)
        
        p = F.softmax(out).data
        #print(p.numpy().squeeze())
        char = np.random.choice(np.arange(len(net.chars)), p=p.numpy().squeeze())
        chars.append(int2char[char])
        
    return ''.join(chars)

In [103]:
print(sample(net, 200, prime='Anna'))

Annall3)&uK6kZaKUfsg0o9rGyf%ORE7E7DnCPTV&(a%49yfDPhRQ0:BU.44)d?&G7BnM':68kaW*%Z:oTezUFxKFJV2TsS4JZ
`u-MxlzFpVXzk?
;`_Y8gSv@hrDe,BInJa(`68alj-VRkx"Dkn;4`)4567a/5ZXT aOXgk?7K@"wD*4o6?F*%:nH(.IqBosI8z!N
NSD J
