In [None]:
!pip install transformers sentencepiece datasets

In [None]:
# Imports

import pandas as pd
import numpy as np
from pprint import pprint as pp
import os
import pdb
import torch
from torch.utils import data
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report
from torch.utils.data import DataLoader, TensorDataset
from torch.nn.functional import softmax
from datasets import load_dataset

In [None]:
# Challenge set templates
#
# Each entry is an event clause that gets combined with a temporal
# prepositional phrase ("in 60 seconds", "before 2 hours", ...).

# Clauses paired with the second/minute, minute/hour and hour/day comparisons.
Short_Term_Sents = [
    "Luke will eat dinner",
    "Mary's party will start",
    "our plane will be departing",
    "the servers will be back up",
    "dinner will be ready",
    "the coffee will be ready to drink",
    "our favorite breakfast restaurant opens",
    "the showing will begin",
    "Jeopardy is starting",
    "we will collect the exams",
    "the President is giving a speech",
    "class will end",
    "class will start",
    "the grocery store will close",
    "the webpage should finish loading",
    "we will eat our meal",
    "the race will commence",
    "commercial break is over",
    "the next song is ready to play",
    "the oven will be ready",
]

# Clauses paired with the day/week, week/month and month/year comparisons.
Long_Term_Sents = [
    "we will sell the house",
    "the park will re-open",
    "basketball season ends",
    "we will start attending outdoor concerts",
    "final grades are due",
    "Pixar plans to release a new film",
    "Kendrick's album is set to drop",
    "the next Presidential election is set to take place",
    "we will go visit Grandma",
    "the landlord will collect rent",
    "we will ride our bikes",
    "Apple will release the next iPhone",
    "the courtcase will begin",
    "a new bike shop is opening up",
    "Jake is planning to move",
    "Lindsey will pay off her loans",
    "summer solstice will take place",
    "the quarantine will be lifted",
    "baseball season starts",
    "my doctor's appointment is scheduled to take place",
]

