# LSTM (長短期記憶網路)

## 確認 GPU 是否存在

In [1]:
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

## 載入 IMDB 資料集

In [2]:
import torch
from torchtext.datasets import IMDB

imdb = IMDB(split = "train")

type(imdb)

torch.utils.data.datapipes.iter.grouping.ShardingFilterIterDataPipe

## 測試

In [3]:
train_iter = iter(IMDB(split = "train"))

data = next(train_iter)
data

('pos',
 'Zentropa has much in common with The Third Man, another noir-like film set among the rubble of postwar Europe. Like TTM, there is much inventive camera work. There is an innocent American who gets emotionally involved with a woman he doesn\'t really understand, and whose naivety is all the more striking in contrast with the natives.<br /><br />But I\'d have to say that The Third Man has a more well-crafted storyline. Zentropa is a bit disjointed in this respect. Perhaps this is intentional: it is presented as a dream/nightmare, and making it too coherent would spoil the effect. <br /><br />This movie is unrelentingly grim--"noir" in more than one sense; one never sees the sun shine. Grim, but intriguing, and frightening.')

## 詞彙表處理

In [4]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# 分詞
tokenizer = get_tokenizer("basic_english")

# 建立 Generator 函數
def yield_tokens(data_iter):
    for _, text in data_iter:
        yield tokenizer(text)

# 由 train_iter 建立詞彙字典
vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials = ["<unk>"])

# 設定預設的索引值
vocab.set_default_index(vocab["<unk>"])

In [5]:
# 測試詞彙字典, 取得單字的索引值
vocab(["here", "is", "an", "example"])


[131, 9, 40, 464]

## 設定超參數

In [6]:
EPOCHS = 10
LR = 5
BATCH_SIZE = 64

## 取得標籤個數, 詞彙表大小, 嵌入層大小, 隱藏層維度

In [7]:
num_class = len(set([label for (label, text) in imdb]))
vocab_size = len(vocab)
emsize = 64
hidden_dim = 16

## 定義資料轉換函數

In [8]:
text_pipeline = lambda x: vocab(tokenizer(x))
label_pipline = lambda x: 0 if x == "neg" else 1

## 建立模型
- EmbeddingBag + LSTM + Linear

In [9]:
from torch import nn

class TextClassificationModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super().__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse = True)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, bidirectional = True)
        self.fc = nn.Linear(hidden_dim * 2, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        rnn_out, h_out = self.rnn(embedded)
        return self.fc(rnn_out)

model = TextClassificationModel(vocab_size, emsize, num_class).to(device)

## 定義損失函數, 優化器, 學習率調整器

In [11]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma = 0.1)

## 定義訓練及評估函數

In [19]:
import time

# 訓練函數
def train(dataloader):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 500
    start_time = time.time()
    for idx, (label, text, offsets) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(text, offsets)
        loss = criterion(predicted_label, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)
        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print("| epoch {:3d} | {:5d}/{:5d} batches "
                    "| accuracy {:8.3f}".format(epoch, idx, len(dataloader),
                                                total_acc/total_count))
            total_acc, total_count = 0, 0
            start_time = time.time()

# 評估函數
def evaluate(dataloader):
    model.eval()
    total_acc, total_count = 0, 0
    with torch.no_grad():
        for idx, (label, text, offsets) in enumerate(dataloader):
            predicted_label = model(text, offsets)
            loss = criterion(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)
    return total_acc/total_count

## 建立 DataLoader

In [17]:
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset

# 批次處理
def collate_batch(batch):
    label_list, text_list, offsets = [], [], [0]
    for (_label, _text) in batch:
        label_list.append(label_pipline(_label))
        processed_text = torch.tensor(text_pipeline(_text), dtype = torch.int64)
        text_list.append(processed_text)
        offsets.append(processed_text.size(0))      # 設定每筆資料的起始位置
    label_list = torch.tensor(label_list, dtype = torch.int64)
    offsets = torch.tensor(offsets[:-1]).cumsum(dim = 0)       # 單字的索引值累加
    text_list = torch.cat(text_list)
    return label_list.to(device), text_list.to(device), offsets.to(device)

train_iter, test_iter = IMDB()

# 轉換為 Dataset
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)

# 資料切割, 95% 作為訓練資料
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = \
    random_split(train_dataset, [num_train, len(train_dataset)- num_train])

# 建立 DataLoader
train_dataloader = DataLoader(split_train_, batch_size = BATCH_SIZE,
                                shuffle = True, collate_fn = collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size = BATCH_SIZE,
                                shuffle = True, collate_fn = collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size = BATCH_SIZE,
                                shuffle = True, collate_fn = collate_batch)

## 模型訓練

In [21]:
total_accu = None
for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_dataloader)
    accu_val = evaluate(valid_dataloader)
    if total_accu is not None and total_accu > accu_val:
        scheduler.step()
    else:
        total_accu = accu_val
    print("-" * 59)
    print("| end of epoch {:3d} | time: {:5.2f}s | "
            "valid accuracy {:8.3f} ".format(epoch,
                                    time.time()-epoch_start_time,
                                    accu_val))
    print("-" * 59)

-----------------------------------------------------------
| end of epoch   1 | time:  4.87s | valid accuracy    0.786 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   2 | time:  4.71s | valid accuracy    0.748 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   3 | time:  4.72s | valid accuracy    0.831 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   4 | time:  4.75s | valid accuracy    0.831 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   5 | time:  4.68s | valid accuracy    0.826 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   6 | time:  4.67s |

## 模型評估

In [22]:
print(f"測試資料準確度: {evaluate(test_dataloader):.3f}")

測試資料準確度: 0.857


## 測試新資料

In [23]:
# 預測
label = {0:"負面", 1:"正面"}
def predict(text, text_pipeline):
    text = torch.tensor(text_pipeline(text))
    output = model(text, torch.tensor([0]))
    return output.argmax(1).item()

# 測試資料
my_test = open("./nlp_data/IMDB.csv", encoding = "utf8").read()
model = model.to("cpu")
res = predict(my_test, text_pipeline)

print("This is a %s news." %label[res])

This is a 負面 news.


## 正面新聞有幾筆??

In [24]:
imdb_iterator = iter(IMDB(split = "train"))
label_rev = {"neg": 0, "pos": 1}
acc = 0
for i in range(20000):
    data = next(imdb_iterator)
    acc += 1 if label_rev[data[0]] == predict(data[1], text_pipeline) else 0
print(acc)

17577
