In [None]:
import csv
import json
import os
import random
import re

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from torch.nn import CrossEntropyLoss
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from transformers import BertPreTrainedModel, RobertaConfig
from transformers import RobertaTokenizer, RobertaForMaskedLM, RobertaForSequenceClassification, RobertaModel

# Base directories. Python's file APIs do not expand a leading `~`, so the
# home-relative locations are expanded once here; the trailing slash is kept
# because the paths below are built by plain string concatenation.
ROOT_DIR = os.path.expanduser('~/NLI+KB/')
CACHE_DIR = os.path.expanduser('~/.cache/')
# Location of the WinoWhy JSONL file, relative to ROOT_DIR.
WINOWHY_PATH = 'datasets/winowhy/winowhy.jsonl'

In [None]:
# helper function: read and dump data
def dump_jsonl(data, output_path, append=False):
    """
    Serialise every item of `data` as one JSON document per line of `output_path`.

    The file is overwritten unless `append` is true, in which case the new
    records are added after whatever the file already contains. Non-ASCII
    characters are written as-is (UTF-8). A one-line summary is printed once
    all records have been written.
    """
    # Lazily rendered, so serialisation still happens after the file is opened.
    rendered = (json.dumps(record, ensure_ascii=False) + '\n' for record in data)
    with open(output_path, 'a+' if append else 'w', encoding='utf-8') as sink:
        sink.writelines(rendered)
    print('Wrote {} records to {}'.format(len(data), output_path))

def load_jsonl(input_path) -> list:
    """
    Read list of objects from a JSON lines file.

    Every non-blank line is parsed as one JSON document. Blank lines (for
    example an empty line at the end of the file) are skipped rather than
    handed to the parser.

    Returns the parsed objects in file order and prints a one-line summary.
    Raises json.JSONDecodeError if a non-blank line is not valid JSON.
    """
    data = []
    with open(input_path, 'r', encoding='utf-8') as f:
        for line in f:
            # strip() drops the line terminator and surrounding whitespace. The
            # earlier rstrip('\n|\r') also stripped '|' characters, because the
            # argument is a set of characters and not a pattern.
            record = line.strip()
            if record:
                data.append(json.loads(record))
    print('Loaded {} records from {}'.format(len(data), input_path))
    return data

In [None]:
from dataclasses import dataclass

# eq=False keeps the identity-based equality and hashing the undecorated class
# had; the decorator only adds a keyword constructor and a readable repr.
@dataclass(eq=False)
class WinoWhySentence(object):
    """
    One WinoWhy record.

    load_winowhy_from_path() fills every field from the JSON key of the same
    name. All fields have defaults, so WinoWhySentence() with no arguments
    still works and attributes can be assigned afterwards.
    """
    sentence: str = None
    context: str = None
    wsc_sentence: str = None
    answer_reason: str = None
    # Tokenized as the second text segment of the model input.
    reason: str = None
    # Compared against 0 / 1 when accuracy is computed.
    label: int = 0
    wsc_id: int = 0
    fold_num: int = 1
    wsc_marked_sentence: str = None
    # Tokenized as the first text segment of the model input.
    wsc_asked_sentence: str = None
        
def load_winowhy_from_path(filepath: str):
    """
    Load the JSONL file at `filepath` and return one WinoWhySentence per record.

    Each record must provide every key listed below; a missing key raises
    KeyError. Records are returned in file order.
    """
    # JSON keys, copied onto the attribute of the same name.
    field_names = (
        'sentence', 'context', 'wsc_sentence', 'answer_reason', 'reason',
        'label', 'wsc_id', 'fold_num', 'wsc_marked_sentence', 'wsc_asked_sentence',
    )
    examples = []
    for record in load_jsonl(filepath):
        example = WinoWhySentence()
        for name in field_names:
            setattr(example, name, record[name])
        examples.append(example)
    return examples