# Challenge Set Generation Class
class ChallengeSet3():
  """Template-generated NLI challenge set on comparing durations across units.

  Every example is a list ``[premise, hypothesis, label, durType, premFixed]``.
  The premise reads "<prep> <n> <unit>s <sentence>" and the hypothesis reads
  "<sentence> <prep> <n> <unit>(s)". The two durations use adjacent units
  (second/minute, minute/hour, ...) and the label is derived from the two
  prepositions and from how the durations compare once converted to seconds.

  Args:
    shortSents: non-empty sequence of event clauses used for the
      second/minute, minute/hour and hour/day sets.
    longSents: non-empty sequence of event clauses used for the day/week,
      week/month and month/year sets.
    seed: optional seed for a private ``np.random.RandomState`` used to pick
      the sentence of each example. With the default ``None`` the global
      numpy RNG is used, exactly as before this parameter existed.

  Raises:
    ValueError: if either sentence sequence is empty.
  """

  # Seconds per unit. 'month' is one twelfth of a 365-day year so that
  # "12 months", "1 year" and "365 days" all convert to the same duration;
  # the month/year set relies on that equality for its tie cases.
  SECONDS_PER_UNIT = {
    'second': 1,
    'minute': 60,
    'hour':   3600,
    'day':    86400,
    'week':   604800,
    'month':  2628000,
    'year':   31536000,
  }

  # (premise preposition, hypothesis preposition) ->
  #   (label if premise duration is shorter, if longer, if equal)
  LABEL_RULES = {
    ('before', 'before'): ('Entailment',    'Neutral',       'Entailment'),
    ('before', 'after'):  ('Contradiction', 'Neutral',       'Contradiction'),
    ('before', 'in'):     ('Contradiction', 'Neutral',       'Contradiction'),
    ('after',  'before'): ('Neutral',       'Contradiction', 'Contradiction'),
    ('after',  'after'):  ('Neutral',       'Entailment',    'Entailment'),
    ('after',  'in'):     ('Neutral',       'Contradiction', 'Contradiction'),
    ('in',     'before'): ('Entailment',    'Contradiction', 'Contradiction'),
    ('in',     'after'):  ('Contradiction', 'Entailment',    'Contradiction'),
    ('in',     'in'):     ('Contradiction', 'Contradiction', 'Entailment'),
  }

  def __init__(self, shortSents, longSents, seed=None):
    if len(shortSents) == 0 or len(longSents) == 0:
      raise ValueError("shortSents and longSents must both be non-empty")

    self.fixed = ['in']
    self.unfixed = ['before', 'after']
    self.prepositions = ['in', 'before', 'after']
    self.duration_numbers_1 = None
    self.duration_numbers_2 = None
    self.duration_words = ['second', 'minute', 'hour', 'day', 'week', 'month', 'year']
    self.shortSents = shortSents
    self.longSents = longSents
    # Sentence sampler: private and reproducible when seeded, otherwise the
    # global numpy RNG (np.random exposes the same .choice API).
    self._rng = np.random.RandomState(seed) if seed is not None else np.random
    self.secMinChallengeSet = self.generate_sec_min_challenge_set()
    self.minHourChallengeSet = self.generate_min_hr_challenge_set()
    self.hourDayChallengeSet = self.generate_hr_day_challenge_set()
    self.dayWeekChallengeSet = self.generate_day_week_challenge_set()
    self.weekMonthChallengeSet = self.generate_week_month_challenge_set()
    self.monthYearChallengeSet = self.generate_month_year_challenge_set()
    self.fullChallengeSet = self.generate_full_challenge_set()

  def get_duration(self, duration_word, duration_number):
    """Return ``duration_number`` units of ``duration_word`` in whole seconds.

    Raises:
      KeyError: if ``duration_word`` is not a key of SECONDS_PER_UNIT.
      ValueError: if ``duration_number`` cannot be converted with ``int``.
    """
    return int(duration_number) * self.SECONDS_PER_UNIT[duration_word]

  def _match_preposition(self, preposition):
    """Return the first of 'before'/'after'/'in' contained in ``preposition``.

    Matching is by substring, in that priority order; None means no match.
    """
    for known in ('before', 'after', 'in'):
      if known in preposition:
        return known
    return None

  def get_label(self, preposition_1, preposition_2,
                    duration_word_1, duration_word_2,
                    duration_number_1, duration_number_2):
    """Return the NLI label for a premise/hypothesis pair of temporal phrases.

    The ``*_1`` arguments describe the premise phrase and the ``*_2``
    arguments the hypothesis phrase.

    Returns:
      'Entailment', 'Neutral' or 'Contradiction' as given by LABEL_RULES.

    Raises:
      ValueError: if either preposition contains none of
        'before', 'after', 'in'.
    """
    # Get durations in seconds
    duration_1 = self.get_duration(duration_word_1, duration_number_1)
    duration_2 = self.get_duration(duration_word_2, duration_number_2)

    kind_1 = self._match_preposition(preposition_1)
    kind_2 = self._match_preposition(preposition_2)
    if kind_1 is None or kind_2 is None:
      raise ValueError(
        "Invalid Preposition (Prepositions given: {}, {})".format(preposition_1, preposition_2))

    if_shorter, if_longer, if_equal = self.LABEL_RULES[(kind_1, kind_2)]
    if duration_1 < duration_2:
      return if_shorter
    if duration_1 > duration_2:
      return if_longer
    return if_equal

  def _format_duration_phrase(self, preposition, duration_number, duration_word):
    """Build "<prep> <n> <unit>", pluralising the unit unless n is exactly 1."""
    unit = duration_word if int(duration_number) == 1 else duration_word + 's'
    return ' '.join([preposition, duration_number, unit])

  def generate_set_triple(self,
                          preposition_1, preposition_2,
                          duration_word_1, duration_word_2,
                          duration_number_1, duration_number_2,
                          S):
    """Build one example for event clause ``S``.

    Duration numbers are expected as strings (they are joined into the text).

    Returns:
      ``[premise, hypothesis, label, durType, premFixed]`` where ``durType``
      is "<unit1>_<unit2>" and ``premFixed`` is True when the premise
      preposition is in ``self.fixed`` and the hypothesis preposition is in
      ``self.unfixed``.
    """
    PP_1 = self._format_duration_phrase(preposition_1, duration_number_1, duration_word_1)
    PP_2 = self._format_duration_phrase(preposition_2, duration_number_2, duration_word_2)
    premise = ' '.join([PP_1, S])
    hypothesis = ' '.join([S, PP_2])

    # Get label
    label = self.get_label(preposition_1, preposition_2,
                  duration_word_1, duration_word_2,
                  duration_number_1, duration_number_2)

    durType = duration_word_1 + '_' + duration_word_2
    premFixed = ((preposition_1 in self.fixed) and (preposition_2 in self.unfixed))

    return [premise, hypothesis, label, durType, premFixed]

  def getClassDistribution(self, challengeSet):
    """Split examples by string label; returns (Neutral, Contradiction, Entailment)."""
    Neutral = [a for a in challengeSet if a[2] == 'Neutral']
    Contradiction = [a for a in challengeSet if a[2] == 'Contradiction']
    Entailment = [a for a in challengeSet if a[2] == 'Entailment']
    return Neutral, Contradiction, Entailment

  def setDurationNumbers(self, dur_nums_1, dur_nums_2):
    """Store the premise / hypothesis number strings used by the generators."""
    self.duration_numbers_1 = dur_nums_1
    self.duration_numbers_2 = dur_nums_2

  def _generate_unit_pair_set(self, duration_word_1, duration_word_2, ratio, sentences):
    """Generate every preposition x number combination for one unit pair.

    The premise numbers are ``int(ratio * x)`` and the hypothesis numbers are
    ``x`` for x in 1..10, where ``ratio`` is how many of the smaller unit make
    up one of the larger unit. One sentence is sampled per example.
    """
    times_1 = [str(int(ratio * x)) for x in range(1, 11)]
    times_2 = [str(x) for x in range(1, 11)]
    self.setDurationNumbers(times_1, times_2)

    examples = []
    for p1 in self.prepositions:
      for p2 in self.prepositions:
        for t1 in self.duration_numbers_1:
          for t2 in self.duration_numbers_2:
            S = self._rng.choice(sentences)
            examples.append(
              self.generate_set_triple(p1, p2, duration_word_1, duration_word_2, t1, t2, S))
    return examples

  def generate_sec_min_challenge_set(self):
    """Seconds vs minutes (60 seconds per minute), short-term sentences."""
    return self._generate_unit_pair_set('second', 'minute', 60, self.shortSents)

  def generate_min_hr_challenge_set(self):
    """Minutes vs hours (60 minutes per hour), short-term sentences."""
    return self._generate_unit_pair_set('minute', 'hour', 60, self.shortSents)

  def generate_hr_day_challenge_set(self):
    """Hours vs days (24 hours per day), short-term sentences."""
    return self._generate_unit_pair_set('hour', 'day', 24, self.shortSents)

  def generate_day_week_challenge_set(self):
    """Days vs weeks (7 days per week), long-term sentences."""
    return self._generate_unit_pair_set('day', 'week', 7, self.longSents)

  def generate_week_month_challenge_set(self):
    """Weeks vs months, long-term sentences.

    Uses 4.34524 weeks per month, truncated to whole weeks.
    """
    return self._generate_unit_pair_set('week', 'month', 4.34524, self.longSents)

  def generate_month_year_challenge_set(self):
    """Months vs years (12 months per year), long-term sentences."""
    return self._generate_unit_pair_set('month', 'year', 12, self.longSents)

  def generate_full_challenge_set(self):
    """Concatenate the six per-unit-pair sets, smallest units first."""
    fullChallengeSet = []
    fullChallengeSet.extend(self.secMinChallengeSet)
    fullChallengeSet.extend(self.minHourChallengeSet)
    fullChallengeSet.extend(self.hourDayChallengeSet)
    fullChallengeSet.extend(self.dayWeekChallengeSet)
    fullChallengeSet.extend(self.weekMonthChallengeSet)
    fullChallengeSet.extend(self.monthYearChallengeSet)
    return fullChallengeSet

