# Assignment 2

- In this assignment, you will train a model for sentiment analysis
    - Sentiment analysis predicts whether the sentiment of a given text is positive or negative
    - The input is a sequence of tokens and the output is the result of a sigmoid function (a value between 0 and 1)
- You have to submit a report (in PDF) and your code (in .py)
    - In your report, briefly explain your code
        - The code explanation can be very simple
    - Also, add your answer to the following problem
        - Problem 6: Analyze the Prediction of Model
    - You have to submit your code in .py file
        - Copy and paste your completed code to ``Assignment2.py`` file
- The main goal of this assignment is to implement a pipeline to train a neural network
    - Problem 1: Building a Dataset class (6 pts)
    - Problem 2: Build a Str2Idx2Str Converter (12 pts)
        - Complete the function
    - Problem 3: Implement a collate function (7 pts)
    - Problem 4: Implement a Binary Cross Entropy Loss (5 pts)
    - Problem 5: Complete Training Loop (12 pts)
        - Training with a single batch
        - Validate the model
    - Problem 6: Analyze the Prediction of Model (18 pts)
        - Write it in your report
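
As a small illustration of the sigmoid output mentioned above, the sketch below maps a few raw scores to values between 0 and 1 and applies the 0.5 threshold used later in Problem 5. The scores are hypothetical and the helper `sigmoid` is written here only for illustration; the notebook's model uses `torch.sigmoid`.

```python
import math

def sigmoid(z: float) -> float:
    """Map any real number to the open interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# Hypothetical raw model scores (logits), chosen only for illustration
for logit in (-3.0, 0.0, 3.0):
    prob = sigmoid(logit)
    # A prediction larger than 0.5 is regarded as positive sentiment
    label = "positive" if prob > 0.5 else "negative"
    print(f"logit={logit:+.1f} -> probability={prob:.3f} -> {label}")
```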

In [None]:
# Download .py file. You have to copy and paste the completed function to this py file and submit it.
!wget https://raw.githubusercontent.com/jdasam/aat3020-2024/main/Assignment2.py
!wget https://raw.githubusercontent.com/jdasam/aat3020-2024/main/assignment2_pre_defined.py

## Preparation: Download dataset

In [None]:
!wget https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz # download dataset

In [None]:
!tar -xzf aclImdb_v1.tar.gz # unzip the file

In [None]:
from typing import List, Tuple, Union
import torch
import torch.nn as nn
from torchtext.data.utils import get_tokenizer
import matplotlib.pyplot as plt

### Make Datasplit
- In typical machine learning tasks, the data is split into a training set and a validation set
    - The training set is used to train the model's parameters
    - The validation set is used to check how the model performs on unseen data
        - The validation set is also used to tune the model's hyperparameters
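
A minimal sketch of a seeded 80/20 split on toy data is shown here. The file names are hypothetical stand-ins for the review files; only the idea (sort, seed, shuffle, slice) mirrors the split cell in this notebook.

```python
import random

# Hypothetical sample names standing in for the review files
samples = [f"review_{i}.txt" for i in range(10)]

random.seed(0)                  # fixed seed, so the shuffle is the same on every run
shuffled = sorted(samples)      # sort first so the result does not depend on listing order
random.shuffle(shuffled)

num_train = len(shuffled) * 4 // 5   # 80% of the samples go to the training split
train_split = shuffled[:num_train]
valid_split = shuffled[num_train:]

print(len(train_split), len(valid_split))
```

Because the seed is fixed, rerunning the cell gives the same two splits, which keeps validation results comparable between runs.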

In [None]:
'''
You don't have to change this cell
'''

from pathlib import Path
import random

train_path = Path('aclImdb/train')
test_path = Path('aclImdb/test')

def get_train_txt_paths_in_split(dir_path:str='aclImdb/train', seed:int=0):
  dir_path = Path(dir_path)
  train_set, valid_set = [], []
  random.seed(seed) # manually seed random so that you can get the same random result whenever you run the code for reproducibility
  for typ in ('pos', 'neg'):
    paths_of_typ = list( (dir_path / typ).glob('*.txt'))
    num_examples = len(paths_of_typ)
    num_train_sample = num_examples * 4 // 5
    
    paths_of_typ = sorted(paths_of_typ)
    random.shuffle(paths_of_typ) # shuffle the dataset
    train_set += paths_of_typ[:num_train_sample] # assign first num_train_sample samples for train set
    valid_set += paths_of_typ[num_train_sample:] # assign the remaining samples for validation set
    
  random.shuffle(train_set)
  random.shuffle(valid_set)
  
  return train_set, valid_set


train_pths, valid_pths = get_train_txt_paths_in_split(train_path)
test_pths = list(test_path.rglob("*.txt"))

print(f"Number of training data: {len(train_pths)}, validation data: {len(valid_pths)}, test data: {len(test_pths)}")

In [None]:
'''
 Print the first 10 paths in train_pths
'''
train_pths[:10]

#### Make Vocabulary
- This cell makes a vocabulary from the training set
  - using ``basic_english`` tokenizer
  - using ``Counter`` to count the number of tokens
  - using ``min_count`` to remove tokens that appear fewer than ``min_count`` times
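
The counting and filtering steps can be sketched on a toy corpus as follows. Whitespace splitting stands in for the ``basic_english`` tokenizer, and the names `toy_lines`, `toy_counts`, `toy_min_count`, and `toy_vocab` are made up for this example.

```python
from collections import Counter

# Toy corpus; whitespace splitting is an assumption that replaces the real tokenizer
toy_lines = ["the movie was good", "the movie was bad", "the plot was thin"]

toy_counts = Counter()
for line in toy_lines:
    toy_counts.update(line.lower().split())

toy_min_count = 2  # the notebook itself uses min_count = 5
# Keep only tokens that occur at least toy_min_count times, in sorted order
toy_vocab = sorted(tok for tok, cnt in toy_counts.items() if cnt >= toy_min_count)
print(toy_vocab)
```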

In [None]:
from collections import Counter

def make_vocab_from_txt_fns(txt_fns_list:List[str], tokenizer):
  '''
  This function takes a list of txt file paths and returns a Counter that maps each token in the txt files to its number of occurrences
  '''
  vocab = Counter()
  for txt_fn in txt_fns_list:
    with open(txt_fn, 'r') as f:
      for line in f:
        vocab.update(tokenizer(line))
  return vocab

tokenizer = get_tokenizer('basic_english')
entire_vocab = make_vocab_from_txt_fns(train_pths, tokenizer)
min_count = 5
vocab = sorted([token for token, count in entire_vocab.items() if count >= min_count])

print(f"Number of tokens in the entire vocabulary: {len(entire_vocab)}")
print(f"Number of tokens in the vocabulary with min_count = {min_count}: {len(vocab)}")
print(f"First 10 tokens in the vocabulary: {vocab[:10]}")
print(f"Last 10 tokens in the vocabulary: {vocab[-10:]}")
print(f"Middle 10 tokens in the vocabulary: {vocab[len(vocab)//2-5:len(vocab)//2+5]}")

## Problem 1: Complete the dataset class
- Complete the given class ``IMDbData``
    - ``IMDbData`` has a list of txt paths. Each txt file corresponds to a single data sample.
        - **The label, whether the given review is positive or negative, is recorded in the directory name in the path of the file**
        - You can convert a ``Path`` instance to ``str`` by ``str(a_path)``
    - Complete two special methods ``__len__`` and ``__getitem__``
        - ``__len__`` returns the length of the dataset, which is the total number of data samples in the dataset
        - ``__getitem__`` takes an index and returns the corresponding data sample
        - To read a txt file, you can use the pre-defined ``read_txt`` function
            - ``read_txt`` takes a txt file path as input and returns the content of the txt file as a string
        - To get a list of tokens, use ``self.tokenizer``
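
To illustrate how the two special methods are called, here is a toy container that is unrelated to the IMDb files. `ToyDataset` and its in-memory samples are hypothetical; it only shows the `len(...)` and `[...]` protocol, not the solution for ``IMDbData``.

```python
class ToyDataset:
    """A toy container; it keeps (text, label) pairs in memory."""

    def __init__(self, items):
        self.items = items

    def __len__(self):
        # Called by len(dataset)
        return len(self.items)

    def __getitem__(self, idx):
        # Called by dataset[idx]; returns (list of tokens, label)
        text, label = self.items[idx]
        return text.split(), label

toy = ToyDataset([("a fine film", 1), ("a dull film", 0)])
print(len(toy))
print(toy[1])
```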

In [None]:
def read_txt(txt_path):
  with open(txt_path, 'r') as f:
    txt_string = f.readline()
  return txt_string

class IMDbData:
  def __init__(self, path_list):
    self.paths = path_list
    self.tokenizer = get_tokenizer('basic_english')
  
  def __len__(self):
    """
    __len__ is a special method that returns length of the instance when called with len(class_instance)
    e.g.
      dataset = IMDbData(train_pths)
      length_of_dataset = len(dataset)
      
    TODO: Complete this function 
    """
    return 

  def __getitem__(self, idx):
    """
    __getitem__ is a special method that returns an item for a given index when called with class_instance[index]
    e.g.
      trainset = IMDbData(train_pths)
      trainset[6] == trainset.__getitem__(6)
      
    output: sequence_of_token, label
      sequence_of_token (list): a list of string (word token). Use self.tokenizer to make string into a list of word token
      label (int): 0 if the sentence is negative, 1 if the sentence is positive    
      
    HINT: use str(pth) to convert Path into String.
          You can find the label of the sample in its file directory path

          
    TODO: Complete this function using self.paths, self.tokenizer, and read_txt()
    """

    return 

trainset = IMDbData(train_pths)
validset = IMDbData(valid_pths)
short_validset = IMDbData(valid_pths[:100])
testset = IMDbData(test_pths)

# print('__len__ result for trainset: ', len(trainset))
# print('__getitem__ result: ', trainset[1])


In [None]:
'''
Examples of how tokenizer works
'''
trainset.tokenizer('this is my example Sentence!!')

In [None]:
'''
Test your IMDbData class
'''
trainset = IMDbData(train_pths)
assert len(trainset) == 20000 and len(validset) ==5000 and len(short_validset)==100
assert len(trainset[0]) == 2
assert trainset[154][0][10:15] == ['ends', 'right', 'after', 'this', 'little'], "Error in the trainset __getitem__ output"
assert trainset[594][1] == 0 and trainset[523][1] == 1 and trainset[1523][1] == 0, "Error in the trainset __getitem__ output"

print("Passed all the test cases!")

## Problem 2: Complete String to idx Converter
- Complete a class that converts a list of strings to a list of integers, and vice versa
    - Use the variable ``vocab``, which is a list of strings
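
The relation between an index-to-string list and a string-to-index dictionary can be sketched on a three-word vocabulary. All names here (`toy_vocab`, `toy_idx2str`, `toy_str2idx`) are made up for the example, and the sketch deliberately leaves out nested lists and unknown words, which the class has to handle.

```python
toy_vocab = ["bad", "good", "movie"]

# index -> string: the list itself, since position i holds the i-th word
toy_idx2str = list(toy_vocab)
# string -> index: invert the list with enumerate
toy_str2idx = {word: idx for idx, word in enumerate(toy_idx2str)}

print(toy_str2idx["movie"])
print(toy_idx2str[toy_str2idx["good"]])

# isinstance tells the element types apart
print(isinstance("good", str), isinstance(1, int), isinstance(["good"], list))
```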

In [None]:
class Str2Idx2Str:
  def __init__(self, vocab):
    '''
    Input:
      vocab: a list of strings, which contains all the words in the vocabulary
    TODO: Complete the class
    
    1. Declare self.idx2str
    - self.idx2str is a list of strings, which contains a string of word that corresponds to the index of the list
    - e.g. self.idx2str[your_index] returns a string value of your_index of the vocabulary
    - Use the input argument vocab to initialize self.idx2str
    
    2. Declare self.str2idx
    - self.str2idx is a dictionary, where its keys are the words in strings and its values are the corresponding index of each word
    - e.g. self.str2idx[your_word] returns an integer value of the index of your_word in the vocabulary

    '''
    self.idx2str = []
    self.str2idx = {}
    
    
    '''
    You have to include these two lines,
    and explain in your report what they do
    '''
    self.unknown_idx = len(self.str2idx)
    self.idx2str.append("UNKNOWN")
  
    
  
  def __call__(self, alist):
    '''
    This function converts list of word string to its index and vice versa.
    For example, if it takes ['if', 'anyone', 'who', 'loves', 'laurel', 'and', 'hardy', 'can', 'watch', 'this', 'movie', 'and', 'feel', 'good', 'about', 'it', ',', 'you'] as an input,
    it will return [83, 1544, 38, 6741, 15722, 5, 10801, 86, 1716, 37, 1005, 5, 998, 219, 59, 20, 1, 81].
    
    If it takes [83, 1544, 38, 6741, 15722, 5, 10801, 86, 1716, 37, 1005, 5, 998, 219, 59, 20, 1, 81],
    it will return ['if', 'anyone', 'who', 'loves', 'laurel', 'and', 'hardy', 'can', 'watch', 'this', 'movie', 'and', 'feel', 'good', 'about', 'it', ',', 'you']
    
    If it takes a list of list of string, such as [['after', 'watching', 'about', 'half', 'of'], ['reading', 'all', 'of', 'the', 'comments'], ['why', 'has', 'this', 'not', 'been'], ['this', 'is', 'a', 'really', 'strange']],
    it will return a list of list of integer, [[49, 2641, 59, 343, 3], [2185, 64, 3, 0, 1939], [738, 31, 37, 36, 51], [37, 14, 7, 588, 5186]]
    
    Vice versa, if it takes [[49, 2641, 59, 343, 3], [2185, 64, 3, 0, 1939], [738, 31, 37, 36, 51], [37, 14, 7, 588, 5186]] as an input,
    it will return [['after', 'watching', 'about', 'half', 'of'], ['reading', 'all', 'of', 'the', 'comments'], ['why', 'has', 'this', 'not', 'been'], ['this', 'is', 'a', 'really', 'strange']],
    
    Input: alist of strings, or a list of integers, or a list of lists
      e.g. alist = ['if', 'anyone', 'who', 'loves', 'laurel', 'and', 'hardy', 'can', 'watch', 'this', 'movie', 'and', 'feel', 'good', 'about', 'it', ',', 'you']
        or alist = [83, 1544, 38, 6741, 15722, 5, 10801, 86, 1716, 37, 1005, 5, 998, 219, 59, 20, 1, 81]
        or alist = [['after', 'watching', 'about', 'half', 'of'], ['reading', 'all', 'of', 'the', 'comments'], ['why', 'has', 'this', 'not', 'been'], ['this', 'is', 'a', 'really', 'strange']]
        or alist = [[49, 2641, 59, 343, 3], [2185, 64, 3, 0, 1939], [738, 31, 37, 36, 51], [37, 14, 7, 588, 5186]]
    
    IMPORTANT: If a word in the input list is not in the vocabulary of Str2Idx2Str, then it has to convert it into UNKNOWN token.
    
    
    Output: a list of integers or a list of strings (or a list of such lists), matching the structure of the input
    
    TODO: Complete this function, using self.idx2str and self.str2idx
    
    Hint: You can figure out the type of input by using the function isinstance. It will return boolean.
        isinstance(an_item, list)
        isinstance(an_item, str)
        isinstance(an_item, int)
    '''
    
    # Write your code from here
    
    
    return

# Test the code
converter = Str2Idx2Str(vocab)
input_sentence = trainset[0][0][:20] #0th sample, text (instead of label), first 20 words
print(f"Input sentence: {input_sentence}")
print(f"Converted sentence: {converter(input_sentence)}")
print(f"Re-converted sentence: {converter(converter(input_sentence))}")
print(f"Result for a list of sentences/ input_list: {[trainset[i][0][:5]for i in range(1,5)]}, output_list: {converter([trainset[i][0][:5]for i in range(1,5)])}")



In [None]:
'''
Test your code by running this cell.

Don't change the test cases
'''

list_of_string = ['if', 'anyone', 'who', 'loves', 'laurel', 'and', 'hardy', 'can', 'watch', 'this', 'movie', 'and', 'feel', 'good', 'about', 'it', ',', 'you']
list_of_intger = [11987, 1487, 26426, 14417, 13830, 1310, 11072, 3792, 26166, 24238, 15911, 1310, 9041, 10407, 551, 12881, 76, 26889]

assert converter(list_of_string) == list_of_intger, \
    f"The output of converting list_of_string has to be same with list_of_intger. Your current output is {converter(list_of_string)}"
assert converter(list_of_intger) == list_of_string, \
    f"The output of converting list_of_intger has to be same with list_of_string. Your current output is {converter(list_of_intger)}"


list_of_string_list = [['after', 'watching', 'about', 'half', 'of'], ['reading', 'all', 'of', 'the', 'comments'], ['why', 'has', 'this', 'not', 'been'], ['this', 'is', 'a', 'really', 'strange']]
list_of_integer_list = [[902, 26173, 551, 10912, 16765], [19394, 1092, 16765, 24153, 5020], [26447, 11120, 24238, 16550, 2483], [24238, 12847, 493, 19419, 23088]]

assert converter(list_of_string_list) == list_of_integer_list, \
    f"The output of converting list_of_string_list has to be same with list_of_integer_list. Your current output is {converter(list_of_string_list)}"
assert converter(list_of_integer_list) == list_of_string_list, \
    f"The output of converting list_of_integer_list has to be same with list_of_string_list. Your current output is {converter(list_of_integer_list)}"

print("Passed all the test cases!")

## Problem 3: Complete Collate Function
- Every data sample in ``IMDbData`` has a different length
    - Therefore, you have to handle variable input lengths to group multiple sequence samples into a single batch
- You have to implement ``pack_collate`` (an instance of ``PackCollateWithConverter``), which takes a raw batch from the dataset and groups it into a ``PackedSequence``
    - You don't need to know ``PackedSequence`` in detail now. It enables efficient computation for sequences with different lengths
- Implement two variables, following the description in the function
    - ``word_sentences_in_idxs``, ``label_tensor``
    - use ``self.converter``
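
For orientation only, the sketch below packs three toy index sequences of different lengths and pads them back, using the same `pack_sequence` and `pad_packed_sequence` functions that this notebook imports. The index values are arbitrary.

```python
import torch
from torch.nn.utils.rnn import pack_sequence, pad_packed_sequence

# Three toy sequences of vocabulary indices with lengths 3, 2, and 1
seqs = [torch.tensor([1, 2, 3]), torch.tensor([4, 5]), torch.tensor([6])]

# enforce_sorted=False lets PyTorch sort by length internally
packed = pack_sequence(seqs, enforce_sorted=False)
print(packed.batch_sizes)   # number of sequences still active at each time step

# Padding back gives one rectangular tensor plus the original lengths
padded, lengths = pad_packed_sequence(packed, batch_first=True)
print(padded)
print(lengths)
```

The packed form stores only the real tokens, so no computation is spent on padding positions.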

In [None]:
import torch
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import PackedSequence, pad_packed_sequence, pack_sequence

class PackCollateWithConverter:
  def __init__(self, converter):
    self.converter = converter
  
  def __call__(self, batch):
    '''
    TODO: Declare variables word_sentences_in_idxs and label_tensor, following the description below
    Use self.converter to convert the token sequence of each sample into a sequence of vocabulary indices

    word_sentences_in_idxs: A list of torch.LongTensor. Each element of the list is a sequence of integers, and each integer represents the vocabulary index of a word in a sentence.
                  The i-th element of word_sentences_in_idxs corresponds to the i-th data sample in the batch
    label_tensor: torch.FloatTensor with a shape of [len(batch)]. The i-th value of the tensor represents the label of the i-th data sample in the batch (either 0.0 or 1.0)
    '''
    
    # Write your code from here
    word_sentences_in_idxs = []
    label_tensor = torch.FloatTensor([0])

    
    '''
    Leave the code below as it is
    '''
    assert isinstance(word_sentences_in_idxs, list), f"word_sentences_in_idxs has to be a list, not {type(word_sentences_in_idxs)}"
    assert isinstance(word_sentences_in_idxs[0], torch.LongTensor), f"An element of word_sentences_in_idxs has to be a torch.LongTensor, not {type(word_sentences_in_idxs[0])}"
    assert isinstance(label_tensor, torch.FloatTensor), f"label_tensor has to be a torch.FloatTensor, not {type(label_tensor)}"
    assert label_tensor[-1] == batch[-1][1], "i-th element of label_tensor has to be the label of the i-th data sample in the batch"
    
    packed_sequence = pack_sequence(word_sentences_in_idxs, enforce_sorted=False)

    return packed_sequence, label_tensor

pack_collate = PackCollateWithConverter(converter)
# Test the code
train_loader = DataLoader(trainset, batch_size=32, collate_fn=pack_collate, shuffle=True)
batch = next(iter(train_loader))

print('A batch looks like this: ', batch)

### Preparation: Define Model
- You don't have to change this code, or try to understand how this GRU model works at the current stage

In [None]:
class SentimentModel(nn.Module):
  def __init__(self,vocab_size, hidden_size=128, num_layers=3, ):
    super().__init__()
    self.word_embedding = nn.Embedding(vocab_size, hidden_size)
    self.gru = nn.GRU(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True, dropout=0.3, batch_first=True)
    self.num_layers = num_layers
    self.hidden_size = hidden_size
    self.final_layer = nn.Linear(hidden_size*2, 1)
    
  def forward(self, x):
    x = PackedSequence(self.word_embedding(x.data), batch_sizes=x.batch_sizes, sorted_indices=x.sorted_indices, unsorted_indices=x.unsorted_indices)
    x, hidden = self.gru(x)
    pad_x, lens = pad_packed_sequence(x, batch_first=True)

    max_x = torch.stack([torch.max(pad_x[i, :lens[i]], dim=0)[0] for i in range(len(lens))], dim=0)

    # max_x = torch.max(pad_x, dim=0)[0]
    pred_logit = self.final_layer(max_x)[:,0]
    return torch.sigmoid(pred_logit)

# Test the model
model = SentimentModel(len(converter.idx2str))
batch = next(iter(train_loader))
x, y = batch
out = model(x)
out

## Problem 4: Implement Binary Cross Entropy Loss
- Implement the loss without using ``torch.nn.BCELoss``
    - You can implement it with ``torch.log`` and ``torch.mean`` or ``atensor.mean()``
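
For reference, the mean binary cross entropy over $N$ samples is commonly written as below, where $\hat{y}_i$ is the prediction (``pred``), $y_i$ is the target (``target``), and $\epsilon$ is the small constant (``eps``). Where exactly $\epsilon$ is added is an assumption of this sketch; other placements, such as clamping the prediction, are also used.

$$
\mathcal{L} = -\frac{1}{N}\sum_{i=1}^{N}\Big[\, y_i \log(\hat{y}_i + \epsilon) + (1 - y_i)\log(1 - \hat{y}_i + \epsilon) \Big]
$$

As a worked value (ignoring $\epsilon$ and using the natural logarithm), a single sample with $y = 1$ and $\hat{y} = 0.9$ gives $-\log(0.9) \approx 0.105$, while $y = 0$ with the same prediction gives $-\log(0.1) \approx 2.303$. Confident wrong predictions are therefore penalized much more heavily than confident correct ones.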

In [None]:
def get_binary_cross_entropy_loss(pred, target, eps=1e-8):
  '''
  pred (torch.FloatTensor): Prediction value for N samples 
                            Each element in the tensor is the output of torch.sigmoid, and has a value between 0 and 1
  target (torch.FloatTensor): Corresponding target value for N samples. 
                              Each element in the tensor has value of either 0 or 1
  eps (float): A small value to avoid log(0) error
  
  output: Mean of binary cross entropy of N samples
  
  TODO: Complete this function
  
  '''
  return

In [None]:
'''
Test your BCE loss function
Don't change the test cases
'''

test_pred_case = torch.Tensor([9.9894e-01, 2.2645e-03, 1.8131e-01, 8.0153e-03, 9.9972e-01, 1.0378e-03,
        9.9949e-01, 9.9967e-01, 6.4150e-03, 9.9912e-01, 9.9896e-01, 1.4350e-01,
        9.9896e-01, 2.1979e-02, 9.9976e-01, 4.5389e-03, 9.9906e-01, 1.0633e-02,
        9.9749e-01, 5.5501e-04, 7.0052e-04, 2.9509e-04, 3.2752e-04, 9.9940e-01,
        4.5912e-04, 9.9969e-01, 6.0225e-03, 9.9974e-01, 9.9907e-01, 9.9942e-01,
        4.0911e-01, 2.8850e-01])
test_target_case = torch.Tensor([1., 0., 0., 0., 1., 0., 1., 1., 0., 1., 1., 1., 1., 0., 1., 0., 1., 0.,
        1., 0., 0., 0., 0., 1., 0., 1., 0., 1., 1., 1., 0., 0.])

your_result = get_binary_cross_entropy_loss(test_pred_case, test_target_case)

'''
The value can be slightly different because of the epsilon value used for torch.log
'''
print(f"BCE Loss by torch.nn.BCELoss is {torch.nn.BCELoss()(test_pred_case, test_target_case)} and your BCE loss is {your_result}")


## Problem 5: Complete Training Loop
- In this problem, you have to implement a Trainer class
    - It contains everything you need to train a neural network model
    - model, optimizer, loss function, train loader, validation loader, and device (cuda or cpu)
- IMPORTANT
    - Select proper ``batch_size`` for ``train_loader``, ``validation_loader``, ``test_loader``

- Complete ``self._get_accuracy()``
  - If the prediction is larger than 0.5, you can regard it as positive sentiment

- Complete ``self._get_loss_and_acc_from_single_batch()``
    - using ``self._get_accuracy()``

- Complete ``_train_by_single_batch``
    - using ``self._get_loss_and_acc_from_single_batch()``
    - You can test it on the cell below

- Complete ``validate``
    - Implement it **without gradient calculation** to reduce memory usage
        - Use ``with torch.no_grad():`` or ``with torch.inference_mode():``
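
The effect of disabling gradient calculation can be seen on a toy computation. The tensors `w` and `x` are arbitrary; the sketch only shows that results computed inside ``torch.no_grad()`` are not tracked for backpropagation, and that ``.item()`` turns a one-element tensor into a Python float.

```python
import torch

w = torch.ones(3, requires_grad=True)      # stands in for a model parameter
x = torch.tensor([1.0, 2.0, 3.0])

y_train = (w * x).sum()                    # tracked: can be used with backward()
with torch.no_grad():
    y_eval = (w * x).sum()                 # not tracked: no graph is stored

print(y_train.requires_grad, y_eval.requires_grad)
print(type(y_eval.item()))                 # a plain Python float
```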
    

In [None]:
from tqdm.auto import tqdm

class Trainer:
  def __init__(self, model, optimizer, loss_fn, train_loader, valid_loader, device):
    self.model = model
    self.optimizer = optimizer
    self.loss_fn = loss_fn  # use the loss function passed in (e.g. get_binary_cross_entropy_loss)
    self.train_loader = train_loader
    self.valid_loader = valid_loader
    
    self.model.to(device)
    
    self.best_valid_accuracy = 0
    self.device = device
    
    self.training_loss = []
    self.training_acc = []
    self.validation_loss = []
    self.validation_acc = []

  def save_model(self, path='imdb_sentiment_model.pt'):
    torch.save({'model':self.model.state_dict(), 'optim':self.optimizer.state_dict()}, path)
    
  def train_by_num_epoch(self, num_epochs):
    for epoch in range(num_epochs):
      self.model.train()
      for batch in tqdm(self.train_loader, leave=False):
        loss_value, acc = self._train_by_single_batch(batch)
        self.training_loss.append(loss_value)
        self.training_acc.append(acc)

      self.model.eval()
      validation_loss, validation_acc = self.validate()
      self.validation_loss.append(validation_loss)
      self.validation_acc.append(validation_acc)
      
      print(f"Epoch {epoch+1}, Training Loss: {loss_value:.4f}, Training Acc: {acc:.4f}, Validation Loss: {validation_loss:.4f}, Validation Acc: {validation_acc:.4f}")
      if validation_acc > self.best_valid_accuracy:
        print(f"Saving the model with best validation accuracy: Epoch {epoch+1}, Acc: {validation_acc:.4f} ")
        self.save_model('imdb_sentiment_model_best.pt')
      else:
        self.save_model('imdb_sentiment_model_last.pt')
      self.best_valid_accuracy = max(validation_acc, self.best_valid_accuracy)

  def _get_accuracy(self, pred, target, threshold=0.5):
    '''
    This method calculates accuracy for given prediction and target
    
    input:
      pred (torch.Tensor): Prediction value for a given batch
      target (torch.Tensor): Target value for a given batch
      threshold (float): Threshold value for deciding whether the prediction is positive or negative. Default value is 0.5
      
    output: 
      accuracy (float): Mean Accuracy value for every sample in a given batch
    
    TODO: Complete this method using all the input arguments
    '''

    return

  def _get_loss_and_acc_from_single_batch(self, batch):
    '''
    This method calculates loss value for a given batch

    batch (tuple): (batch_of_input_text, batch_of_label)

    You have to use variables below:
    self.model (SentimentModel/torch.nn.Module): A neural network model
    self.loss_fn (function): function for calculating BCE loss for a given prediction and target
    self.device (str): 'cuda' or 'cpu'
    self._get_accuracy (function): function for calculating accuracy for a given prediction and target

    output: 
      loss (torch.Tensor): Mean binary cross entropy value for every sample in the training batch
      acc (float): Accuracy for the given batch
    # CAUTION! The output loss has to be torch.Tensor that is backwardable, not a float value or numpy array

    TODO: Complete this method
    '''

    return
      
  def _train_by_single_batch(self, batch):
    '''
    This method updates self.model's parameter with a given batch
    
    batch (tuple): (batch_of_input_text, batch_of_label)
    
    You have to use methods and variables below:

    self._get_loss_and_acc_from_single_batch (function): function for calculating loss value for a given batch    
    self.optimizer (torch.optim.adam.Adam): Adam optimizer that optimizes model's parameter

    output: 
      loss (float): Mean binary cross entropy value for every sample in the training batch
      acc (float): Mean accuracy for the given batch
    The model's parameters and the optimizer's state have to be updated inside this method

    TODO: Complete this method 
    '''
    return


    
  def validate(self, external_loader=None):
    '''
    This method calculates accuracy and loss for given data loader.
    It can be used for validation step, or to get test set result
    
    input:
      external_loader: If no external_loader is given, self.valid_loader is used by default.
      
    
    output: 
      validation_loss (float): Mean Binary Cross Entropy value for every sample in validation set
      validation_accuracy (float): Mean Accuracy value for every sample in validation set
      
    Use these methods:
      self._get_loss_and_acc_from_single_batch (function): function for calculating loss value for a given batch
    
    TODO: Complete this method 
    CAUTION: During validation, you can make it faster by not calculating gradient

    '''
    
    ### Don't change this part
    if external_loader and isinstance(external_loader, DataLoader):
      loader = external_loader
      print('An arbitrary loader is used instead of Validation loader')
    else:
      loader = self.valid_loader
      
    self.model.eval()
    
    '''
    Write your code from here, using loader, self.model, self.loss_fn.
    '''

    return 




"""
Don't change this part
"""
model = SentimentModel(len(converter.idx2str), hidden_size=128, num_layers=3)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
train_loader = DataLoader(trainset, batch_size=64, collate_fn=pack_collate, shuffle=True)
valid_loader = DataLoader(validset, batch_size=128, collate_fn=pack_collate, shuffle=False)
test_loader = DataLoader(testset, batch_size=128, collate_fn=pack_collate, shuffle=False)

trainer =  Trainer(model, optimizer, get_binary_cross_entropy_loss, train_loader, valid_loader, device='cuda')

#### Check the result
- Check that your implementation works correctly
- Don't change the code below

In [None]:
"""
This code is to test trainer._train_by_single_batch

If your code is implemented correctly, the loss will go down for this specific batch
"""

trainer.model.train()
train_batch = next(iter(trainer.train_loader)) # get a batch from train_loader

loss_track = []
for _ in range(10):
  loss_value, acc = trainer._train_by_single_batch(train_batch) # test the trainer
  loss_track.append(loss_value)

assert isinstance(loss_value, float) and loss_value > 0,  "The return of trainer._train_by_single_batch has to be a single float value that is larger than 0"
print(f"Loss value for 10 repetition for the same training batch is  {[f'{loss:.4f}' for loss in loss_track]}")

In [None]:
"""
This code is to test trainer.validate
"""

short_valid_loader = DataLoader(short_validset, batch_size=50, collate_fn=pack_collate)

validation_loss, validation_acc = trainer.validate(short_valid_loader)
assert isinstance(validation_loss, float) and isinstance(validation_acc, float), "Both return values of trainer.validate have to be float"
assert validation_loss > 0, "Validation Loss has to be larger than 0"
assert 0 <= validation_acc <= 1, "Validation Acc has to be between 0 and 1"

print(f"Valid loss: {validation_loss}, Accuracy: {validation_acc}")

### Train the model with the completed Trainer
- In your report, attach the results of the following cells and describe the training result and test result
    - Plot of training and validation loss/acc
    - Result of your model on the test set

- [Optional] You can modify the code to train the model in different ways
    - optimizer, batch_size, model_size, num_epochs, etc.

In [None]:
trainer.train_by_num_epoch(5)

In [None]:
'''
Plot the result after the training
'''

plt.figure(figsize=(10,8))

plt.subplot(4,1,1)
plt.title("Training loss")
plt.plot(trainer.training_loss)

plt.subplot(4,1,2)
plt.title("Training accuracy")
plt.plot(trainer.training_acc)

plt.subplot(4,1,3)
plt.title("Validation loss by epoch")
plt.plot(trainer.validation_loss)

plt.subplot(4,1,4)
plt.title("Validation accuracy by epoch")
plt.plot(trainer.validation_acc)

In [None]:
'''
Get the test result
'''

test_loss, test_acc = trainer.validate(test_loader)

print(f"Test Loss: {test_loss}, Test Accuracy: {test_acc}")


### Paste your code to ``Assignment2.py`` file
- Copy and paste your completed code to ``Assignment2.py`` file
- You can check whether your code is correct by running the following cell
  - In my test, the last printed lines were
    - Valid loss: 0.6931650042533875, Accuracy: 0.5
    - Epoch 1, Training Loss: 0.6802, Training Acc: 0.6875, Validation Loss: 0.6942, Validation Acc: 0.5100
    - Saving the model with best validation accuracy: Epoch 1, Acc: 0.5100 
    - Last 5 Training loss: [0.6907180547714233, 0.6797541379928589, 0.7054318189620972, 0.6722485423088074, 0.6801780462265015]

In [None]:
!python3 Assignment2.py

## Problem 6: Analyze the Prediction of the model
- In this problem, you have to analyze the prediction of the model
    - You can choose between the two models
        - Trainer is designed to save two models
        - ``imdb_sentiment_model_last.pt`` contains the model weights after the last training epoch
        - ``imdb_sentiment_model_best.pt`` contains the model weights after the training epoch with the best validation accuracy
     
- If you failed to train your model by solving the previous problems, you can download a pretrained model
- In your report, describe your analysis of how the trained model works on the text
    - What are the main criteria the model uses to decide whether a review is positive or negative?
    - When does it make mistakes? When does it make good predictions?
    - Does the converted input text have enough information to classify the text, compared to the original text?
        - Do you see any problems in tokenizing or using UNKNOWN?
- You can write the analysis using only the Test Set samples, or using your own review texts
    


#### Download Model (if you have failed to train your own)

In [None]:
'''
If you failed to train your own model, you can download "imdb_sentiment_model_best_pretrained.pt" from CyberCampus and upload it to Colab or your workspace.
'''


#### Load Model

In [None]:
model = SentimentModel(len(converter.idx2str), 128, 3) # You have to use the same model architecture as the one you used for training
your_model_pt_path = 'imdb_sentiment_model_last.pt' # or imdb_sentiment_model_best.pt, or imdb_sentiment_model_best_pretrained.pt etc
model.load_state_dict(torch.load(your_model_pt_path, map_location='cpu')['model']) # Or imdb_sentiment_model_best.pt

#### Check the largest error cases from the Test Set
- The following code prints the test samples with the largest loss
    - Or the samples with the smallest loss, with ``descending=False``
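
The ranking idea can be sketched with plain Python on a few hypothetical per-sample losses: sort the sample indices by their loss, then take the last `k` (largest loss) or the first `k` (smallest loss). The values and the names `toy_losses`, `order`, `largest_k`, and `smallest_k` are made up for this example.

```python
toy_losses = [0.30, 2.10, 0.05, 0.90]   # hypothetical per-sample losses

# Sample indices ordered from smallest to largest loss
order = sorted(range(len(toy_losses)), key=toy_losses.__getitem__)

k = 2
largest_k = list(reversed(order[-k:]))   # worst predictions first
smallest_k = order[:k]                   # best predictions first
print(order, largest_k, smallest_k)
```

Sorting indices rather than the loss values themselves keeps the link back to the dataset, so each ranked entry can be looked up again by its sample index.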

In [None]:
nl = '\n'
def sort_data_idx_by_loss(model, data_loader, device='cuda'):
  assert isinstance(data_loader.sampler, torch.utils.data.sampler.SequentialSampler)
  model.eval()
  model.to(device)
  entire_loss = []
  entire_pred = []
  loss_fn = torch.nn.BCELoss(reduction='none')
  with torch.no_grad():
    for batch in tqdm(data_loader):
      x, y =batch
      pred = model(x.to(device))
      loss = loss_fn(pred, y.to(device))
      entire_loss += loss.tolist()
      entire_pred += pred.tolist()
  sorted_indices = sorted(range(len(entire_loss)),key=entire_loss.__getitem__)
  return sorted_indices, entire_loss, entire_pred

def print_top_k_loss_case(data_loader, sorted_indices, entire_loss, entire_pred, k=10, descending=True):
  if descending:
    sorted_indices = reversed(sorted_indices[-k:])
  else:
    sorted_indices = sorted_indices[:k]
  
  texts = []
  for i, idx in enumerate(sorted_indices):
    data_sample = data_loader.dataset[idx]
    conv_text = ' '.join(converter(converter(data_sample[0])))
    orig_text = read_txt(data_loader.dataset.paths[idx])
    texts.append({'converted': conv_text, 'original': orig_text})
    print(f" {i}. Sample index: {idx} - Loss: {entire_loss[idx]:.4f}, Model Prediction: {entire_pred[idx]:.4f}, Correct Label: {data_sample[1]} \
          {nl}  Converted Text: {conv_text} {nl}  Original Text: {orig_text} {nl}")
  return texts


'''
This will calculate loss for each sample in the test set
'''
sorted_indices, entire_loss, entire_pred =  sort_data_idx_by_loss(model, test_loader, device='cuda')


In [None]:
'''
This will print out the top-k most incorrect predictions on the test set
'''
texts = print_top_k_loss_case(test_loader, sorted_indices, entire_loss, entire_pred, k=10, descending=True)

In [None]:
'''
This will print out the top-k most correct predictions on the test set
'''

texts = print_top_k_loss_case(test_loader, sorted_indices, entire_loss, entire_pred, k=10, descending=False)

#### Test with your own text input

In [None]:
def estimate_sentiment_of_given_txt(model, input_text):
  model.cpu()
  model.eval()
  tokenizer = trainset.tokenizer
  your_text_in_token = tokenizer(input_text)
  model_input = pack_sequence([torch.tensor(converter(your_text_in_token), dtype=torch.long)])
  prediction = model(model_input).squeeze()
  
  return prediction


your_text = """
    This movie is terrific
"""

estimate_sentiment_of_given_txt(model, your_text)