@dataclass
class ExpConfig(object):
    """Settings for one experiment run, plus helpers for seeding and device selection."""

    # Path of the JSONL dataset file
    dataset_path: str = ""

    # Path of the WinoWhy JSONL file
    winowhy_dataset_path: str = ""

    # Path of the ATOMIC knowledge JSON file
    atomic_dataset_path: str = ""

    # Path of the ConceptNet knowledge JSON file
    conceptnet_dataset_path: str = ""

    # Dataset identifier
    dataset: str = "winowhy"
    # Free-text description of the task
    task_name: str = ""
    # Single-GPU setups only
    gpu_id: int = 0
    # Default seed used by set_seed()
    seed: int = 42
    # 'cpu' or 'cuda'
    device: str = 'cpu'
    # e.g. "roberta-base", "roberta-large"
    model_name: str = ""
    # When non-empty, the model is loaded from this path instead of the pretrained model named above
    model_path: str = ""
    # Learning rate for training the classifier layer
    learning_rate: float = 1e-3
    # Total number of epochs
    num_training_epochs: int = 15
    # Maximum sequence length
    max_seq_len: int = 128

    batch_size: int = 1

    def set_seed(self, new_seed = None):
        """Seed Python's `random`, NumPy and PyTorch with `new_seed`, or with `self.seed` when it is None."""
        chosen = new_seed if new_seed is not None else self.seed
        for seeder in (random.seed, np.random.seed, torch.manual_seed):
            seeder(chosen)
        # CUDA generators are seeded only when CUDA is usable.
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(chosen)

    def set_gpu_if_possible(self, gpu_id = None):
        """Set `self.device` to 'cuda' (or 'cuda:<gpu_id>' when an id is given) if CUDA is available, else 'cpu'."""
        if not torch.cuda.is_available():
            self.device = 'cpu'
            return
        self.device = 'cuda' if gpu_id is None else 'cuda:{}'.format(gpu_id)
            
class RobertaClassificationHead(nn.Module):
    """Sentence-level classification head: dropout, dense + tanh, dropout, output projection."""

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.out_proj = nn.Linear(width, config.num_labels)

    def forward(self, features, **kwargs):
        """Return logits computed from position 0 of `features`; extra keyword arguments are ignored."""
        # Position 0 holds the <s> token (equiv. to [CLS]).
        first_token = features[:, 0, :]
        hidden = torch.tanh(self.dense(self.dropout(first_token)))
        return self.out_proj(self.dropout(hidden))
    
class RobertaOnlyClassificationHead(BertPreTrainedModel):
    """
    Pretrained-model wrapper whose forward pass runs only the classification head.

    The caller supplies already-computed hidden states; the `roberta` encoder
    attribute is constructed but never called by forward().
    """
    config_class = RobertaConfig
    base_model_prefix = "roberta"

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.roberta = RobertaModel(config)
        self.classifier = RobertaClassificationHead(config)

    def forward(self, sequence_output):
        """Feed `sequence_output` through the classification head and return its logits."""
        return self.classifier(sequence_output)


## RoBERTa + MNLI

In [None]:
def roberta_cross_entropy_for_winowhy_nli(wl_sentence: WinoWhySentence, model: RobertaForSequenceClassification, tokenizer: RobertaTokenizer, max_seq_len: int, device: str='cpu') -> int:
    """
    Classify one WinoWhy example with a sequence-classification model.

    The model input is `<cls> wsc_asked_sentence <sep> reason <sep>`, truncated
    or padded to exactly `max_seq_len` token ids. Despite the function name, no
    loss is computed.

    Args:
        wl_sentence: example providing `wsc_asked_sentence` and `reason`.
        model: classifier; it is switched to eval mode and run without gradients.
        tokenizer: tokenizer matching `model`.
        max_seq_len: fixed length of the encoded input.
        device: device the input tensors are created on.

    Returns:
        Index of the largest logit, as a Python int.
    """
    first = tokenizer.tokenize(wl_sentence.wsc_asked_sentence, add_prefix_space=True)
    second = tokenizer.tokenize(wl_sentence.reason, add_prefix_space=True)

    # NOTE(review): Hugging Face's RoBERTa tokenizer formats sentence pairs as
    # `<s> A </s></s> B </s>`, whereas this hand-built sequence has a single
    # separator between the segments. Left unchanged so scores stay comparable
    # with earlier runs; confirm this is intended.
    tokens = [tokenizer.cls_token] + first + [tokenizer.sep_token] + second + [tokenizer.sep_token]
    if len(tokens) > max_seq_len:
        # Keep the closing separator instead of cutting it off with the overflow.
        tokens = (tokens[:max(max_seq_len - 1, 0)] + [tokenizer.sep_token])[:max_seq_len]

    input_ids = tokenizer.convert_tokens_to_ids(tokens)
    pad_count = max_seq_len - len(input_ids)

    # Use the tokenizer's own padding id; fall back to the previously
    # hard-coded value 1 when the tokenizer defines none.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = 1

    attention_mask = [1] * len(input_ids) + [0] * pad_count
    input_ids = input_ids + [pad_id] * pad_count

    input_ids = torch.tensor([input_ids]).to(device)
    attention_mask = torch.tensor([attention_mask]).to(device)

    with torch.no_grad():
        model.eval()
        outputs = model(input_ids, attention_mask=attention_mask)

    # outputs[0] holds the logits for the single example in the batch.
    return torch.argmax(outputs[0]).to('cpu').item()

