In [1]:
# Colab only: mount Google Drive at /content/drive so the files referenced
# under /content/drive/MyDrive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments,RobertaTokenizer,RobertaForSequenceClassification, RobertaModel, RobertaConfig
from torch.utils.data import Dataset
import torch
import pandas as pd
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
import torch.nn.functional as F
from torch import nn
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import roc_auc_score
import pickle

## Optional: if a model won't load from Hugging Face, run this install cell to fix it

In [None]:
!pip install --upgrade "huggingface_hub[hf_xet]" transformers

## Demo for B

In [5]:
# Locations on the mounted Drive of the NLI train/test CSV files and of the
# pretrained GloVe vectors.
train_path = "/content/drive/MyDrive/NLI/train.csv"
test_path = "/content/drive/MyDrive/NLI/test.csv"
glove_path = "/content/drive/MyDrive/NLI/glove.840B.300d.txt"

In [6]:
# Prefer the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on a runtime without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [7]:
# Load the (word2idx, final_embeddings) pair that was pickled together in one file.
# NOTE(review): pickle.load can execute arbitrary code — only open pickles you
# created yourself or otherwise trust.
with open('/content/drive/MyDrive/NLI/vocab_and_glove_new.pkl', 'rb') as f:
    word2idx, final_embeddings = pickle.load(f)

In [None]:
# Model code adapted from: https://github.com/dunesand/Text-Matching-based-on-ESIM-model/blob/master/esim_model.py
# The model below is a new architecture inspired by ESIM that achieved higher accuracy than the base ESIM model.
# ESIM paper: https://arxiv.org/pdf/1609.06038v3

