# 循环神经网络
### 从零开始实现

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
X, W_xh = torch.randn(1, 5), torch.randn(5, 4) #X为5维数据，时序为1
H, W_hh = torch.randn(1, 4), torch.randn(4, 4)
b = torch.ones(1,1)
h_t = F.relu(torch.matmul(X, W_xh) + torch.matmul(H, W_hh) + b)
h_t

tensor([[1.4502, 0.8332, 0.0000, 2.4592]])

### Pytorch模块定义

In [5]:
class RnnNet(nn.Module):
    def __init__(self, dim_input, dim_hidden, dim_output):
        super(RnnNet, self).__init__()
        self.fc_x2h = nn.Linear(dim_input, dim_hidden)
        self.fc_h2h = nn.Linear(dim_hidden, dim_hidden, bias = False)
        self.fc_h2y = nn.Linear(dim_hidden, dim_output)  #4x1
        self.dim_hidden = dim_hidden
        
    def forward(self, x):
        h = x.new_zeros(1, self.dim_hidden)
        for t in range(x.size(0)):
            h = F.relu(self.fc_x2h(x[t:t+1]) + self.fc_h2h(h)) #
        return self.fc_h2y(h)

rnn = RnnNet(5, 20, 10)
t = torch.randn(20, 5) #时序长为20
rnn(t)

tensor([[ 0.0656,  0.1010, -0.1321,  0.2298, -0.0287, -0.2559, -0.3701, -0.3275,
         -0.5198,  0.5142]], grad_fn=<AddmmBackward0>)

In [6]:
rnn = nn.RNN(10, 20, 2, batch_first=True) # inputsize, hidden size, num_layers
input = torch.randn(3, 5, 10) # batchsize 3  时序长度为5， 10：特征维度
h0 = torch.randn(2, 3, 20) # 层数，batchsize, hiddensize
output, hn = rnn(input, h0)  #
print(hn.shape)
output.shape  # W_hy x H  输出size默认为h的维度

torch.Size([2, 3, 20])


torch.Size([3, 5, 20])

###  使用RNNCell进行单个样本运算

In [7]:
rnn = nn.RNNCell(10, 20)
input = torch.randn(6, 3, 10) # (time_steps, batch, input_size)
hx = torch.randn(3, 20)
output = []
for i in range(input.size(0)):
    hx = rnn(input[i], hx)
    output.append(hx)
output[0].shape

torch.Size([3, 20])

### gating RNN 

In [8]:
class RecNetWithGating(nn.Module):
    def __init__(self, dim_input, dim_recurrent, dim_output):
        super(RecNetWithGating, self).__init__()
        self.fc_x2h = nn.Linear(dim_input, dim_recurrent)
        self.fc_h2h = nn.Linear(dim_recurrent, dim_recurrent, bias = False)
        self.fc_x2z = nn.Linear(dim_input, dim_recurrent)
        self.fc_h2z = nn.Linear(dim_recurrent, dim_recurrent, bias = False)
        self.fc_h2y = nn.Linear(dim_recurrent, dim_output)
        self.dim_hidden = dim_recurrent
    def forward(self, input):
        h = input.new_zeros(1, self.dim_hidden)
        for t in range(input.size(0)):
            z = torch.sigmoid(self.fc_x2z(input[t:t+1]) + self.fc_h2z(h))
            hb = F.relu(self.fc_x2h(input[t:t+1]) + self.fc_h2h(h))
            h = z * h + (1 - z) * hb
        return self.fc_h2y(h)
    
rnn = RecNetWithGating(5, 4, 4)
t = torch.randn(20, 5) #时序长为20
rnn(t)

tensor([[ 0.1977,  0.0974,  0.0241, -0.1042]], grad_fn=<AddmmBackward0>)

### Pytorch LSTM

In [9]:
lstm = nn.LSTMCell(10, 20) #input_dim, recurrent dim
input = torch.randn(2, 3, 10) # (time_steps, batch, input_size)
hx = torch.randn(3, 20) # (batch, hidden_size)
cx = torch.randn(3, 20)
output = []
for i in range(input.size()[0]):
    hx, cx = lstm(input[i], (hx, cx)) # 每次输入一个时间样本
    output.append(hx)
output = torch.stack(output, dim=0)
output.shape

torch.Size([2, 3, 20])

In [10]:
class lstmNet(nn.Module):
    def __init__(self, dim_input, dim_recurrent, num_layers, dim_output):
        super(lstmNet, self).__init__()
        self.lstm = nn.LSTM(dim_input, dim_recurrent, num_layers)
        self.fc = nn.Linear(dim_recurrent, dim_output)
    def forward(self, x):
        hx, cx = self.lstm(x)
        o = hx[-1,:,:]
        o = o.squeeze(axis=0)
        return self.fc(o)

input = torch.randn(2, 3, 10) #T N C
lstm = lstmNet(10, 20, 1, 7)
output = lstm(input)
output.shape

torch.Size([3, 7])

### GRU

In [11]:
class gruNet(nn.Module):
    def __init__(self, dim_input, dim_recurrent, num_layers, dim_output):
        super(gruNet, self).__init__()
        self.gru = nn.GRU(dim_input, dim_recurrent, num_layers)
        self.fc = nn.Linear(dim_recurrent, dim_output)
    def forward(self, x):
        hx, cx = self.gru(x)
        o = hx[-1,:,:]
        o = o.squeeze(axis=0)
        return self.fc(o)

input = torch.randn(2, 3, 10) #T N C
gru = gruNet(10, 20, 1, 7)
output = gru(input)
output.shape

torch.Size([3, 7])

### IMDB 文本情感分类