def roberta_for_winowhy(config: ExpConfig, roberta_cross_entropy_for_winowhy_nli):
    """
    Evaluate a RoBERTa sequence classifier on WinoWhy and return its accuracy.

    Note: later cells of this notebook define other functions under this same
    name, so the definition that was executed most recently is the one in effect.

    Args:
        config: run settings; `dataset_path`, `model_path`, `model_name`,
            `max_seq_len` and `device` are read.
        roberta_cross_entropy_for_winowhy_nli: callable invoked as
            (sentence, model, tokenizer, max_seq_len, device) that returns the
            predicted class index.

    Returns:
        Fraction of examples whose label matches the prediction.

    Raises:
        ValueError: if the dataset file contains no records.
    """
    winowhy_sentences = load_winowhy_from_path(config.dataset_path)
    if not winowhy_sentences:
        # Fail before loading the model; otherwise the division below would
        # raise a less helpful ZeroDivisionError.
        raise ValueError('No WinoWhy records loaded from {}'.format(config.dataset_path))

    # A non-empty model_path takes precedence over the pretrained model name.
    # Both cases now request three labels, matching the 0/1/2 handling below.
    source = config.model_path if config.model_path else config.model_name
    tokenizer = RobertaTokenizer.from_pretrained(source)
    model = RobertaForSequenceClassification.from_pretrained(source, num_labels=3)

    model.eval()
    model.to(config.device)

    correct = 0
    for ws in winowhy_sentences:
        pred = roberta_cross_entropy_for_winowhy_nli(ws, model, tokenizer, config.max_seq_len, config.device)
        # Class 1 counts as label 1; classes 0 and 2 both count as label 0.
        # NOTE(review): this presumes index 1 is the entailment class of the
        # loaded checkpoint - confirm against its id2label mapping.
        if pred == 1:
            expected_label = 1
        elif pred in (0, 2):
            expected_label = 0
        else:
            continue
        if ws.label == expected_label:
            correct += 1

    return correct / len(winowhy_sentences)

In [None]:
# Evaluate each MNLI checkpoint on WinoWhy and print its accuracy.
for model_name in ('roberta-base-mnli', 'roberta-large-mnli'):
    # Release PyTorch's cached GPU memory before the next model is loaded.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    robertaconfig = ExpConfig(
        dataset='winowhy',
        dataset_path=ROOT_DIR + WINOWHY_PATH,
        task_name='Test on WinoWhy',
        model_name=model_name,
        model_path=CACHE_DIR + model_name,
    )
    robertaconfig.set_seed()
    robertaconfig.set_gpu_if_possible(1)

    print('\n================================')
    print('Experiment: {} using {}'.format(robertaconfig.task_name, robertaconfig.model_name))

    scores = roberta_for_winowhy(robertaconfig, roberta_cross_entropy_for_winowhy_nli)

    print('Scores: {}'.format(scores))
    print('================================')

## RoBERTa + MNLI + ATOMIC (full set)