# Build Challenge Set 3 and tabulate it (one row per generated example)
CS3 = ChallengeSet3(shortSents=Short_Term_Sents, longSents=Long_Term_Sents)
challengeSet3DF = pd.DataFrame(
    data=CS3.fullChallengeSet,
    columns=['premise', 'hypothesis', 'label', 'duration_type', 'prem_fixed'],
)
challengeSet3DF

In [None]:
# Modeling setup
# CUDA_LAUNCH_BLOCKING is exported up front (commonly used to get synchronous,
# easier-to-debug CUDA errors).
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class CustomDataset(data.Dataset):
    """Torch dataset over a DataFrame with 'premise', 'hypothesis' and 'label' columns."""

    def __init__(self, dfObject):
        # Backing pandas DataFrame; items are looked up by position.
        self.dfObject = dfObject

    def __len__(self):
        """Number of rows in the backing frame."""
        n_rows, _ = self.dfObject.shape
        return n_rows

    def __getitem__(self, idx):
        """Return ``(premise, hypothesis, label)`` for the row(s) at position ``idx``."""
        # Tensor indices are converted to plain Python values before .iloc.
        position = idx.tolist() if torch.is_tensor(idx) else idx
        row = self.dfObject.iloc[position]
        return row['premise'], row['hypothesis'], row['label']


