# 自訂資料集 -- 情緒分析(Sentiment Analysis)

## 載入套件

In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchtext
import numpy as np
import os

## 判斷GPU是否存在

In [19]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## 自訂資料集

In [78]:
# 資料集所在目錄
data_base_path = './aclImdb/'

class ImdbDataset(torch.utils.data.Dataset):
    def __init__(self, mode):
        super(ImdbDataset, self).__init__()
        if mode == "train":
            text_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
        else:
            text_path = [os.path.join(data_base_path, i) for i in ["test/neg", "test/pos"]]
        # print(text_path)
 
        self.total_file_path_list = []
        for i in text_path:
            self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
        # print(len(self.total_file_path_list))
 
    def __getitem__(self, idx):
        cur_path = self.total_file_path_list[idx]
        cur_filename = os.path.basename(cur_path)
        # print(cur_path)
        label = 0 if cur_path.find('/neg') > 0 else 1
        # text = tokenizer(open(cur_path, encoding="utf-8").read().strip())
        text = open(cur_path, encoding="utf-8").read().strip()
        return label, text
 
    def __len__(self):
        return len(self.total_file_path_list)

In [79]:
# 取得下一筆資料
dataset = ImdbDataset(mode="train")
print(dataset[0])
# for i in range(0, len(dataset), 1000):
#     print(dataset[i][0])

(0, "Story of a man who has unnatural feelings for a pig. Starts out with a opening scene that is a terrific example of absurd comedy. A formal orchestra audience is turned into an insane, violent mob by the crazy chantings of it's singers. Unfortunately it stays absurd the WHOLE time with no general narrative eventually making it just too off putting. Even those from the era should be turned off. The cryptic dialogue would make Shakespeare seem easy to a third grader. On a technical level it's better than you might think with some good cinematography by future great Vilmos Zsigmond. Future stars Sally Kirkland and Frederic Forrest can be seen briefly.")


## 詞彙表處理

In [80]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# 分詞
tokenizer = get_tokenizer('basic_english')

# 建立 Generator 函數
def yield_tokens(data_iter):
    for _, text in data_iter:
        yield tokenizer(text)

# 由 train_iter 建立詞彙字典
vocab = build_vocab_from_iterator(yield_tokens(dataset), specials=["<unk>"])

# 設定預設的索引值
vocab.set_default_index(vocab["<unk>"])

In [81]:
# 測試詞彙字典，取得單字的索引值
vocab(['here', 'is', 'an', 'example'])

[131, 9, 40, 464]

In [82]:
import joblib

joblib.dump(vocab, os.path.join(data_base_path, 'vocab.joblib'))

['./aclImdb/vocab.joblib']

## 參數設定

In [83]:
EPOCHS = 10 # 訓練週期數
LR = 5  # 學習率
BATCH_SIZE = 64 # 訓練批量
# 取得標註個數
num_class = 2
vocab_size = len(vocab)
emsize = 64
hidden_dim = 16

## 定義資料轉換函數

In [84]:
text_pipeline = lambda x: vocab(tokenizer(x)) # 分詞、取得單字的索引值
label_pipeline = lambda x: x 

In [85]:
# 測試資料轉換
print(text_pipeline('here is an example'))
label_pipeline(2)

[131, 9, 40, 464]


2

## 建立模型

In [86]:
class TextClassificationModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super().__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        rnn_out, h_out = self.rnn(embedded)
        return self.fc(rnn_out)

model = TextClassificationModel(vocab_size, emsize, num_class).to(device)

## 定義訓練及評估函數

In [87]:
import time

# 訓練函數
def train(dataloader):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 500
    start_time = time.time()

    for idx, (label, text, offsets) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(text, offsets)
        loss = criterion(predicted_label, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)
        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches '
                  '| accuracy {:8.3f}'.format(epoch, idx, len(dataloader),
                                              total_acc/total_count))
            total_acc, total_count = 0, 0
            start_time = time.time()

# 評估函數
def evaluate(dataloader):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (label, text, offsets) in enumerate(dataloader):
            predicted_label = model(text, offsets)
            loss = criterion(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)
    return total_acc/total_count