In [None]:
def getInput(wl_sentence: WinoWhySentence, atomic: dict, tokenizer: RobertaTokenizer, max_seq_len: int, topk: int, device: str='cpu'):
    """
    Build one model input per knowledge sentence for a WinoWhy example.

    Row i encodes `<cls> wsc_asked_sentence <sep> knowledge_i <sep> reason <sep>`,
    truncated or padded to exactly `max_seq_len` token ids.

    Args:
        wl_sentence: example providing `wsc_asked_sentence` and `reason`.
        atomic: knowledge entry; `atomic['Overall']` is indexed by position and
            each element is tokenized (presumably a list of strings - confirm
            against the knowledge JSON file).
        tokenizer: tokenizer used for all three segments.
        max_seq_len: fixed length of every row.
        topk: maximum number of knowledge sentences to use. When fewer are
            available, all of them are used instead of raising IndexError.
        device: device the tensors are created on.

    Returns:
        (input_ids, attention_mask): two long tensors with one row per
        knowledge sentence used.
    """
    first = tokenizer.tokenize(wl_sentence.wsc_asked_sentence, add_prefix_space=True)
    second = tokenizer.tokenize(wl_sentence.reason, add_prefix_space=True)

    # Use the tokenizer's own padding id; fall back to the previously
    # hard-coded value 1 when the tokenizer defines none.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = 1

    id_rows = []
    mask_rows = []
    for knowledge in atomic['Overall'][:max(topk, 0)]:
        middle = tokenizer.tokenize(knowledge, add_prefix_space=True)
        tokens = [tokenizer.cls_token] + first + [tokenizer.sep_token] + middle + [tokenizer.sep_token] + second + [tokenizer.sep_token]
        if len(tokens) > max_seq_len:
            # Keep the closing separator instead of cutting it off with the overflow.
            tokens = (tokens[:max(max_seq_len - 1, 0)] + [tokenizer.sep_token])[:max_seq_len]

        ids = tokenizer.convert_tokens_to_ids(tokens)
        pad_count = max_seq_len - len(ids)
        mask_rows.append([1] * len(ids) + [0] * pad_count)
        id_rows.append(ids + [pad_id] * pad_count)

    # Build each tensor once instead of concatenating row by row.
    input_ids = torch.tensor(id_rows, dtype=torch.long, device=device)
    attention_mask = torch.tensor(mask_rows, dtype=torch.long, device=device)

    return input_ids, attention_mask

def roberta_cross_entropy_for_winowhy_atomic_nli(wl_sentence: WinoWhySentence, atomic: dict, model_1: RobertaModel, model_2: RobertaOnlyClassificationHead, tokenizer: RobertaTokenizer, max_seq_len: int, device: str='cpu') -> tuple:
    """
    Classify one WinoWhy example using ATOMIC knowledge sentences.

    One input per knowledge sentence is encoded by `model_1`; the encoder
    outputs are averaged over those inputs and the average is passed to
    `model_2`. Despite the function name, no loss is computed.

    Args:
        wl_sentence: example to classify.
        atomic: knowledge entry for this example, passed on to getInput().
        model_1: encoder producing hidden states.
        model_2: classification head applied to the averaged hidden states.
        tokenizer: tokenizer matching the models.
        max_seq_len: length of each encoded input. This argument is now
            honoured; previously a hard-coded 128 was used in its place.
        device: device the input tensors are created on.

    Returns:
        (predicted class index, probability of class 0, of class 1, of class 2).
    """
    # Number of knowledge sentences requested per example.
    TOP_K = 10
    input_ids, attention_mask = getInput(wl_sentence, atomic, tokenizer, max_seq_len, TOP_K, device)

    with torch.no_grad():
        model_1.eval()
        model_2.eval()
        # Average the encoder output across the knowledge-augmented inputs
        # (dim 0), keeping a leading dimension of size 1 for the head.
        pooled = model_1(input_ids, attention_mask=attention_mask)[0].mean(dim=0, keepdim=True)
        logits = model_2(pooled)
        probs = F.softmax(logits, dim = 1)
    return torch.argmax(logits).to('cpu').item(), probs[0][0].item(), probs[0][1].item(), probs[0][2].item()

