In [None]:
#!pip install torchtext==0.6

In [None]:
# Import Libraries
# Core PyTorch library for tensor operations and model training
import torch

# TorchText library for text preprocessing, batching, and dataset loading
from torchtext import data

# PyTorch's neural network module - used to define model layers and architectures
import torch.nn as nn

In [None]:
# Fix the random seed so that repeated runs produce the same results.
SEED = 1234

# Seed PyTorch's global random number generator and request deterministic cuDNN behaviour.
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# Field describing how the raw review text is processed.
TEXT = data.Field(
    tokenize='spacy',                     # tokenize with spaCy
    tokenizer_language='en_core_web_sm',  # English spaCy model
    include_lengths=True,                 # batches also carry the length of each example
)

# Field describing how the labels (targets) are processed; stored as floats.
LABEL = data.LabelField(dtype=torch.float)

In [None]:
# Load the IMDB dataset through torchtext.
from torchtext import datasets

# TEXT processes the review text and LABEL the sentiment label of each example.
# NOTE(review): the raw label strings are whatever torchtext's IMDB loader emits - confirm via LABEL.vocab.
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

In [None]:
# Create the validation set by splitting it off the training data
import random

# random.seed() returns None, so passing its result as random_state handed None to
# split() and reproducibility relied only on the call's side effect. Seed explicitly
# and pass the resulting generator state, which is what random_state is meant to carry.
random.seed(SEED)
train_data, valid_data = train_data.split(random_state = random.getstate())

In [None]:
# Build the vocabularies
MAX_VOCAB_SIZE = 25000    # upper bound on vocabulary size handed to build_vocab

# Token-to-index vocabulary, built from the training split only
TEXT.build_vocab(
    train_data,
    max_size=MAX_VOCAB_SIZE,
    vectors="glove.6B.100d",        # pre-trained GloVe word embeddings (100-dimensional)
    unk_init=torch.Tensor.normal_,  # initializer for tokens that have no pre-trained vector
)

# Label vocabulary: maps each label string to an integer index.
# NOTE(review): which sentiment gets index 0 vs 1 is decided by torchtext - check LABEL.vocab.stoi.
LABEL.build_vocab(train_data)

In [None]:
# Number of examples (reviews) per batch
BATCH_SIZE = 64

# Prefer the GPU (CUDA) when one is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Batch iterators for the three splits. Examples are sorted within each batch;
# the model packs its input with pack_padded_sequence, which presumably relies on that ordering.
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    sort_within_batch=True,
    device=device,
)

In [None]:
import torch.nn as nn

class RNN(nn.Module):
  """LSTM classifier over packed, padded token sequences.

  Args:
    vocab_size: number of rows in the embedding table.
    embedding_dim: size of each token embedding.
    hidden_dim: LSTM hidden size per direction.
    output_dim: size of the final linear output.
    n_layers: number of stacked LSTM layers.
    bidirectional: whether the LSTM runs in both directions.
    dropout: dropout probability for the embeddings, the final hidden state and,
      when n_layers > 1, between LSTM layers.
    pad_idx: vocabulary index of the padding token.
  """

  def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
               bidirectional, dropout, pad_idx):
    super().__init__()

    # Remembered so forward() knows how to read the final hidden state
    self.bidirectional = bidirectional

    self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)

    # nn.LSTM only applies dropout between stacked layers; passing a non-zero
    # value with a single layer has no effect and triggers a warning.
    self.rnn = nn.LSTM(embedding_dim,
                       hidden_dim,
                       num_layers=n_layers,
                       bidirectional=bidirectional,
                       dropout=dropout if n_layers > 1 else 0.0
                       )

    # The classifier input is one hidden vector per direction
    num_directions = 2 if bidirectional else 1
    self.fc = nn.Linear(hidden_dim * num_directions, output_dim)

    self.dropout = nn.Dropout(dropout)

  def forward(self, text, text_lengths):
    """Return the raw (pre-sigmoid) outputs for a batch.

    Args:
      text: token indices, [sent len, batch size].
      text_lengths: unpadded length of each sequence in the batch.

    Returns:
      Tensor of shape [batch size, output dim].
    """
    # embedded = [sent len, batch size, emb dim]
    embedded = self.dropout(self.embedding(text))

    # Pack so the LSTM skips padding; the lengths need to be on the CPU
    packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.to('cpu'))

    # hidden = [num layers * num directions, batch size, hid dim]
    # The per-step outputs and the cell state are not needed for classification.
    _, (hidden, _) = self.rnn(packed_embedded)

    if self.bidirectional:
      # Last layer: final forward (hidden[-2]) and backward (hidden[-1]) states
      hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
    else:
      # Last layer only; hidden[-2] would belong to the layer below
      hidden = hidden[-1,:,:]

    # hidden = [batch size, hid dim * num directions]
    return self.fc(self.dropout(hidden))

In [None]:
# Hyperparameters
INPUT_DIM = len(TEXT.vocab)   # one embedding row per vocabulary entry
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1
N_LAYERS = 2
BIDIRECTIONAL = True          # run the LSTM over the sequence in both directions
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]   # index of the padding token used to equalize sequence lengths in a batch

# Model
model = RNN(vocab_size=INPUT_DIM,
            embedding_dim=EMBEDDING_DIM,
            hidden_dim=HIDDEN_DIM,
            output_dim=OUTPUT_DIM,
            n_layers=N_LAYERS,
            bidirectional=BIDIRECTIONAL,
            dropout=DROPOUT,
            pad_idx=PAD_IDX)

In [None]:
def count_parameters(model):
  """Return the total number of elements across the model's trainable parameters."""
  total = 0
  for param in model.parameters():
    # Parameters with requires_grad disabled are not counted
    if param.requires_grad:
      total += param.numel()
  return total