## 建立DataLoader，逐批訓練

In [88]:
from torch.utils.data import DataLoader

# 批次處理
def collate_batch(batch):
    label_list, text_list, offsets = [], [], [0]
    for (_label, _text) in batch:
        label_list.append(label_pipeline(_label))
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
        text_list.append(processed_text)
        offsets.append(processed_text.size(0)) # 設定每筆資料的起始位置
    label_list = torch.tensor(label_list, dtype=torch.int64)
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)  # 單字的索引值累加
    text_list = torch.cat(text_list)
    return label_list.to(device), text_list.to(device), offsets.to(device)

dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, 
                        collate_fn=collate_batch)

## 測試 DataLoader

In [89]:
# 取得3筆資料
for idx,(label,text, offset) in enumerate(dataloader):
    print("idx：",idx)
    print("label:",label)
    print("text:",text)
    print("offset:",offset)
    if idx >= 2:
        break

idx： 0
label: tensor([1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0,
        1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1,
        0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0])
text: tensor([ 1039,    10,    16,  ...,     7, 10889,   156])
offset: tensor([    0,   275,   700,  1040,  1245,  2053,  2268,  2477,  2615,  2946,
         3038,  3442,  4040,  4182,  4371,  4480,  4610,  4874,  5089,  5278,
         5470,  6123,  6354,  6721,  7220,  7545,  8196,  8290,  8495,  8722,
         8973,  9134,  9346,  9559,  9735,  9872, 10007, 10620, 10754, 11239,
        11926, 12088, 12274, 12468, 12592, 12829, 12923, 13157, 13291, 13464,
        13599, 13745, 14528, 14734, 15103, 15246, 15389, 15624, 15708, 15958,
        16102, 16168, 16694, 16863])
idx： 1
label: tensor([0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1,
        1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1,
        1, 0, 1, 1,

In [90]:
from torch.utils.data.dataset import random_split

train_dataset = ImdbDataset(mode="train")
test_dataset = ImdbDataset(mode="test")

# 資料切割，95% 作為訓練資料
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = \
    random_split(train_dataset, [num_train, len(train_dataset) - num_train])

# 建立DataLoader
train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE,
                             shuffle=True, collate_fn=collate_batch)

## 模型訓練

In [91]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)

total_accu = None
for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_dataloader)
    accu_val = evaluate(valid_dataloader)
    if total_accu is not None and total_accu > accu_val:
        scheduler.step()
    else:
        total_accu = accu_val
    print('-' * 59)
    print('| end of epoch {:3d} | time: {:5.2f}s | '
          'valid accuracy {:8.3f} '.format(epoch,
                                           time.time() - epoch_start_time,
                                           accu_val))
    print('-' * 59)

-----------------------------------------------------------
| end of epoch   1 | time: 18.40s | valid accuracy    0.490 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   2 | time: 19.20s | valid accuracy    0.706 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   3 | time: 18.39s | valid accuracy    0.767 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   4 | time: 20.06s | valid accuracy    0.811 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   5 | time: 20.41s | valid accuracy    0.836 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   6 | time: 18.68s |

## 模型評估

In [92]:
print(f'測試資料準確度: {evaluate(test_dataloader):.3f}')

測試資料準確度: 0.868


## 測試新資料

In [94]:
# 預測
label = {0:'負面', 1:'正面'}
def predict(text, text_pipeline):
    with torch.no_grad():
        text = torch.tensor(text_pipeline(text)).to(device)
        output = model(text, torch.tensor([0])).to(device)
        return output.argmax(1).item()

# 測試資料
my_test = open('./nlp_data/imdb_1.txt', encoding='utf8').read()
print(label[predict(my_test, text_pipeline)])

負面


In [98]:
print(label[predict("I like this movie very much", text_pipeline)])
print(label[predict("This movie is boring", text_pipeline)])


正面
負面


In [95]:
acc = 0
for i in range(20000):
    # if i < 100: print(test_dataset[i][0])
    acc += 1 if test_dataset[i][0] == predict(test_dataset[i][1], text_pipeline) else 0
print(acc)

17259