def roberta_for_winowhy(config: ExpConfig, roberta_cross_entropy_for_winowhy_atomic_nli):
    """
    Evaluate RoBERTa with ATOMIC knowledge on WinoWhy and return the accuracy.

    Note: other cells of this notebook define functions under this same name,
    so the definition that was executed most recently is the one in effect.

    Args:
        config: run settings; `winowhy_dataset_path`, `atomic_dataset_path`,
            `model_path`, `model_name`, `max_seq_len` and `device` are read.
        roberta_cross_entropy_for_winowhy_atomic_nli: callable invoked as
            (sentence, knowledge, model_1, model_2, tokenizer, max_seq_len,
            device) that returns a 4-tuple whose first item is the predicted
            class index.

    Returns:
        Fraction of examples whose label matches the prediction.

    Raises:
        ValueError: if the dataset file contains no records.
        KeyError: if the knowledge file has no entry for an example index.
    """
    winowhy_sentences = load_winowhy_from_path(config.winowhy_dataset_path)
    if not winowhy_sentences:
        # Fail before loading the models; otherwise the division below would
        # raise a less helpful ZeroDivisionError.
        raise ValueError('No WinoWhy records loaded from {}'.format(config.winowhy_dataset_path))

    # The context manager closes the file even if parsing fails.
    with open(config.atomic_dataset_path) as ff:
        atomic_sentences = json.load(ff)

    # A non-empty model_path takes precedence; otherwise fall back to the
    # pretrained model named in the config. The fallback used to read the
    # notebook-level `model_name` variable and to load the models from the
    # empty model_path.
    source = config.model_path if config.model_path else config.model_name
    tokenizer = RobertaTokenizer.from_pretrained(source)
    model_1 = RobertaModel.from_pretrained(source)
    model_2 = RobertaOnlyClassificationHead.from_pretrained(source, num_labels=3)

    model_1.eval()
    model_1.to(config.device)

    model_2.eval()
    model_2.to(config.device)

    correct = 0
    for i, ws in enumerate(winowhy_sentences):
        # The knowledge file is keyed by the example's position as a string.
        pred, _contradict_prob, _entail_prob, _neutral_prob = roberta_cross_entropy_for_winowhy_atomic_nli(ws, atomic_sentences[str(i)], model_1, model_2, tokenizer, config.max_seq_len, config.device)
        # Class 1 counts as label 1; classes 0 and 2 both count as label 0.
        # NOTE(review): this presumes index 1 is the entailment class of the
        # loaded checkpoint - confirm against its id2label mapping.
        if pred == 1:
            expected_label = 1
        elif pred in (0, 2):
            expected_label = 0
        else:
            continue
        if ws.label == expected_label:
            correct += 1

    return correct / len(winowhy_sentences)

In [None]:
# Directory (relative to ROOT_DIR) holding the per-model ATOMIC knowledge files.
KB_DIR = "kb_extract/winowhy_atomic/"

# Evaluate each MNLI checkpoint with ATOMIC knowledge and print its accuracy.
for model_name in ('roberta-base-mnli', 'roberta-large-mnli'):
    # Release PyTorch's cached GPU memory before the next model is loaded.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    robertaconfig = ExpConfig(
        dataset='winowhy',
        winowhy_dataset_path=ROOT_DIR + WINOWHY_PATH,
        atomic_dataset_path=ROOT_DIR + KB_DIR + model_name + "-atomic.json",
        task_name='Test on WinoWhy',
        model_name=model_name,
        model_path=CACHE_DIR + model_name,
    )
    robertaconfig.set_seed()
    robertaconfig.set_gpu_if_possible(1)

    print('\n================================')
    print('Experiment: {} using {}'.format(robertaconfig.task_name, robertaconfig.model_name))

    scores = roberta_for_winowhy(robertaconfig, roberta_cross_entropy_for_winowhy_atomic_nli)

    print('Scores: {}'.format(scores))
    print('================================')

## RoBERTa + MNLI + ConceptNet (full set)

