### character-level LSTM in Pytorch

#### 该笔记记录的模型如下所示
<img src="assets/charseq.jpeg" width="500">

#### 导入相关的包

In [1]:
import numpy as np 
import torch 
from torch import nn
import torch.nn.functional as F

####  读取数据

In [2]:
# Read the whole training text into memory. The encoding is passed explicitly
# so decoding does not depend on the platform's default locale encoding.
# NOTE(review): assumes data/anna.txt is UTF-8 (or plain ASCII) - confirm.
with open('data/anna.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [3]:
# Preview the first 100 characters of the loaded text
text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

#### 对其进行标记（tokenization）
即为数据建立一个字典，使每个字符都有唯一对应的整数标签

In [4]:
# Vocabulary: every distinct character of the text, in set iteration order
chars = tuple(set(text))
# Two lookup tables: integer id -> character and character -> integer id
int2char = {idx: symbol for idx, symbol in enumerate(chars)}
char2int = {symbol: idx for idx, symbol in int2char.items()}

# Encode the whole text as an array of integer ids
encoded = np.array(list(map(char2int.__getitem__, text)))

In [5]:
# Show the integer ids of the first 100 characters
print(encoded[:100])

[51 52 28 26 43 53  1 77 64 15 15 15 16 28 26 26 10 77 78 28 67 37  2 37 53
 46 77 28  1 53 77 28  2  2 77 28  2 37 56 53 61 77 53  3 53  1 10 77 76 54
 52 28 26 26 10 77 78 28 67 37  2 10 77 37 46 77 76 54 52 28 26 26 10 77 37
 54 77 37 43 46 77 79 58 54 15 58 28 10  4 15 15 74  3 53  1 10 43 52 37 54]


#### 将其转换为one-hot编码
构造一个函数，将编码后的整数数组转换为 one-hot 表示

In [6]:
def one_hot_encode(arr,n_labels):
    """One-hot encode an integer array along a new trailing axis.

    Args:
        arr: array of integer labels (any number of dimensions); every
            value must lie in ``[0, n_labels)``.
        n_labels: size of the one-hot axis.

    Returns:
        A float32 array of shape ``(*arr.shape, n_labels)`` holding 1 at
        each label position and 0 elsewhere.
    """
    arr = np.asarray(arr)

    # arr.size is the element count for any rank; np.multiply(*arr.shape)
    # only worked when arr had exactly two dimensions.
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)

    # Row k of the flat matrix gets a 1 in the column named by the k-th label
    one_hot[np.arange(arr.size), arr.ravel()] = 1

    # Restore the input's shape, with the one-hot axis appended
    return one_hot.reshape((*arr.shape, n_labels))

In [7]:
# Sanity check: one-hot encode a single 3-step sequence with 8 possible labels
test_seq = np.array([[3,5,1]])
one_hot = one_hot_encode(test_seq , 8)
print(one_hot)

[[[ 0.  0.  0.  1.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.  1.  0.  0.]
  [ 0.  1.  0.  0.  0.  0.  0.  0.]]]


#### 建造一个mini-batch
将编码后的文本切分为 mini-batch：每个 batch 由 batch_size 条长度为 seq_length 的字符序列组成
为了保证每个 batch 的大小完整，需要丢弃文本末尾不足一个完整 batch 的数据

In [30]:
def get_batches(arr,batch_size,seq_length):
    """Yield (input, target) mini-batches cut from an encoded sequence.

    The sequence is truncated to a whole number of batches and reshaped to
    ``batch_size`` rows; windows of ``seq_length`` columns are then yielded
    from left to right.

    Args:
        arr: 1-D NumPy array of encoded characters.
        batch_size: number of sequences (rows) per batch.
        seq_length: number of steps (columns) per batch.

    Yields:
        Tuples ``(x, y)`` of shape ``(batch_size, seq_length)`` where ``y``
        is ``x`` shifted one step to the left. The last column of ``y`` is
        the column that follows the window, wrapping around to column 0
        for the final window.
    """
    chars_per_batch = batch_size * seq_length
    n_batches = len(arr) // chars_per_batch

    # Drop the tail that does not fill a complete batch
    arr = arr[:n_batches * chars_per_batch]
    arr = arr.reshape((batch_size, n_batches * seq_length))
    n_cols = arr.shape[1]

    for i in range(0, n_cols, seq_length):
        x = arr[:, i:i + seq_length]

        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]

        # The final target is the character right after this window. The
        # original indexed a fixed column (n_batches + seq_length) instead.
        next_col = i + seq_length
        if next_col < n_cols:
            y[:, -1] = arr[:, next_col]
        else:
            # Last window: wrap around to the first column of each row
            y[:, -1] = arr[:, 0]

        yield x, y

