In [None]:
import glob
import torch
import pathlib
import re
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
import tqdm
from statistics import mean

In [None]:
# Matches the punctuation , . ( ) [ ] * : ; and HTML-like tags such as <br />,
# all of which are deleted from the text. Raw strings keep the regex escapes
# from being interpreted (and warned about) as Python string escapes.
remove_marks_regrex = re.compile(r"[,\.\(\)\[\]\*:;]|<.*?>")
# Captures "?" and "!" so that a space can be inserted in front of them.
shift_marks_regrex = re.compile(r"([?!])")

In [None]:
def text2ids(text, vocab_dict):
    """Convert a piece of text into a list of vocabulary IDs.

    Everything matched by ``remove_marks_regrex`` is deleted, "?" and "!" are
    separated from the preceding word by a space, and the result is split on
    whitespace. Tokens that are not keys of ``vocab_dict`` are mapped to 0.
    """
    # Remove every mark except "!" and "?"
    cleaned = remove_marks_regrex.sub("", text)
    # Insert a space between "!" / "?" and the word before it
    spaced = shift_marks_regrex.sub(r" \1", cleaned)
    ids = []
    for word in spaced.split():
        ids.append(vocab_dict.get(word, 0))
    return ids

def list2tensor(token_idxes, max_len=100, padding=True):
    """Convert a list of token IDs into an int64 tensor of bounded length.

    Args:
        token_idxes: list of integer token IDs.
        max_len: maximum number of tokens kept; longer inputs are truncated.
        padding: when True, the list is right-padded with 0 up to ``max_len``.

    Returns:
        A tuple ``(tensor, n_tokens)`` where ``tensor`` is the int64 tensor of
        IDs and ``n_tokens`` is the number of real (non-padding) tokens that
        remain after truncation.
    """
    if len(token_idxes) > max_len:
        token_idxes = token_idxes[:max_len]
    # Record the true length before padding so callers can locate the last real token
    n_tokens = len(token_idxes)
    if padding:
        token_idxes = token_idxes + [0] * (max_len - n_tokens)
    return torch.tensor(token_idxes, dtype=torch.int64), n_tokens

In [None]:
class IMDBDataset(Dataset):
    """Dataset over a directory of IMDB review text files.

    The directory must contain ``imdb.vocab`` (one word per line) and a
    ``train`` or ``test`` sub-directory holding ``pos/*.txt`` and
    ``neg/*.txt`` review files. Reviews are read from disk on access.

    Args:
        dir_path: root directory of the dataset.
        train: use the ``train`` split when True, otherwise ``test``.
        max_len: maximum number of tokens per review (passed to ``list2tensor``).
        padding: whether reviews are zero-padded to ``max_len``.
    """

    def __init__(self, dir_path, train=True, max_len=100, padding=True):
        self.max_len = max_len
        self.padding = padding
        path = pathlib.Path(dir_path)
        print(path)

        vocab_path = path.joinpath('imdb.vocab')

        # Read the vocabulary file and split it into one word per line
        with vocab_path.open(encoding='utf-8') as vocab_file:
            self.vocab_array = vocab_file.read().strip().splitlines()

        # Map each word to its ID; IDs start at 1 because 0 is used for
        # padding and for words missing from the vocabulary
        self.vocab_dict = {w: i + 1 for (i, w) in enumerate(self.vocab_array)}

        target_path = path.joinpath('train' if train else 'test')
        # Sort so the sample order does not depend on the file-system listing order
        pos_files = sorted(glob.glob(str(target_path.joinpath('pos/*.txt'))))
        neg_files = sorted(glob.glob(str(target_path.joinpath('neg/*.txt'))))

        # Pair every file with its label: 0 for negative, 1 for positive
        self.labeled_files = \
            list(zip([0] * len(neg_files), neg_files)) + \
            list(zip([1] * len(pos_files), pos_files))

    @property
    def vocab_size(self):
        """Number of words in the vocabulary file (the padding ID 0 is not counted)."""
        return len(self.vocab_array)

    def __len__(self):
        """Number of labelled review files in the selected split."""
        return len(self.labeled_files)

    def __getitem__(self, idx):
        """Return ``(token_id_tensor, label, n_tokens)`` for the review at ``idx``."""
        label, f = self.labeled_files[idx]
        # Close the file promptly; the text is lower-cased before tokenising
        with open(f, encoding='utf-8') as review_file:
            data = review_file.read().lower()
        data = text2ids(data, self.vocab_dict)
        data, n_tokens = list2tensor(data, self.max_len, self.padding)
        return data, label, n_tokens