In [8]:
class ESIM(nn.Module):
    """ESIM-inspired sentence-pair classifier that produces one logit per pair.

    Pipeline: embedding (dropout + batch norm) -> shared BiLSTM -> residual
    multi-head self-attention -> masked soft alignment between the two
    sentences -> sigmoid gating of the aligned vectors -> 2-layer BiLSTM with a
    projected residual -> average/max pooling -> MLP classifier.

    Token id 0 is treated as padding. The padding mask is only applied in the
    soft alignment; the self-attention and the pooling see every position.

    Args:
        hidden_size: hidden units per direction of each LSTM.
        embed_dim: width of a word embedding.
        linear_size: width of the classifier's hidden layers.
        embedding_matrix: NumPy array whose first dimension is the vocabulary
            size; it becomes the (trainable) embedding weight.
        dropout: dropout probability for the embeddings, the second LSTM and
            the classifier.
    """

    def __init__(self, hidden_size, embed_dim, linear_size, embedding_matrix, dropout=0.5):
        super().__init__()
        self.hidden_size = hidden_size
        self.embed_dim = embed_dim
        self.dropout_rate = dropout

        bi_dim = 2 * hidden_size          # output width of a bidirectional LSTM
        enhanced_dim = 8 * hidden_size    # [o, aligned, o - aligned, o * aligned]
        pair_dim = 16 * hidden_size       # [rep1, rep2, |rep1 - rep2|, rep1 * rep2]

        # Embedding table whose weights are replaced by the supplied matrix.
        vocab_rows = embedding_matrix.shape[0]
        self.embedding = nn.Embedding(vocab_rows, embed_dim, padding_idx=0)
        pretrained = torch.from_numpy(embedding_matrix).float()
        self.embedding.weight = nn.Parameter(pretrained, requires_grad=True)
        self.embed_dropout = nn.Dropout(dropout)
        self.bn_embedding = nn.BatchNorm1d(embed_dim)

        # Input encoder shared by both sentences.
        self.lstm1 = nn.LSTM(embed_dim, hidden_size, batch_first=True, bidirectional=True)

        # Self-attention that refines the contextual encoding.
        self.self_attention = nn.MultiheadAttention(embed_dim=bi_dim, num_heads=4, batch_first=True)

        # Composition encoder: two stacked BiLSTM layers with dropout in between.
        self.lstm2 = nn.LSTM(enhanced_dim, hidden_size, num_layers=2, batch_first=True,
                             bidirectional=True, dropout=dropout)
        # Projects the 8*hidden_size input down to 2*hidden_size for the residual sum.
        self.residual_proj = nn.Linear(enhanced_dim, bi_dim)

        # Produces the gate that scales the aligned representations.
        self.attention_gate = nn.Linear(bi_dim, bi_dim)

        # MLP over the 16*hidden_size pair features, ending in a single logit.
        classifier_layers = [
            nn.BatchNorm1d(pair_dim),
            nn.Linear(pair_dim, linear_size),
            nn.ELU(inplace=True),
            nn.BatchNorm1d(linear_size),
            nn.Dropout(dropout),
            nn.Linear(linear_size, linear_size),
            nn.ELU(inplace=True),
            nn.BatchNorm1d(linear_size),
            nn.Dropout(dropout),
            nn.Linear(linear_size, 1),
        ]
        self.classifier = nn.Sequential(*classifier_layers)

        # Large negative number added to padded positions before a softmax.
        self.mask_val = -2**32 + 1.0

    def forward(self, sent1, sent2):
        """Score a batch of sentence pairs.

        Args:
            sent1: LongTensor (batch, len1) of token ids, 0 = padding.
            sent2: LongTensor (batch, len2) of token ids, 0 = padding.

        Returns:
            Tensor (batch, 1) of unnormalised logits.
        """
        # True wherever the token is padding.
        pad1 = sent1 == 0
        pad2 = sent2 == 0

        # Embed -> dropout -> batch norm over the embedding channels.
        emb1 = self.embed_dropout(self.embedding(sent1))
        emb2 = self.embed_dropout(self.embedding(sent2))
        emb1 = self.bn_embedding(emb1.transpose(1, 2)).transpose(1, 2)
        emb2 = self.bn_embedding(emb2.transpose(1, 2)).transpose(1, 2)

        # Shared BiLSTM: (batch, seq_len, 2*hidden_size).
        enc1, _ = self.lstm1(emb1)
        enc2, _ = self.lstm1(emb2)

        # Residual self-attention (no padding mask is passed here).
        attn1, _ = self.self_attention(enc1, enc1, enc1)
        attn2, _ = self.self_attention(enc2, enc2, enc2)
        enc1 = enc1 + attn1
        enc2 = enc2 + attn2

        # Cross-sentence soft alignment, then gate each aligned vector.
        aligned1, aligned2 = self.soft_attention_align(enc1, enc2, pad1, pad2)
        aligned1 = aligned1 * torch.sigmoid(self.attention_gate(enc1))
        aligned2 = aligned2 * torch.sigmoid(self.attention_gate(enc2))

        # Enhancement: original, aligned, their difference and their product.
        enhanced1 = torch.cat([enc1, aligned1, self.submul(enc1, aligned1)], dim=-1)
        enhanced2 = torch.cat([enc2, aligned2, self.submul(enc2, aligned2)], dim=-1)

        # Composition BiLSTM plus a projected residual of its own input.
        composed1, _ = self.lstm2(enhanced1)
        composed1 = composed1 + self.residual_proj(enhanced1)
        composed2, _ = self.lstm2(enhanced2)
        composed2 = composed2 + self.residual_proj(enhanced2)

        # Fixed-size sentence vectors: (batch, 4*hidden_size) each.
        rep1 = self.apply_pooling(composed1)
        rep2 = self.apply_pooling(composed2)

        # Pair features: (batch, 16*hidden_size).
        pair_features = torch.cat([rep1, rep2, (rep1 - rep2).abs(), rep1 * rep2], dim=-1)
        return self.classifier(pair_features)

    def soft_attention_align(self, x1, x2, mask1, mask2):
        """Align each sentence against the other with dot-product attention.

        ``mask1``/``mask2`` are boolean (batch, len) tensors, True on padding;
        padded positions of the attended sentence receive ``self.mask_val``
        before the softmax.

        Returns:
            (aligned_x1, aligned_x2): x2 summarised for every position of x1,
            and x1 summarised for every position of x2.
        """
        scores = torch.matmul(x1, x2.transpose(1, 2))

        # Additive biases: 0 on real tokens, mask_val on padding.
        bias1 = torch.zeros_like(mask1, dtype=torch.float).masked_fill(mask1, self.mask_val)
        bias2 = torch.zeros_like(mask2, dtype=torch.float).masked_fill(mask2, self.mask_val)

        attn_1_over_2 = F.softmax(scores + bias2.unsqueeze(1), dim=-1)
        attn_2_over_1 = F.softmax(scores.transpose(1, 2) + bias1.unsqueeze(1), dim=-1)
        return torch.matmul(attn_1_over_2, x2), torch.matmul(attn_2_over_1, x1)

    def submul(self, x1, x2):
        """Return ``[x1 - x2, x1 * x2]`` concatenated on the last dimension."""
        return torch.cat([x1 - x2, x1 * x2], dim=-1)

    def apply_pooling(self, x):
        """Average- and max-pool ``x`` (batch, seq_len, dim) over seq_len.

        Returns a (batch, 2*dim) tensor: averages first, then maxima.
        """
        channels_first = x.transpose(1, 2)
        window = x.size(1)
        mean_feat = F.avg_pool1d(channels_first, kernel_size=window).squeeze(-1)
        max_feat = F.max_pool1d(channels_first, kernel_size=window).squeeze(-1)
        return torch.cat([mean_feat, max_feat], dim=-1)