# Report the number of trainable parameters, formatted with thousands separators
print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:
# Vectors attached to the text vocabulary (requested via `vectors=` in TEXT.build_vocab)
pretrained_embeddings = TEXT.vocab.vectors

# Sanity check: presumably [vocab size, embedding dim] - confirm against INPUT_DIM / EMBEDDING_DIM
print(pretrained_embeddings.shape)

In [None]:
# Overwrite the embedding layer's weights in place with the pre-trained vectors
model.embedding.weight.data.copy_(pretrained_embeddings)

In [None]:
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]

# Reset the embedding rows of the unknown and padding tokens to all zeros
for special_idx in (UNK_IDX, PAD_IDX):
  model.embedding.weight.data[special_idx] = torch.zeros(EMBEDDING_DIM)

print(model.embedding.weight.data)

In [None]:
# Optimizer
import torch.optim as optim

# Adam over all model parameters; no hyperparameters are passed, so the optimizer's defaults apply
optimizer = optim.Adam(model.parameters())

In [None]:
# Loss for binary classification: takes the model's raw outputs (logits),
# created directly on the selected device
criterion = nn.BCEWithLogitsLoss().to(device)

# Move the model to the same device (GPU when available)
model = model.to(device)

In [None]:
# Evaluation Metrics
def binary_accuracy(preds, y):
  """
  Fraction of correct predictions in a batch (e.g. 8 of 10 right gives 0.8, not 8).

  `preds` are raw model outputs; they go through a sigmoid and are rounded
  to the nearest of 0 and 1 before being compared with the targets `y`.
  """
  probabilities = torch.sigmoid(preds)
  hard_labels = torch.round(probabilities)
  matches = (hard_labels == y).float()
  return matches.sum() / len(matches)

In [None]:
# Training
def train(model, iterator, optimizer, criterion):
  """Run one training pass over `iterator`.

  Returns the loss and the accuracy, each averaged over the number of batches.
  """
  model.train()

  running_loss, running_acc = 0, 0

  for batch in iterator:
    # batch.text holds both the padded token tensor and the sequence lengths
    tokens, lengths = batch.text
    targets = batch.label

    optimizer.zero_grad()
    logits = model(tokens, lengths).squeeze(1)
    loss = criterion(logits, targets)
    acc = binary_accuracy(logits, targets)

    loss.backward()
    optimizer.step()

    running_loss += loss.item()
    running_acc += acc.item()

  n_batches = len(iterator)
  return running_loss / n_batches, running_acc / n_batches

In [None]:
# Evaluation
def evaluate(model, iterator, criterion):
  """Score the model on `iterator` without updating it.

  Returns the loss and the accuracy, each averaged over the number of batches.
  """
  model.eval()

  loss_sum, acc_sum = 0, 0

  # No gradients are needed while evaluating
  with torch.no_grad():
    for batch in iterator:
      tokens, lengths = batch.text
      targets = batch.label

      logits = model(tokens, lengths).squeeze(1)

      loss_sum += criterion(logits, targets).item()
      acc_sum += binary_accuracy(logits, targets).item()

  batch_count = len(iterator)
  return loss_sum / batch_count, acc_sum / batch_count

In [None]:
import time

def epoch_time(start_time, end_time):
  """Split the time between two timestamps (in seconds) into whole minutes and leftover whole seconds."""
  elapsed = end_time - start_time
  whole_minutes = int(elapsed / 60)
  leftover_seconds = int(elapsed - whole_minutes * 60)
  return whole_minutes, leftover_seconds

In [None]:
N_EPOCH = 5

# Lowest validation loss seen so far; used to decide when to checkpoint
best_valid_loss = float('inf')

for epoch in range(N_EPOCH):

  start_time = time.time()

  train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
  valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)

  end_time = time.time()

  epoch_mins, epoch_secs = epoch_time(start_time, end_time)

  # Keep only the weights from the epoch with the best validation loss
  if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    torch.save(model.state_dict(), 'tut2-model.pt')

  # Report every epoch, not just the ones that improved the validation loss
  print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
  print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
  print(f'\tVal. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%')

In [None]:
# Restore the checkpoint written during training (best validation loss),
# mapping the stored tensors onto the device currently in use
model.load_state_dict(torch.load('tut2-model.pt', map_location = device))

test_loss, test_acc = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

In [None]:
import spacy
nlp = spacy.load('en_core_web_sm')

def predict_sentiment(model, sentence):
  """Return the model's sigmoid output (a float between 0 and 1) for one sentence.

  Args:
    model: network called as model(tokens, lengths).
    sentence: raw text; tokenized with the module-level spaCy pipeline `nlp`.

  Raises:
    ValueError: if the sentence yields no tokens.
  """
  model.eval()
  tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
  if not tokenized:
    # A zero-length sequence cannot be packed; fail with a clear message instead
    raise ValueError('sentence must contain at least one token')
  indexed = [TEXT.vocab.stoi[t] for t in tokenized]
  # Shape [sent len, 1]: a batch holding a single sequence
  tensor = torch.LongTensor(indexed).unsqueeze(1).to(device)
  # The lengths stay on the CPU
  length_tensor = torch.LongTensor([len(indexed)])
  # Inference only: skip building the autograd graph
  with torch.no_grad():
    prediction = torch.sigmoid(model(tensor, length_tensor))
  return prediction.item()

In [None]:
# Score a negative-sounding review; the returned value is the sigmoid of the model output
predict_sentiment(model, "This film is terrible")

In [None]:
# Score a positive-sounding review; the returned value is the sigmoid of the model output
predict_sentiment(model, "This film is great")