In [None]:
# Root directory of the IMDB data. It must contain `imdb.vocab` together with
# `train/` and `test/` folders, each holding `pos/*.txt` and `neg/*.txt`.
DATA_DIR = '/content/sample_data/'

train_data = IMDBDataset(DATA_DIR)
test_data = IMDBDataset(DATA_DIR, train=False)

# Only the training batches need shuffling; the evaluation loader keeps a fixed order.
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, num_workers=4)

/content/sample_data


FileNotFoundError: ignored

In [None]:
class SequenceTaggingNet(nn.Module):
    """Embedding -> LSTM -> linear layer producing one logit per input sequence.

    Args:
        num_embeddings: number of rows in the embedding table (ID 0 is the padding index).
        embedding_dim: size of each embedding vector.
        hidden_size: LSTM hidden state size.
        num_layers: number of stacked LSTM layers.
        dropout: dropout between stacked LSTM layers; ignored when ``num_layers`` is 1.
    """

    def __init__(self, num_embeddings, embedding_dim=50, hidden_size=50, num_layers=1, dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim, padding_idx=0)
        # nn.LSTM applies dropout only between stacked layers and warns when a
        # non-zero value is given for a single layer, so pass 0 in that case
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True, dropout=lstm_dropout)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x, h0=None, l=None):
        """Return a 1-D tensor with one logit per sequence in the batch.

        Args:
            x: integer token IDs, batch first.
            h0: optional initial LSTM state passed through to ``nn.LSTM``.
            l: optional per-sequence token counts; when given, the LSTM output
               at position ``l - 1`` is used for each sequence instead of the
               final time step.
        """
        x = self.emb(x)
        x, h = self.lstm(x, h0)
        if l is not None:
            # Pick, for every row of the batch, the output at its last real token
            batch_idx = list(range(len(x)))
            x = x[batch_idx, l - 1, :]
        else:
            x = x[:, -1, :]
        x = self.linear(x)
        # Drop only the trailing size-1 dimension so a batch of one stays 1-D
        return x.squeeze(-1)

In [None]:
def eval_net(net, data_loader, device='cpu'):
    """Compute the accuracy of ``net`` over every batch of ``data_loader``.

    The network is switched to eval mode. Each batch is expected to unpack as
    ``(inputs, labels, lengths)``; a logit greater than 0 is counted as a
    prediction of label 1. Returns the fraction of correct predictions as a
    Python float.
    """
    net.eval()
    targets, predictions = [], []
    # Gradients are never needed while scoring
    with torch.no_grad():
        for inputs, labels, lengths in data_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            lengths = lengths.to(device)
            logits = net(inputs, l=lengths)
            targets.append(labels)
            predictions.append((logits > 0).long())
    targets = torch.cat(targets)
    predictions = torch.cat(predictions)
    n_correct = (targets == predictions).float().sum()
    return (n_correct / len(targets)).item()


In [None]:
# The embedding table needs vocab_size + 1 rows: word IDs start at 1 and
# ID 0 is used for padding and out-of-vocabulary tokens.
net = SequenceTaggingNet(train_data.vocab_size+1, num_layers=2)
device = 'cpu'
net.to(device)
opt = optim.Adam(net.parameters())
loss_fn = nn.BCEWithLogitsLoss()

for epoch in range(10):
    losses = []
    # eval_net leaves the network in eval mode, so re-enable training mode each epoch
    net.train()
    for x, y, l in tqdm.tqdm(train_loader):
        x = x.to(device)
        y = y.to(device)
        l = l.to(device)
        y_pred = net(x, l=l)
        # BCEWithLogitsLoss expects float targets
        loss = loss_fn(y_pred, y.float())
        opt.zero_grad()
        loss.backward()
        # The optimizer, not the module, applies the parameter update
        opt.step()
        losses.append(loss.item())
    # Training accuracy is measured on the training set, validation on the test set
    train_acc = eval_net(net, train_loader, device)
    val_acc = eval_net(net, test_loader, device)
    print(epoch, mean(losses), train_acc, val_acc)