In [9]:
#dataset reads the csv from filepath and returns each sentence pair as tokenised, index-encoded tensors (no label is returned)
class NliDataset(Dataset):
    """Premise/hypothesis pairs read from a CSV and encoded as vocabulary ids.

    Args:
        filepath: path of a CSV file with ``premise`` and ``hypothesis`` columns.
        word2idx: mapping from lower-cased token to integer id; it must contain
            the key ``'<unk>'``, which is used for out-of-vocabulary tokens.
        max_len: maximum number of tokens kept per sentence.
    """

    def __init__(self, filepath, word2idx, max_len=64):
        self.data = pd.read_csv(filepath)
        self.word2idx = word2idx
        self.max_len = max_len

    def encode(self, sentence):
        """Return a 1-D LongTensor of token ids for ``sentence``.

        The text is lower-cased, split on whitespace and truncated to
        ``max_len`` tokens. Missing values are treated as empty text, and an
        empty result is replaced by a single ``'<unk>'`` id so the tensor is
        never zero-length.
        """
        # pandas reads empty CSV cells as NaN (a float), which has no .lower().
        if not isinstance(sentence, str):
            sentence = "" if pd.isna(sentence) else str(sentence)

        unk_id = self.word2idx['<unk>']
        ids = [self.word2idx.get(tok, unk_id) for tok in sentence.lower().split()]
        ids = ids[:self.max_len]
        if not ids:
            # An all-empty batch would otherwise be padded to sequence length 0.
            ids = [unk_id]
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        """Return ``(premise_ids, hypothesis_ids)`` for row ``idx``."""
        row = self.data.iloc[idx]
        sent1 = self.encode(row['premise'])
        sent2 = self.encode(row['hypothesis'])
        return sent1, sent2

    def __len__(self):
        """Number of rows in the CSV."""
        return len(self.data)

def collate_fn(batch):
    """Turn a list of ``(premise_ids, hypothesis_ids)`` pairs into two batch tensors.

    Each side is right-padded with 0 to the longest sequence on that side and
    returned with the batch dimension first.
    """
    premises, hypotheses = zip(*batch)
    return (
        pad_sequence(premises, batch_first=True, padding_value=0),
        pad_sequence(hypotheses, batch_first=True, padding_value=0),
    )

## Load the model from its checkpoint, set it to evaluation mode, and load the test data

In [10]:
# Rebuild the ESIM architecture; these sizes have to match the checkpoint
# loaded below or load_state_dict will reject it.
model = ESIM(hidden_size=300, embed_dim=300, linear_size=512, embedding_matrix=final_embeddings)
# map_location="cpu" lets the checkpoint load even if it was saved from a
# device that is not available here; the model is moved to `device` right after.
state_dict = torch.load("/content/drive/MyDrive/NLI/best_model_esim_improved_new_T_good.pt",
                        map_location="cpu")
model.load_state_dict(state_dict)
model.to(device)
model.eval()
# Test pairs are served in file order so the predictions line up with the CSV rows.
test_dataset = NliDataset(test_path, word2idx)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

