In [None]:
!pip install torchtext==0.17.0
!pip install portalocker

import torch
import torch.nn.functional as F
import torchtext
import torchtext.datasets
import torch.nn as nn

In [None]:
# Load the IMDB train and test splits; each yields (label, text) pairs.
train_iter, test_iter = torchtext.datasets.IMDB(split=('train','test')) #split train and test dataset
# Tokenizer obtained from torchtext with the 'basic_english' setting; it is
# applied to every review text in the next cell.
tokenizer = torchtext.data.utils.get_tokenizer('basic_english') #use basic english tokenizer

In [None]:
# Run configuration shared by the training and evaluation cells.
MODELNAME = "imdb-lstm.model"  # file the model weights are saved to / loaded from
EPOCH = 10                     # number of passes over the training batches
BATCHSIZE = 64                 # batch-size argument handed to make_batch
LR = 5e-5                      # learning rate for the Adam optimizer

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
  DEVICE = "cuda"
else:
  DEVICE = "cpu"
print(DEVICE)

In [None]:
# Tokenize every review and order each split from the fewest tokens to the
# most, so that neighbouring examples have similar lengths. sorted() is
# stable, so examples of equal length keep their original relative order.
train_data = sorted(
    ((label, tokenizer(line)) for label, line in train_iter),
    key=lambda pair: len(pair[1]),
)
test_data = sorted(
    ((label, tokenizer(line)) for label, line in test_iter),
    key=lambda pair: len(pair[1]),
)

In [None]:
# Preview up to ten tokenized training examples. Iterating over a slice
# (instead of indexing positions 0..9) avoids an IndexError when the list
# holds fewer than ten items.
for example in train_data[:10]:
  print(example)

In [None]:
# Build the vocabulary that maps tokens to integer ids.
def make_vocab(train_data, min_freq):
  """Count tokens and keep those seen at least `min_freq` times.

  Args:
    train_data: iterable of (label, token_list) pairs.
    min_freq: minimum number of occurrences for a token to be kept.

  Returns:
    (vocablist, vocabidx): `vocablist` is a list of (token, frequency)
    tuples whose first four entries are the special tokens
    <unk>, <pad>, <cls>, <eos> (frequency 0); `vocabidx` maps each kept
    token, and each special token, to its position in `vocablist`.
  """
  counts = {}
  for _, tokens in train_data:
    for tok in tokens:
      counts[tok] = counts.get(tok, 0) + 1

  specials = ['<unk>', '<pad>', '<cls>', '<eos>']
  vocablist = [(name, 0) for name in specials]
  vocabidx = {}
  # e.g. counts["Ton"] == 5 -> tok = "Ton", freq = 5
  for tok, freq in counts.items():
    if freq < min_freq:
      continue
    vocabidx[tok] = len(vocablist)
    vocablist.append((tok, freq))

  # Special tokens are assigned last so they always own ids 0..3.
  for position, name in enumerate(specials):
    vocabidx[name] = position
  return vocablist, vocabidx

In [None]:
# Build the vocabulary from the training split only; tokens that occur fewer
# than 10 times are left out and are later replaced by '<unk>' in preprocess.
vocablist, vocabidx = make_vocab(train_data, 10)

In [None]:
def preprocess(data, vocabidx):
  """Wrap each token list in <cls>/<eos> and mask out-of-vocabulary tokens.

  Args:
    data: iterable of (label, token_list) pairs.
    vocabidx: mapping whose keys are the known tokens.

  Returns:
    A new list of (label, token_list) pairs where every token missing
    from `vocabidx` is replaced by '<unk>', and each list starts with
    '<cls>' and ends with '<eos>'. The input lists are not modified.
  """
  wrapped = []
  for label, tokens in data:
    known = [tok if tok in vocabidx else '<unk>' for tok in tokens]
    wrapped.append((label, ['<cls>', *known, '<eos>']))
  return wrapped