In [None]:
def getInput(wl_sentence: WinoWhySentence, conceptnet: dict, tokenizer: RobertaTokenizer, max_seq_len: int, topk: int, device: str='cpu'):
    """
    Build one model input per knowledge sentence for a WinoWhy example.

    Row i encodes `<cls> wsc_asked_sentence <sep> knowledge_i <sep> reason <sep>`,
    truncated or padded to exactly `max_seq_len` token ids.

    Args:
        wl_sentence: example providing `wsc_asked_sentence` and `reason`.
        conceptnet: knowledge entry; `conceptnet['Overall']` is indexed by
            position and each element is tokenized (presumably a list of
            strings - confirm against the knowledge JSON file).
        tokenizer: tokenizer used for all three segments.
        max_seq_len: fixed length of every row.
        topk: maximum number of knowledge sentences to use. When fewer are
            available, all of them are used instead of raising IndexError.
        device: device the tensors are created on.

    Returns:
        (input_ids, attention_mask): two long tensors with one row per
        knowledge sentence used.
    """
    first = tokenizer.tokenize(wl_sentence.wsc_asked_sentence, add_prefix_space=True)
    second = tokenizer.tokenize(wl_sentence.reason, add_prefix_space=True)

    # Use the tokenizer's own padding id; fall back to the previously
    # hard-coded value 1 when the tokenizer defines none.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = 1

    id_rows = []
    mask_rows = []
    for knowledge in conceptnet['Overall'][:max(topk, 0)]:
        middle = tokenizer.tokenize(knowledge, add_prefix_space=True)
        tokens = [tokenizer.cls_token] + first + [tokenizer.sep_token] + middle + [tokenizer.sep_token] + second + [tokenizer.sep_token]
        if len(tokens) > max_seq_len:
            # Keep the closing separator instead of cutting it off with the overflow.
            tokens = (tokens[:max(max_seq_len - 1, 0)] + [tokenizer.sep_token])[:max_seq_len]

        ids = tokenizer.convert_tokens_to_ids(tokens)
        pad_count = max_seq_len - len(ids)
        mask_rows.append([1] * len(ids) + [0] * pad_count)
        id_rows.append(ids + [pad_id] * pad_count)

    # Build each tensor once instead of concatenating row by row.
    input_ids = torch.tensor(id_rows, dtype=torch.long, device=device)
    attention_mask = torch.tensor(mask_rows, dtype=torch.long, device=device)

    return input_ids, attention_mask

def roberta_cross_entropy_for_winowhy_conceptnet_nli(wl_sentence: WinoWhySentence, conceptnet: dict, model_1: RobertaModel, model_2: RobertaOnlyClassificationHead, tokenizer: RobertaTokenizer, max_seq_len: int, device: str='cpu') -> tuple:
    """
    Classify one WinoWhy example using ConceptNet knowledge sentences.

    One input per knowledge sentence is encoded by `model_1`; the encoder
    outputs are averaged over those inputs and the average is passed to
    `model_2`. Despite the function name, no loss is computed.

    Args:
        wl_sentence: example to classify.
        conceptnet: knowledge entry for this example, passed on to getInput().
        model_1: encoder producing hidden states.
        model_2: classification head applied to the averaged hidden states.
        tokenizer: tokenizer matching the models.
        max_seq_len: length of each encoded input. This argument is now
            honoured; previously a hard-coded 128 was used in its place.
        device: device the input tensors are created on.

    Returns:
        (predicted class index, probability of class 0, of class 1, of class 2).
    """
    # Number of knowledge sentences requested per example.
    TOP_K = 10
    input_ids, attention_mask = getInput(wl_sentence, conceptnet, tokenizer, max_seq_len, TOP_K, device)

    with torch.no_grad():
        model_1.eval()
        model_2.eval()
        # Average the encoder output across the knowledge-augmented inputs
        # (dim 0), keeping a leading dimension of size 1 for the head.
        pooled = model_1(input_ids, attention_mask=attention_mask)[0].mean(dim=0, keepdim=True)
        logits = model_2(pooled)
        probs = F.softmax(logits, dim = 1)
    return torch.argmax(logits).to('cpu').item(), probs[0][0].item(), probs[0][1].item(), probs[0][2].item()