In [11]:
def predict_on_dev(model, test_loader, device):
    """Predict a binary label for every sentence pair served by ``test_loader``.

    Args:
        model: module called as ``model(sent1, sent2)`` that returns one logit
            per pair with a trailing singleton dimension.
        test_loader: iterable of ``(sent1, sent2)`` tensor batches.
        device: device the batches are moved to before the forward pass.

    Returns:
        A flat list with one 0/1 prediction per pair, in loader order.
    """
    model.eval()
    collected = []

    # Inference only: gradients are not tracked.
    with torch.no_grad():
        for premise_ids, hypothesis_ids in test_loader:
            scores = model(premise_ids.to(device), hypothesis_ids.to(device)).squeeze(-1)
            # Probability >= 0.5 is mapped to class 1.
            labels = (torch.sigmoid(scores) >= 0.5).long()
            collected.extend(labels.cpu().numpy())

    return collected

In [12]:
# Predict a label for every test pair and store them as a one-column CSV.
test_preds = predict_on_dev(model, test_loader, device)

pred_df = pd.DataFrame({"prediction": test_preds})

pred_df.to_csv("/content/drive/MyDrive/NLI/test_predictions_B_new.csv", index=False)
# The message names the file that is actually written above.
print("Saved predictions to test_predictions_B_new.csv")

Saved predictions to test_predictions_B.csv


## Demo for C

In [3]:
# Tokeniser for the "roberta-large" checkpoint; collate() reads it as a global.
tokeniser = RobertaTokenizer.from_pretrained("roberta-large")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/482 [00:00<?, ?B/s]

In [4]:
class ExplainableModel(nn.Module):
    """RoBERTa encoder with a span-based, self-explaining classification head.

    The encoder's token states are turned into one vector per span by
    ``SICModel``; ``InterpretationModel`` weights the spans and sums them into a
    single representation, which a linear layer maps to one logit.

    Args:
        bert_dir: model name or path passed to ``RobertaConfig.from_pretrained``
            and ``RobertaModel.from_pretrained``.
    """

    def __init__(self, bert_dir):
        super().__init__()
        self.bert_config = RobertaConfig.from_pretrained(bert_dir, num_labels=2)
        self.intermediate = RobertaModel.from_pretrained(bert_dir)
        self.span_info_collect = SICModel(self.bert_config.hidden_size)
        self.interpretation = InterpretationModel(self.bert_config.hidden_size)
        self.output = nn.Linear(self.bert_config.hidden_size, 1)

    def forward(self, input_ids, start_indexs, end_indexs, span_masks):
        """Run the model on a batch.

        Args:
            input_ids: (batch, seq_len) token ids, padded with the pad token.
            start_indexs: (batch, span_num) start position of every span.
            end_indexs: (batch, span_num) end position of every span.
            span_masks: (batch, span_num) values subtracted from the span
                scores before the softmax.

        Returns:
            ``(out, a_ij)``: logits of shape (batch, 1) and the span attention
            weights of shape (batch, span_num).
        """
        # Take the pad id from the loaded config instead of hard-coding it;
        # 1 (the value previously hard-coded here) remains the fallback.
        pad_token_id = getattr(self.bert_config, "pad_token_id", None)
        if pad_token_id is None:
            pad_token_id = 1
        # Attend to every token that is not padding.
        attention_mask = (input_ids != pad_token_id).long()
        # Contextual token states from the encoder.
        hidden_states = self.intermediate(input_ids, attention_mask=attention_mask).last_hidden_state
        # Span information collecting layer (SIC).
        h_ij = self.span_info_collect(hidden_states, start_indexs, end_indexs)
        # Interpretation layer: span weights and the weighted span summary.
        H, a_ij = self.interpretation(h_ij, span_masks)
        # Output layer: one logit per example.
        out = self.output(H)
        return out, a_ij