In [None]:
# Add <cls>/<eos> markers and replace out-of-vocabulary tokens in both splits.
# NOTE: train_data/test_data are rebound here, so running this cell a second
# time would wrap the already-wrapped token lists again.
train_data = preprocess(train_data, vocabidx)
test_data = preprocess(test_data, vocabidx)

# Preview up to ten examples; slicing avoids an IndexError on short lists.
for example in train_data[:10]:
  print(example)

In [None]:
def make_batch(data, batchsize):
  """Group consecutive examples into batches of at most `batchsize` items.

  Args:
    data: iterable of (label, token_list) pairs.
    batchsize: maximum number of examples per batch.

  Returns:
    A list of (token_lists, labels) tuples. Every batch holds exactly
    `batchsize` examples except possibly the last one, which holds the
    remainder. An empty input yields an empty list.
  """
  bb = []
  blabel = []
  btokenlist = []
  for label, tokenlist in data:
    blabel.append(label)
    btokenlist.append(tokenlist)
    # Flush as soon as the batch is full. The previous `>` comparison only
    # flushed after one extra example, producing batches of batchsize + 1.
    if len(blabel) >= batchsize:
      bb.append((btokenlist, blabel))
      blabel = []
      btokenlist = []
  # Keep the trailing partial batch, if any.
  if len(blabel) > 0:
    bb.append((btokenlist, blabel))
  return bb

# Group both splits into mini-batches using BATCHSIZE. From here on,
# train_data/test_data are lists of (token_lists, labels) batches rather than
# lists of individual examples.
train_data = make_batch(train_data, BATCHSIZE)
test_data = make_batch(test_data, BATCHSIZE)

# Preview up to ten batches; slicing avoids an IndexError when there are
# fewer than ten.
for batch in train_data[:10]:
  print(batch)

In [None]:
# Pad every token list in a batch to the length of that batch's longest list.
def padding(bb):
  """Right-pad the token lists of each batch with '<pad>' in place.

  Args:
    bb: list of (token_lists, labels) batches.

  Returns:
    The same `bb` object; its token lists are extended in place so that
    all lists within one batch have equal length.
  """
  for tokenlists, _ in bb:
    target = max(map(len, tokenlists))
    for tokens in tokenlists:
      tokens.extend(['<pad>'] * (target - len(tokens)))
  return bb

In [None]:
# Pad the token lists of every batch to a common length within that batch.
# padding() modifies the lists in place and returns the same object.
train_data = padding(train_data)
test_data = padding(test_data)

# Preview up to ten batches; slicing avoids an IndexError when there are
# fewer than ten.
for batch in train_data[:10]:
  print(batch)

In [None]:
# Convert tokens to integer ids (the embedding lookup itself happens in the model).
def word2id(bb, vocabidx):
  """Map every token to its vocabulary id and shift labels down by one.

  Args:
    bb: list of (token_lists, labels) batches.
    vocabidx: mapping from token to integer id; every token must be a key.

  Returns:
    A new list of (id_lists, id_labels) batches, where each label is the
    original label minus 1.

  Raises:
    KeyError: if a token is missing from `vocabidx`.
  """
  converted = []
  for tokenlists, labels in bb:
    shifted_labels = [lab - 1 for lab in labels]
    id_rows = [[vocabidx[tok] for tok in row] for row in tokenlists]
    converted.append((id_rows, shifted_labels))
  return converted

In [None]:
# Replace tokens by their vocabulary ids and shift labels down by one.
# NOTE: train_data/test_data are rebound here; running this cell a second
# time would try to look up integer ids in vocabidx.
train_data = word2id(train_data, vocabidx)
test_data = word2id(test_data, vocabidx)

# Preview up to ten batches; slicing avoids an IndexError when there are
# fewer than ten.
for batch in train_data[:10]:
  print(batch)