def CustomCollatFunc(data):
  """Collate ``(premise, hypothesis, label)`` examples into one model batch.

  Premise/hypothesis pairs are tokenized together with the module-level
  ``modelTokenizer`` (padded, truncated to 64 tokens, returned as PyTorch
  tensors).

  Returns:
    dict with 'input_ids', 'attention_mask' and 'labels' (a tensor built from
    the example labels). 'token_type_ids' is not forwarded.
  """
  premises, hypothesises, labels = [], [], []
  for example in data:
    premises.append(example[0])
    hypothesises.append(example[1])
    labels.append(example[2])

  encoded_result = modelTokenizer(
      premises,
      hypothesises,
      padding=True,
      truncation=True,
      max_length=64,
      return_tensors='pt',
      return_attention_mask=True,
  )

  output = dict(
      input_ids=encoded_result['input_ids'],
      attention_mask=encoded_result['attention_mask'],
      labels=torch.tensor(labels),
  )
  return output


def compute_metrics(pred):
    """Compute accuracy and macro-averaged precision/recall/F1 for a prediction object.

    Args:
        pred: object exposing ``label_ids`` (gold labels) and ``predictions``
            (per-class scores; the predicted class is the argmax over the
            last axis).

    Returns:
        dict with keys 'accuracy', 'f1', 'precision' and 'recall'.
    """
    gold = pred.label_ids
    guessed = pred.predictions.argmax(-1)

    macro_precision, macro_recall, macro_f1, _support = precision_recall_fscore_support(
        gold, guessed, average='macro'
    )

    metrics = {'accuracy': accuracy_score(gold, guessed)}
    metrics['f1'] = macro_f1
    metrics['precision'] = macro_precision
    metrics['recall'] = macro_recall
    return metrics

In [None]:
# Testing Challenge Set 3

# Mount Google Drive (Colab only; the checkpoint below is read from Drive)
from google.colab import drive
drive.mount('/content/drive')