In [5]:
import torch
from torch.utils.data import DataLoader,Dataset
import os
import re
 
# 路径需要根据情况修改，文件太大的时候可以引用绝对路径
data_base_path = r"F:\SZTU-教学文件\UG-深度学习方法与应用\examples\data\aclImdb_v1\aclImdb"
 
#1. 定义tokenize的方法，对评论文本分词
def tokenize(text):
    # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
    fileters = ['!','"','#','$','%','&','\(','\)','\*','\+',',','-','\.','/',':',';','<','=','>','\?','@'
        ,'\[','\\','\]','^','_','`','\{','\|','\}','~','\t','\n','\x97','\x96','”','“',]
    # sub方法是替换
    text = re.sub("<.*?>"," ",text,flags=re.S)# 去掉<...>中间的内容，主要是文本内容中存在<br/>等内容
    text = re.sub("|".join(fileters)," ",text,flags=re.S)# 替换掉特殊字符，'|'是把所有要匹配的特殊字符连在一起
    return [i.strip() for i in text.split()]# 去掉前后多余的空格


#2. 准备dataset
class ImdbDataset(Dataset):
    def __init__(self,mode):
        super(ImdbDataset,self).__init__()
        # 读取所有的训练文件夹名称
        if mode=="train":
            text_path = [os.path.join(data_base_path,i)  for i in ["train/neg","train/pos"]]
        else:
            text_path =  [os.path.join(data_base_path,i)  for i in ["test/neg","test/pos"]]
 
        self.total_file_path_list = []
        # 进一步获取所有文件的名称
        for i in text_path:
            self.total_file_path_list.extend([os.path.join(i,j) for j in os.listdir(i)])
 
 
    def __getitem__(self, idx):
        cur_path = self.total_file_path_list[idx]
        cur_filename = os.path.basename(cur_path)
        # 标题的形式是：3_4.txt	前面的3是索引，后面的4是分类
        # 原本的分类是1-10，现在变为0-9
        label = int(cur_filename.split("_")[-1].split(".")[0]) -1 #处理标题，获取label，-1是因为要转化为[0-9]
        text = tokenize(open(cur_path).read().strip()) #直接按照空格进行分词
        return label,text
 
    def __len__(self):
        return len(self.total_file_path_list)
    
# 测试是否能成功获取数据
dataset = ImdbDataset(mode="train")
print(dataset[0])

# Word2Sequence
class Word2Sequence:
    # 未出现过的词
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    # 填充的词
    UNK = 0
    PAD = 1
 
    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}
 
    def to_index(self, word):
        """word -> index"""
        return self.dict.get(word, self.UNK)
 
    def to_word(self, index):
        """index -> word"""
        if index in self.inversed_dict:
            return self.inversed_dict[index]
        return self.UNK_TAG
 
    def __len__(self):
        return len(self.dict)
 
    def fit(self, sentence):
        """count字典中存储每个单词出现的次数"""
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1
 
    def build_vocab(self, min_count=None, max_count=None, max_feature=None):
        """
        构建词典
        只筛选出现次数在[min_count,max_count]之间的词
        词典最大的容纳的词为max_feature，按照出现次数降序排序，要是max_feature有规定，出现频率很低的词就被舍弃了
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}
 
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}
 
        if max_feature is not None:
            self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
        # 给词典中每个词分配一个数字ID
        for word in self.count:
            self.dict[word] = len(self.dict)
        # 构建一个数字映射到单词的词典，方法反向转换，但程序中用不太到
        self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
 
    def transform(self, sentence, max_len=None):
        """
        根据词典给每个词分配的数字ID，将给定的sentence（字符串序列）转换为数字序列
        max_len：统一文本的单词个数
        """
        if max_len is not None:
            r = [self.PAD] * max_len
        else:
            r = [self.PAD] * len(sentence)
        # 截断文本
        if max_len is not None and len(sentence) > max_len:
            sentence = sentence[:max_len]
        for index, word in enumerate(sentence):
            r[index] = self.to_index(word)
        return np.array(r, dtype=np.int64)
 
    def inverse_transform(self, indices):
        """数字序列-->单词序列"""
        sentence = []
        for i in indices:
            word = self.to_word(i)
            sentence.append(word)
        return sentence

# 自定义的collate_fn方法
def collate_fn(batch):
    # 手动zip操作，并转换为list，否则无法获取文本和标签了
    batch = list(zip(*batch))
    labels = torch.tensor(batch[0], dtype=torch.int32)
    texts = batch[1]
    texts = torch.tensor([ws.transform(i, max_len) for i in texts])
    del batch
    # 注意这里long()不可少，否则会报错
    return labels.long(), texts.long()

train_batch_size = 64
test_batch_size = 500
max_len = 50
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def get_dataloader(train=True):
    if train:
        mode = 'train'
    else:
        mode = "test"
    dataset = ImdbDataset(mode)
    batch_size = train_batch_size if train else test_batch_size
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

dataloader = get_dataloader()

for idx,(label,text) in enumerate(dataloader):
    print("idx：",idx)
    print("label:",label)
    print("text:",text)



FileNotFoundError: [Errno 2] No such file or directory: './model/ws.pkl'

In [3]:
# 建立词表
def fit_save_word_sequence():
    word_to_sequence = Word2Sequence()
    train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
    # total_file_path_list存储总的需要读取的txt文件
    total_file_path_list = []
    for i in train_path:
        total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
    # tqdm是显示进度条的
    for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
        word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
    word_to_sequence.build_vocab()
    # 对wordSequesnce进行保存
    pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))

    
ws = pickle.load(open("./model/ws.pkl", "rb"))


NameError: name 'pickle' is not defined