class SICModel(nn.Module):
    """Span Information Collecting layer: one vector per (start, end) span.

    For a span with start state ``h_i`` and end state ``h_j`` the output is
    ``tanh(W1 h_i + W2 h_j + (W3 h_i - W3 h_j) + (W4 h_i * W4 h_j))``.

    Args:
        hidden_size: width of the token states and of the span vectors.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # One linear map per term of the span representation.
        self.W_1 = nn.Linear(hidden_size, hidden_size)  # start token
        self.W_2 = nn.Linear(hidden_size, hidden_size)  # end token
        self.W_3 = nn.Linear(hidden_size, hidden_size)  # start/end difference
        self.W_4 = nn.Linear(hidden_size, hidden_size)  # start/end interaction

    def forward(self, hidden_states, start_indexs, end_indexs):
        """Build the span representations.

        Args:
            hidden_states: (batch_size, seq_len, hidden_size) token states.
            start_indexs: (batch_size, span_len) integer start positions.
            end_indexs: (batch_size, span_len) integer end positions.

        Returns:
            Tensor of shape (batch_size, span_len, hidden_size).
        """
        hidden_size = hidden_states.size(-1)

        # Broadcast every position index across the feature dimension so a
        # single gather per term picks the rows for the whole batch (this
        # replaces a per-example loop of index_select calls).
        start_gather = start_indexs.long().unsqueeze(-1).expand(-1, -1, hidden_size)
        end_gather = end_indexs.long().unsqueeze(-1).expand(-1, -1, hidden_size)

        # The four projections are applied to every token once.
        W1_h = self.W_1(hidden_states)
        W2_h = self.W_2(hidden_states)
        W3_h = self.W_3(hidden_states)
        W4_h = self.W_4(hidden_states)

        # Select the projected states at the span boundaries.
        W1_hi = W1_h.gather(1, start_gather)
        W2_hj = W2_h.gather(1, end_gather)
        W3_hi = W3_h.gather(1, start_gather)
        W3_hj = W3_h.gather(1, end_gather)
        W4_hi = W4_h.gather(1, start_gather)
        W4_hj = W4_h.gather(1, end_gather)

        # Combine the terms and squash.
        span = W1_hi + W2_hj + (W3_hi - W3_hj) + torch.mul(W4_hi, W4_hj)
        h_ij = torch.tanh(span)
        return h_ij

class InterpretationModel(nn.Module):
    """Scores every span and pools the spans into one vector.

    A linear layer gives each span a scalar score, ``span_masks`` is subtracted
    from the scores, a softmax over the spans yields weights that sum to 1, and
    the output is the weight-averaged span representation.

    Args:
        hidden_size: width of a span representation.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.h_t = nn.Linear(hidden_size, 1)

    def forward(self, h_ij, span_masks):
        """Return ``(H, a_ij)``.

        Args:
            h_ij: (batch, span_num, hidden_size) span representations.
            span_masks: (batch, span_num) values subtracted from the scores; a
                large value drives that span's weight towards 0.

        Returns:
            H: (batch, hidden_size) weighted sum of the spans.
            a_ij: (batch, span_num) softmax weights.
        """
        # Scalar score per span, lowered by the mask value.
        span_scores = self.h_t(h_ij).squeeze(-1) - span_masks
        # Normalise over the span dimension.
        a_ij = torch.softmax(span_scores, dim=1)
        # Weighted sum of the span vectors.
        H = torch.sum(a_ij.unsqueeze(-1) * h_ij, dim=1)
        return H, a_ij


In [5]:
class NliDataset(Dataset):
    """Raw premise/hypothesis text pairs read from a CSV file.

    Tokenisation is left to the collate function; every item is a dict with the
    string-converted ``premise`` and ``hypothesis`` of one row.
    """

    def __init__(self, csv_path):
        self.df = pd.read_csv(csv_path)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        # str() makes non-string cells usable as tokeniser input.
        return {column: str(record[column]) for column in ("premise", "hypothesis")}