def roberta_for_winowhy(config: ExpConfig, roberta_cross_entropy_for_winowhy_conceptnet_nli):
    """
    Evaluate RoBERTa with ConceptNet knowledge on WinoWhy and return the accuracy.

    Note: earlier cells of this notebook define functions under this same name,
    so the definition that was executed most recently is the one in effect.

    Args:
        config: run settings; `winowhy_dataset_path`, `conceptnet_dataset_path`,
            `model_path`, `model_name`, `max_seq_len` and `device` are read.
        roberta_cross_entropy_for_winowhy_conceptnet_nli: callable invoked as
            (sentence, knowledge, model_1, model_2, tokenizer, max_seq_len,
            device) that returns a 4-tuple whose first item is the predicted
            class index.

    Returns:
        Fraction of examples whose label matches the prediction.

    Raises:
        ValueError: if the dataset file contains no records.
        KeyError: if the knowledge file has no entry for an example index.
    """
    winowhy_sentences = load_winowhy_from_path(config.winowhy_dataset_path)
    if not winowhy_sentences:
        # Fail before loading the models; otherwise the division below would
        # raise a less helpful ZeroDivisionError.
        raise ValueError('No WinoWhy records loaded from {}'.format(config.winowhy_dataset_path))

    # The context manager closes the file even if parsing fails.
    with open(config.conceptnet_dataset_path) as ff:
        conceptnet_sentences = json.load(ff)

    # A non-empty model_path takes precedence; otherwise fall back to the
    # pretrained model named in the config. The fallback used to read the
    # notebook-level `model_name` variable and to load the models from the
    # empty model_path.
    source = config.model_path if config.model_path else config.model_name
    tokenizer = RobertaTokenizer.from_pretrained(source)
    model_1 = RobertaModel.from_pretrained(source)
    model_2 = RobertaOnlyClassificationHead.from_pretrained(source, num_labels=3)

    model_1.eval()
    model_1.to(config.device)

    model_2.eval()
    model_2.to(config.device)

    correct = 0
    for i, ws in enumerate(winowhy_sentences):
        # The knowledge file is keyed by the example's position as a string.
        pred, _contradict_prob, _entail_prob, _neutral_prob = roberta_cross_entropy_for_winowhy_conceptnet_nli(ws, conceptnet_sentences[str(i)], model_1, model_2, tokenizer, config.max_seq_len, config.device)
        # Class 1 counts as label 1; classes 0 and 2 both count as label 0.
        # NOTE(review): this presumes index 1 is the entailment class of the
        # loaded checkpoint - confirm against its id2label mapping.
        if pred == 1:
            expected_label = 1
        elif pred in (0, 2):
            expected_label = 0
        else:
            continue
        if ws.label == expected_label:
            correct += 1

    return correct / len(winowhy_sentences)

In [None]:
# Directory (relative to ROOT_DIR) holding the per-model ConceptNet knowledge files.
KB_DIR = "kb_extract/winowhy_cn/"

# Evaluate each MNLI checkpoint with ConceptNet knowledge and print its accuracy.
for model_name in ('roberta-base-mnli', 'roberta-large-mnli'):
    # Release PyTorch's cached GPU memory before the next model is loaded.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    robertaconfig = ExpConfig(
        dataset='winowhy',
        winowhy_dataset_path=ROOT_DIR + WINOWHY_PATH,
        conceptnet_dataset_path=ROOT_DIR + KB_DIR + model_name + "-conceptnet.json",
        task_name='Test on WinoWhy, predicting KB',
        model_name=model_name,
        model_path=CACHE_DIR + model_name,
    )
    robertaconfig.set_seed()
    robertaconfig.set_gpu_if_possible(0)

    print('\n================================')
    print('Experiment: {} using {}'.format(robertaconfig.task_name, robertaconfig.model_name))

    scores = roberta_for_winowhy(robertaconfig, roberta_cross_entropy_for_winowhy_conceptnet_nli)

    print('Scores: {}'.format(scores))
    print('================================')