# Test Your Model
The code below illustrates how to load the model trained and saved at notebook *cw2-train.ipynb*, and test the performance of the loaded model with held-out test data. 

**REMARK 1**: You should adjust the code below according to what components you have saved and how you have saved them.

**REMARK 2**: When the markers evaluate your model, they will use the code below (but replace the test data with some held-out data) to run the test. **Hence, make sure you can re-load your model and test its performance with the code below.**

**REMARK 3**: If you use embeddings to represent text, **DO NOT** include the embeddings file in your submitted file due to its huge size. Instead, specify which pre-trained embedding you want to use or provide a link for downloading it; the markers will download the embedding and run your code below to load the embeddings.



In [1]:
# NOTE! The model defined below MUST BE EXACTLY THE SAME as the one you used at training

import torch
import torch.nn as nn

class ConvRNN_Classifier(nn.Module):
    def __init__(self, vocab_size, embd_dim, hidden_dim, model_type, cls_num, pooler_type, filter_size_list, filter_num_list, dropout, gpu):
        super(ConvRNN_Classifier, self).__init__()
        assert model_type in ['rnn','lstm','bilstm','gru']
        assert pooler_type in ['max','avg']
        
        self.embd_dim = embd_dim
        self.hidden_dim = hidden_dim
        
        self.embedding = nn.Embedding(vocab_size, embd_dim)
        
        # rnn type
        if model_type == 'rnn':
            self.rnn = nn.RNN(hidden_size=hidden_dim, batch_first=True, input_size=embd_dim, dropout=dropout)
        elif model_type == 'lstm':
            self.rnn = nn.LSTM(hidden_size=hidden_dim, batch_first=True, input_size=embd_dim, dropout=dropout)
        elif model_type == 'bilstm':
            self.rnn = nn.LSTM(hidden_size=hidden_dim, batch_first=True, input_size=embd_dim, 
                               bidirectional=True, dropout=dropout)
        else: # model_type == 'gru'
            self.rnn = nn.GRU(hidden_size=hidden_dim, batch_first=True, input_size=embd_dim, 
                              dropout=dropout, bidirectional=True, num_layers=2)
        
        self.convs = self.build_convs(filter_size_list, filter_num_list, gpu)
        self.tanh = nn.Tanh()
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(np.sum(filter_num_list), 96)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(96, 48)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(48, cls_num)
        self.gpu = gpu
                
        self.relu = nn.ReLU()
        
        self.gpu = gpu
        if gpu: self.to('cuda')
            
    def build_convs(self, f_sizes, f_nums, gpu):
        convs = nn.ModuleList()
        for fs, fn in zip(f_sizes, f_nums):
            padding_size = fs-1
            m = nn.Conv1d(self.hidden_dim, fn, fs, stride=padding_size,padding=padding_size)
            if gpu: m.to('cuda')
            convs.append(m)
        return convs
        
    def get_conv_output(self, input_matrix, conv, gpu):
        # step 1: compute convolution 
        input_matrix = torch.transpose(input_matrix, 1, 2)
#         print('input_mat_conv', input_matrix.shape, 'self.embd_dim', self.embd_dim)
        assert input_matrix.shape[1] == self.hidden_dim
        if gpu:
            input_matrix = input_matrix.to('cuda')
        conv_output = conv(input_matrix)
        # step 2: pass through an activation function 
        conv_relu = self.tanh(conv_output)
        # step 3: max-over-time pooling
        maxp = nn.MaxPool1d(conv_relu.shape[2])
        maxp_output = maxp(conv_relu)
        return maxp_output
    
    def forward(self, input_matrix):
#         print('ip mat', input_matrix.shape)
        token_num = input_matrix.shape[1]
#         print('token_num', token_num)
        embedding = self.embedding(input_matrix)
        hidden_vecs = self.rnn(embedding)[0]
#         print('hidden', hidden_vecs.shape)
#         print(hidden_vecs)
        cnn_repr = torch.tensor([])
        if self.gpu: cnn_repr = cnn_repr.to('cuda')
        for cv in self.convs:
            cv_output = self.get_conv_output(hidden_vecs, cv, self.gpu)
            cnn_repr = torch.cat((cnn_repr, cv_output), dim=1)
        # print(cnn_repr.shape)
        
#         cnn_repr = torch.tensor([])
#         if self.gpu: cnn_repr = cnn_repr.to('cuda')
#         for cv in self.convs:
#             cv_output = self.get_conv_output(cnn_repr, cv, self.gpu)
#             cnn_repr = torch.cat((cnn_repr, cv_output), dim=1)
        
        after_dp = self.dropout(cnn_repr.squeeze())
        fc1_out = self.relu1(self.fc1(after_dp))
        fc2_out = self.relu2(self.fc2(fc1_out))
        logit = self.fc3(fc2_out)
        # the CrossEntropyLoss provided by pytorch includes softmax; so you do not need to include a softmax layer in your net
        return logit