In [31]:
# Draw the first batch (8 sequences, 50 steps each) and print up to the
# first 10 rows and 10 columns of the inputs and of the targets
batches = get_batches(encoded, 8, 50)
x, y = next(batches)
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[51 52 28 26 43 53  1 77 64 15]
 [46 79 54 77 43 52 28 43 77 28]
 [53 54 66 77 79  1 77 28 77 78]
 [46 77 43 52 53 77 19 52 37 53]
 [77 46 28 58 77 52 53  1 77 43]
 [19 76 46 46 37 79 54 77 28 54]
 [77  0 54 54 28 77 52 28 66 77]
 [21 72  2 79 54 46 56 10  4 77]]

y
 [[52 28 26 43 53  1 77 64 15 15]
 [79 54 77 43 52 28 43 77 28 43]
 [54 66 77 79  1 77 28 77 78 79]
 [77 43 52 53 77 19 52 37 53 78]
 [46 28 58 77 52 53  1 77 43 53]
 [76 46 46 37 79 54 77 28 54 66]
 [ 0 54 54 28 77 52 28 66 77 46]
 [72  2 79 54 46 56 10  4 77 45]]


In [59]:
# Inspect the first row of x
x[0,:]

array([65, 51, 19, 67, 35, 51, 19, 17, 31, 36, 77, 51, 65, 19, 54, 28, 28,
       35, 18, 51, 31, 77, 18, 51, 65, 67, 35, 51, 62, 54, 19, 51, 54,  2,
       19, 49, 51, 19, 67, 35, 51, 37, 36, 17, 65, 19,  5, 28, 35, 17])

### 定义模型
下图为模型的大致示意图
<img src="assets/charRNN.png" width=500px>
定义一个类来实现字符级的预测模型

In [11]:
# Train on the GPU whenever CUDA is available, and report the chosen device
train_on_gpu = torch.cuda.is_available()
print('training on gpu' if train_on_gpu else 'training on cpu')

training on cpu


####  模型结构
采用 `__init__` 定义模型，结构如下：

①：创建对应的文本字典

②：定义一个LSTM模型，传入的参数包含，input_size,hidden_size,n_layers,batch_first

③：定义一个dropout层

④：定义一个全连接层 

⑤：定义 init_hidden 方法，将隐藏状态初始化为零

In [32]:
class char_rnn(nn.Module):
    """Character-level LSTM followed by dropout and a linear output layer.

    Inputs are one-hot vectors over the vocabulary; the output is one score
    per vocabulary entry for every time step.
    """

    def __init__(self,tokens,n_hidden = 256,n_layers = 2, drop_prob = 0.5, lr = 0.01):
        """Build the layers and the character lookup tables.

        Args:
            tokens: sequence of distinct characters forming the vocabulary.
            n_hidden: number of hidden units per LSTM layer.
            n_layers: number of stacked LSTM layers.
            drop_prob: dropout probability, used both between LSTM layers
                and on the LSTM output.
            lr: learning rate; stored on the instance, not used by the
                layers themselves.
        """
        super().__init__()
        self.drop_prob = drop_prob
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.lr = lr

        # Vocabulary and the two lookup tables (id -> char, char -> id)
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: idx for idx, ch in self.int2char.items()}

        n_chars = len(self.chars)
        self.lstm = nn.LSTM(n_chars, n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(n_hidden, n_chars)

    def forward(self,x,hidden):
        """Run one batch through the network.

        Args:
            x: input tensor of shape (batch, seq, len(chars)), as required
                by the batch-first LSTM.
            hidden: tuple (h, c) of LSTM states, e.g. from ``init_hidden``.

        Returns:
            ``(out, hidden)`` where ``out`` has shape
            (batch * seq, len(chars)) and ``hidden`` is the updated state.
        """
        r_output, hidden = self.lstm(x, hidden)
        out = self.dropout(r_output)

        # Stack all time steps of all sequences so the linear layer sees a
        # 2-D (batch * seq, n_hidden) matrix
        out = out.contiguous().view(-1, self.n_hidden)
        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """Return zeroed (hidden, cell) states for a batch.

        Both tensors have shape (n_layers, batch_size, n_hidden) and are
        created with the dtype and device of the model's parameters, so
        they follow the model when it is moved to the GPU.
        """
        weight = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)

        return (weight.new_zeros(shape), weight.new_zeros(shape))
    

####  开始训练模型
通过为模型设定epochs，学习率，以及其他的参数

优化器采用Adam优化器

训练时采用验证集查看其模型训练效果