# Encode labels for the binary classifier: Entailment -> 1, everything else -> 0.
# (Not needed for the standard MNLI dataset.)
# The encoded frame is a new object, so challengeSet3DF keeps its string labels
# and this cell can be re-run. The previous chained
# `challengeSet['label'].replace(..., inplace=True)` mutated challengeSet3DF
# through an alias and does not update the frame under pandas Copy-on-Write.
LABEL_TO_ID = {'Entailment': 1, 'Neutral': 0, 'Contradiction': 0}
challengeSet = challengeSet3DF.assign(
    # astype raises if a label is missing from LABEL_TO_ID (map would yield NaN)
    label=challengeSet3DF['label'].map(LABEL_TO_ID).astype('int64')
)

# Random ~80/20 train/eval split, seeded so the split is reproducible
SPLIT_SEED = 0
trainMask = np.random.RandomState(SPLIT_SEED).rand(len(challengeSet)) < 0.8
challengeTrainDataset = CustomDataset(challengeSet[trainMask])
challengeEvalDataset = CustomDataset(challengeSet[~trainMask])
challengeDataset = CustomDataset(challengeSet)

# Evaluate the pretrained checkpoint on the full challenge set
print("Running challenge set 3 through model...\n")

# Change model name as necessary
MODEL_NAME = '/content/drive/MyDrive/Models/roberta-large_uds_dur_lr_2e-05_wt_0.1_ws_122_hypo_only_False/checkpoint-6000'
modelTokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)

# This Trainer is only used for predict(), so only inference settings are
# passed. Training-only options (epochs, learning rate, warmup,
# load_best_model_at_end, ...) have no effect on predict(), and
# load_best_model_at_end without a matching save strategy is rejected by
# TrainingArguments in current transformers releases.
trainArgs = TrainingArguments(
    output_dir = './outputs',
    per_device_eval_batch_size = 128
)

trainer = Trainer(
    model = model,
    args = trainArgs,
    data_collator = CustomCollatFunc,
    compute_metrics = compute_metrics
)

predictions = trainer.predict(challengeDataset)
predInds = predictions.predictions.argmax(-1)
trueInds = challengeSet['label'].values
print(classification_report(trueInds, predInds))


In [None]:
# Fine Tune UDS on CS3
print("Fine tuning on challenge set 3...")

# TrainingArguments is a dataclass; look at its fields so this cell runs on
# transformers releases both before and after `evaluation_strategy` was
# renamed to `eval_strategy`.
trainArgFields = TrainingArguments.__dataclass_fields__
evalStrategyKey = 'eval_strategy' if 'eval_strategy' in trainArgFields else 'evaluation_strategy'
scheduleArgs = {evalStrategyKey: 'epoch'}
# load_best_model_at_end needs checkpoints saved on the same schedule as
# evaluation; with the default step-based saving TrainingArguments raises.
# Note this writes one checkpoint per epoch under output_dir.
if 'save_strategy' in trainArgFields:
    scheduleArgs['save_strategy'] = 'epoch'

trainArgs = TrainingArguments(
    output_dir = './outputs',
    num_train_epochs = 5,
    per_device_train_batch_size = 16,
    per_device_eval_batch_size = 128,
    learning_rate = 2e-5,
    weight_decay = 0.1,
    load_best_model_at_end = True,
    metric_for_best_model = 'eval_accuracy',
    **scheduleArgs
)
trainer = Trainer(
    model = model,
    args = trainArgs,
    train_dataset = challengeTrainDataset,
    eval_dataset = challengeEvalDataset,
    data_collator = CustomCollatFunc,
    compute_metrics = compute_metrics,
    # Stop once eval_accuracy has not improved for 2 consecutive evaluations
    callbacks = [EarlyStoppingCallback(early_stopping_patience=2)]
)
trainer.train()

# Score the fine-tuned model on the held-out ~20% split
print("Fine tuned on challenge set 3, predictions:\n")
predictions = trainer.predict(challengeEvalDataset)
predInds = predictions.predictions.argmax(-1)
trueInds = challengeSet.loc[~trainMask, 'label'].values
print(classification_report(trueInds, predInds))