In [2]:
# reconstruct your trained model from pickle

import pickle
def reconstruct_model(pickle_path):
    model = ConvRNN_Classifier(vocab_size=saved_model_dic['vocab_size'],
                               embd_dim=saved_model_dic['input_dim'],
                               hidden_dim=saved_model_dic['hidden_dim'],
                               model_type=saved_model_dic['model_type'],
                               cls_num=saved_model_dic['num_class'],
                               pooler_type=saved_model_dic['pooler'],
                               filter_size_list=saved_model_dic['filter_sizes'],
                               filter_num_list=saved_model_dic['filter_nums'],
                               dropout=saved_model_dic['dropout_rate'],
                               gpu=saved_model_dic['gpu'])
    saved_weights = saved_model_dic['neural_weights']
    model.load_state_dict(saved_weights)

    word_to_id = saved_model_dic['word_to_id']
    max_seq_length = saved_model_dic['max_seq_length']
    return model, word_to_id, max_seq_length

In [3]:
# use the reconstructed model to make predictions on the test data

import re

def word_extract(text):
    text = re.sub(r'(-)|(,)|(\.)', '',text)
    text = re.findall(r'[A-Za-z\d]+[\w^\']*', text.lower())
    text = [i.strip() for i in text]
    return ' '.join(text)

def pad_features(data_series, seq_length):

    padded_features = np.zeros((len(data_series.values), seq_length), dtype=int)
    for i, v in enumerate(data_series):
        padded_features[i, -len(v):] = np.array(v)[:seq_length]
    return list(padded_features)

def word_to_id_converter(word_mappings, data_series):
    id_data = []    
    for t in data_series:
        mapping = []
        for i in t.split():
            if i in word_mappings.keys():
                mapping.append(word_mappings[i])
            else:
                mapping.append(0)
        id_data.append(mapping)
    return id_data

def make_batch_prediction(batch, model, use_gpu=False):
    batch_logits = torch.tensor([])
    if use_gpu: 
        batch_logits = batch_logits.to('cuda')
    for i in range(batch.shape[0]):
        input_sents = torch.from_numpy(batch[i]).float()
        if use_gpu:
            input_sents = input_sents.to('cuda')
#         print(input_sents.unsqueeze(0).shape)
        logits = model(input_sents.unsqueeze(0).long())
        batch_logits = torch.cat( (batch_logits, logits) )
    return batch_logits.view(batch.shape[0],-1)

def test_trained_model(model, test_data, batch_size=32, gpu=False):
    with torch.no_grad(): # let pytorch know that no gradient should be computed
        model.eval() # let the model know that it in test mode, i.e. no gradient and no dropout
        predictions = []
        for idx in range(0,len(np.array(test_data)),batch_size):
            y_pred = make_batch_prediction(np.array(test_data)[idx:idx+batch_size], model, gpu)
            pred_labels = [np.argmax(entry) for entry in y_pred.cpu().detach().numpy()]
            predictions += pred_labels
    
    return predictions

In [5]:
# load sample test data
import pandas as pd
import numpy as np

## reconstruct_model
model, word_to_id, max_seq_len = reconstruct_model('cw2_crnn_custom_final.pickle')

test_data = pd.read_table('../coursework2_train.tsv')
test_data['joined_text'] = test_data['article_title'] + ' ' + test_data['sentence_text']
test_data['joined_text'] = test_data['joined_text'].apply(word_extract)
test_data['joined_text_id'] = word_to_id_converter(word_to_id, test_data['joined_text'].to_list())
test_data['joined_text_padded'] = pad_features(test_data['joined_text_id'], max_seq_len)

test_text = test_data['joined_text_padded'].tolist()[-2000:]
test_raw_labels = test_data['label'].tolist()[-2000:]

label_dic = {'non-propaganda':0, 'propaganda':1} 

test_labels = [label_dic[rl] for rl in test_raw_labels]

print('test data size', len(test_labels))

test_pred = test_trained_model(model, np.array(test_text), batch_size=32)

# test model
from sklearn.metrics import precision_recall_fscore_support,accuracy_score
pre, rec, f1, _ = precision_recall_fscore_support(test_labels, test_pred, average='macro')
print('macro-F1 on test data', f1)

  "num_layers={}".format(dropout, num_layers))


test data size 2000
macro-F1 on test data 0.928540098861077