In [56]:
def train(net,data,epochs=10,batch_size = 10,seq_length=50,lr = 0.01,clip = 5,
         val_frac = 0.1 , print_every = 10):
    """Train a character-level network and periodically print the losses.

    Args:
        net: network to train; must expose ``chars`` and
            ``init_hidden(batch_size)`` and be callable as
            ``net(inputs, hidden)``.
        data: integer-encoded text (NumPy array) to train on.
        epochs: number of passes over the training split.
        batch_size: number of sequences per mini-batch.
        seq_length: number of character steps per sequence.
        lr: learning rate for the Adam optimizer.
        clip: maximum gradient norm used for gradient clipping.
        val_frac: fraction of ``data``, taken from its end, held out for
            validation.
        print_every: number of training steps between loss reports.
    """
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # Hold out the tail of the data as the validation split
    val_idx = int(len(data) * (1 - val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if train_on_gpu:
        net.cuda()

    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):

        # Start every epoch from a zero hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            targets = targets.long()

            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Detach the carried-over state so backpropagation stops at the
            # batch boundary instead of reaching back through all history
            h = tuple(each.detach() for each in h)

            net.zero_grad()
            output, h = net(inputs, h)

            loss = criterion(output, targets.view(batch_size * seq_length))
            loss.backward()

            # Clip the gradient norm before the optimizer step
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            if counter % print_every == 0:
                val_h = net.init_hidden(batch_size)
                val_losses = []

                net.eval()
                # No gradients are needed for validation; skipping graph
                # construction saves memory and time
                with torch.no_grad():
                    for val_x, val_y in get_batches(val_data, batch_size, seq_length):
                        val_x = one_hot_encode(val_x, n_chars)
                        inputs = torch.from_numpy(val_x)
                        targets = torch.from_numpy(val_y).long()

                        if train_on_gpu:
                            inputs, targets = inputs.cuda(), targets.cuda()

                        output, val_h = net(inputs, val_h)
                        val_loss = criterion(output, targets.view(batch_size * seq_length))
                        val_losses.append(val_loss.item())

                # Back to training mode (re-enables dropout)
                net.train()

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

In [52]:
# Build the network: 2 LSTM layers of 512 hidden units, with the vocabulary
# taken from the distinct characters of the text
n_hidden=512
n_layers=2
chars = tuple(set(text))
net = char_rnn(chars, n_hidden, n_layers)
print(net)

char_rnn(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)


In [57]:
# Training hyperparameters: 128 sequences of 100 steps per mini-batch
batch_size = 128
seq_length = 100
n_epochs = 3 # start smaller if you are just testing initial behavior

# train the model, printing training and validation loss every 10 steps
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

Epoch: 1/3... Step: 10... Loss: 3.1501... Val Loss: 3.1213


KeyboardInterrupt: 

In [36]:
def train(net,data,epochs = 10,batch_size = 10,seq_length = 50,lr = 0.01,
         clip = 5, val_frac = 0.1,print_every = 10):
    ''' Training a network 
    
        Arguments
        ---------
        
        net: CharRNN network
        data: text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss
    
    '''
    # Put the model in training mode
    net.train()
    # Optimizer
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    # Loss function
    criterion = nn.CrossEntropyLoss()

    # Split into training data and a validation tail
    val_idx = int(len(data) * (1 - val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if train_on_gpu:
        net.cuda()

    counter = 0
    n_chars = len(net.chars)

    for e in range(epochs):

        # Start every epoch from a zero hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            targets = targets.long()

            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Detach the carried-over state so backpropagation stops at the
            # batch boundary instead of reaching back through all history
            h = tuple(each.detach() for each in h)

            net.zero_grad()
            output, h = net(inputs, h)

            loss = criterion(output, targets.view(batch_size * seq_length))
            loss.backward()

            # Clip the gradient norm before the optimizer step
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            if counter % print_every == 0:
                val_h = net.init_hidden(batch_size)
                val_losses = []

                net.eval()
                # No gradients are needed for validation; skipping graph
                # construction saves memory and time
                with torch.no_grad():
                    for val_x, val_y in get_batches(val_data, batch_size, seq_length):
                        val_x = one_hot_encode(val_x, n_chars)
                        inputs = torch.from_numpy(val_x)
                        targets = torch.from_numpy(val_y).long()

                        # Validation tensors must live on the same device
                        # as the model
                        if train_on_gpu:
                            inputs, targets = inputs.cuda(), targets.cuda()

                        output, val_h = net(inputs, val_h)
                        val_loss = criterion(output, targets.view(batch_size * seq_length))

                        # Collect into the list; the original called
                        # .append on the loss tensor itself
                        val_losses.append(val_loss.item())

                # Back to training mode (re-enables dropout)
                net.train()
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))