In [6]:
def collate(batch, span_pad_penalty=1e6):
    """Tokenise a batch of premise/hypothesis dicts and build their span inputs.

    Every pair is tokenised with the global ``tokeniser``, truncated and padded
    to 512 tokens. For each example the spans are the adjacent positions
    ``(i, i + 1)`` for ``i`` from 1 to ``token_len - 3``, which leaves out
    position 0 and the last non-padding position.

    Examples in a batch have different numbers of spans, so the span tensors
    are padded to the longest one. Padded slots point at position 0 and get
    ``span_pad_penalty`` as their mask value; the interpretation layer
    subtracts the mask from the span scores before its softmax, so these
    filler spans receive (near) zero weight and an example's output no longer
    depends on the other examples in its batch.

    Args:
        batch: list of dicts with ``"premise"`` and ``"hypothesis"`` strings.
        span_pad_penalty: mask value for padded span slots. Pass ``0.0`` to
            reproduce the earlier behaviour in which padded slots were left
            unmasked.

    Returns:
        ``(input_ids, start_idxs, end_idxs, span_masks)`` with shapes
        (batch, 512), (batch, max_spans), (batch, max_spans), (batch, max_spans).
    """
    max_len = 512
    input_ids_list = []
    start_idxs = []
    end_idxs = []
    span_masks = []

    for item in batch:
        # Encode the pair as one sequence, truncated/padded to max_len.
        enc = tokeniser(
            item["premise"],
            item["hypothesis"],
            return_tensors="pt",
            truncation=True,
            padding="max_length",
            max_length=max_len,
        )
        input_ids = enc["input_ids"].squeeze(0)

        # Number of non-padding tokens in this example.
        token_len = (input_ids != tokeniser.pad_token_id).sum().item()

        # Adjacent-position spans (i, i + 1); real spans carry a mask of 0.
        span_start = torch.arange(1, token_len - 2)
        span_end = span_start + 1
        span_mask = torch.zeros_like(span_start).float()

        input_ids_list.append(input_ids)
        start_idxs.append(span_start)
        end_idxs.append(span_end)
        span_masks.append(span_mask)

    # Pad the per-example span tensors to a common length.
    start_idxs = pad_sequence(start_idxs, batch_first=True)
    end_idxs = pad_sequence(end_idxs, batch_first=True)
    # Filler spans are penalised so the softmax over spans ignores them.
    span_masks = pad_sequence(span_masks, batch_first=True, padding_value=span_pad_penalty)

    # Tokenised inputs, span start/end indexes and span masks (no labels).
    return (
        torch.stack(input_ids_list),
        start_idxs,
        end_idxs,
        span_masks,
    )

## Load the transformer model and a fresh test set

In [10]:
# Build the roberta-large based model, restore the saved weights on the CPU
# (map_location keeps the load independent of the device the checkpoint was
# saved from), then move the model to `device` once.
model = ExplainableModel("roberta-large")
model.load_state_dict(
    torch.load("/content/drive/MyDrive/NLI/best_model_Transformer_Large_new.pt", map_location="cpu")
)
model.to(device)
model.eval()
# Test pairs are served in file order so the predictions line up with the CSV rows.
test_dataset = NliDataset(test_path)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, collate_fn=collate)


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [11]:
def predict_on_dev_C(model, dev_loader, device):
    """Predict a binary label for every example served by ``dev_loader``.

    Args:
        model: module called as ``model(input_ids, start_idxs, end_idxs,
            span_masks)`` that returns ``(logits, extra)``; only the logits
            are used.
        dev_loader: iterable of 4-tuples of tensors in that argument order.
        device: device the tensors are moved to before the forward pass.

    Returns:
        A flat list of Python ints (0 or 1), one per example, in loader order.
    """
    model.eval()
    predictions = []

    # Inference only: gradients are not tracked.
    with torch.no_grad():
        for batch in dev_loader:
            input_ids, start_idxs, end_idxs, span_masks = (t.to(device) for t in batch)

            logits, _ = model(input_ids, start_idxs, end_idxs, span_masks)
            # Probability >= 0.5 is mapped to class 1.
            labels = (torch.sigmoid(logits).squeeze(-1) >= 0.5).long()

            predictions.extend(labels.cpu().tolist())

    return predictions


In [12]:
# Predict a label for every test pair with the transformer model, then store
# the labels as a one-column CSV without the index.
test_preds = predict_on_dev_C(model, test_loader, device)
pred_df = pd.DataFrame(data={"prediction": test_preds})
pred_df.to_csv(path_or_buf="/content/drive/MyDrive/NLI/test_predictions_C.csv", index=False)
print("Saved predictions to test_predictions_C.csv")

Saved predictions to test_predictions_C.csv