In [None]:
class MyLSTM(nn.Module):
    """Sequence classifier: embedding -> single-layer LSTM -> linear head.

    All constructor arguments are optional, so ``MyLSTM()`` keeps working and
    produces the same layer shapes as before (300-d embedding, 300-d hidden
    state, 2 output logits).

    Args:
        vocabsize: number of rows in the embedding table. When None, falls
            back to ``len(vocablist)`` from the notebook namespace.
        pad_idx: token id used for padding. When None, falls back to
            ``vocabidx['<pad>']`` from the notebook namespace.
        emb_dim: embedding width.
        hidden_dim: LSTM hidden-state width.
        num_classes: number of output logits.
    """

    def __init__(self, vocabsize=None, pad_idx=None, emb_dim=300, hidden_dim=300, num_classes=2):
        super(MyLSTM, self).__init__()
        if vocabsize is None:
            vocabsize = len(vocablist)
        if pad_idx is None:
            pad_idx = vocabidx['<pad>']
        self.emb = nn.Embedding(vocabsize, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(input_size=emb_dim, hidden_size=hidden_dim, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return class logits for a batch of right-padded token-id sequences.

        Args:
            x: integer tensor indexed as (batch, time); padding positions
                hold the embedding's ``padding_idx``.

        Returns:
            Tensor of logits with one row per sequence in the batch.
        """
        e = self.emb(x)
        # nn.LSTM starts from zero hidden/cell states when none are passed,
        # created on the same device as the input, so no global DEVICE is needed.
        out, _ = self.lstm(e)
        # Read the output at each sequence's last non-pad step. Taking
        # out[:, -1] instead would use the state after the LSTM has consumed
        # the trailing padding, which depends on how long the batch is.
        lengths = (x != self.emb.padding_idx).sum(dim=1).clamp(min=1)
        rows = torch.arange(x.size(0), device=x.device)
        out = out[rows, lengths - 1, :]
        out = self.fc(out)
        return out

In [None]:
def train():
  """Train a fresh MyLSTM on `train_data` and save its weights to MODELNAME.

  Runs EPOCH passes over the pre-batched training data using Adam with
  learning rate LR and cross-entropy loss. After each epoch the sum of the
  per-batch losses is printed. Nothing is returned; the model's state dict
  is written to MODELNAME once all epochs have finished.
  """
  net = MyLSTM().to(DEVICE)
  opt = torch.optim.Adam(net.parameters(), lr=LR)
  for epoch in range(EPOCH):
    epoch_loss = 0
    for batch_tokens, batch_labels in train_data:
      inputs = torch.tensor(batch_tokens, dtype=torch.int64).to(DEVICE)
      targets = torch.tensor(batch_labels, dtype=torch.int64).to(DEVICE)
      opt.zero_grad()
      logits = net(inputs)
      step_loss = F.cross_entropy(logits, targets)
      step_loss.backward()
      opt.step()
      # .item() detaches the scalar so the running total holds no graph.
      epoch_loss += step_loss.item()
    print("epoch", epoch,": loss", epoch_loss)
  torch.save(net.state_dict(), MODELNAME)

In [None]:
def test():
  """Evaluate the weights stored in MODELNAME on `test_data`.

  Builds a MyLSTM, loads the saved state dict, and prints the number of
  correct predictions, the number of examples, and the accuracy. When
  `test_data` is empty the accuracy is reported as NaN instead of failing.
  """
  total = 0
  correct = 0
  model = MyLSTM().to(DEVICE)
  # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only host.
  model.load_state_dict(torch.load(MODELNAME, map_location=DEVICE))
  model.eval()
  # No gradients are needed for evaluation; skipping them saves memory/time.
  with torch.no_grad():
    for tokenlists, labels in test_data:
      total += len(labels)
      tokenlists = torch.tensor(tokenlists, dtype=torch.int64).to(DEVICE)
      labels = torch.tensor(labels, dtype=torch.int64).to(DEVICE)
      y = model(tokenlists)
      pred_labels = y.max(dim=1)[1]
      # Accumulate as a Python int so the summary also works with zero batches.
      correct += (pred_labels == labels).sum().item()
  accuracy = correct / float(total) if total else float("nan")
  print("correct: ", correct)
  print("total: ", total)
  print("accuracy: ", accuracy)

In [None]:
# Run training; on completion the model weights are written to MODELNAME.
train()