## Install packages

In [None]:
# !pip install transformers
# !pip install transformers[torch]
# !pip install accelerate -U
# !pip install transformers-interpret
# !!pip install transformers accelerate optimum

## Import necessary modules/packages

In [None]:
# Importing the libraries needed
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from numpy import argmax,exp

import torch
from torch import tensor
from torch.utils.data import Dataset
import accelerate

import transformers
from transformers import DistilBertTokenizer
from transformers import DistilBertForSequenceClassification
from transformers import Trainer, TrainingArguments

from transformers_interpret import SequenceClassificationExplainer
from pathlib import Path
from optimum.pipelines import pipeline

import re

In [None]:
import warnings
# Silence every Python warning for the rest of the session; note that this
# also hides deprecation notices emitted by the libraries used below.
warnings.filterwarnings("ignore")

## Set up filepaths and inputs

In [None]:
# CSV with the labelled sentences; the TEXT and CATEGORY columns are read below
input_path = '../dist_sent_all/nature_sentences_data.csv'
# Checkpoint used for the fill-mask pipeline when masking == "default"
default_model = 'distilbert-base-uncased-finetuned-sst-2-english'
# Directory the sequence classifier is loaded from (also the mask model when masking == "all")
model_all = "../dist_sent_all/new_distilbert/model_sentences"
# Directory of the model used for the fill-mask pipeline when masking == "plain"
model_plain = "../dist_sent_plain/new_distilbert/model_sentences"
# Checkpoint the tokenizer is loaded from
tokenizer_model = 'distilbert-base-uncased-finetuned-sst-2-english'
# Selects the fill-mask model: "all", "plain" or "default"
masking = "plain"
# Passed to change_class: how many top-weighted words are considered and the cap on masking rounds
N = 5
# Prefix of the results CSV file name
experiment = "distilbert_sent"

In [None]:
# Resolve the model locations from the configuration cell.
#   path_all   -> directory the sequence classifier is loaded from
#   path_plain -> model handed to the fill-mask pipeline, selected by `masking`
path_all = Path(model_all)

# path_plain is always a plain string: a string (hub id or local directory) is
# the documented form for the `model` argument of a pipeline, and it keeps the
# three branches consistent with each other.
if masking == "all":
  path_plain = str(path_all)
elif masking == "plain":
  path_plain = str(Path(model_plain))
elif masking == "default":
  path_plain = default_model
else:
  # Fail here, with a clear message, instead of with a NameError on
  # `path_plain` in a later cell.
  raise ValueError(
    f"Unknown masking option {masking!r}; expected 'all', 'plain' or 'default'")

## Tokenizer

In [None]:
# Load the tokenizer from the configured checkpoint.
tokenizer = DistilBertTokenizer.from_pretrained(tokenizer_model)
# Write a copy of the tokenizer files into both model directories.
# NOTE(review): presumably so each directory can be loaded as model + tokenizer
# on its own (e.g. by the fill-mask pipeline) — confirm.
tokenizer.save_pretrained(model_all)
tokenizer.save_pretrained(model_plain)

## Set up input data

In [None]:
# Read the labelled sentences.
df=pd.read_csv(input_path)

# Texts and their labels as numpy arrays.
X_all = df['TEXT'].values
y_all = df['CATEGORY'].values

# 70/30 train/test split, stratified on the label, with a fixed seed so the
# test subset is the same on every run.
X_train, X_test, y_train, y_test = train_test_split(X_all, y_all, test_size=0.3, random_state=42, stratify=y_all)

## Declare useful functions

### Class: Set up data
### Function: softmax

In [None]:
class myDataset(Dataset):
  """Dataset that tokenizes all texts up front and serves one tensor dict per item.

  Parameters
  ----------
  encodings : iterable of str
    The raw texts; they are tokenized in a single call with truncation and
    padding enabled.
  labels : sequence
    One label per text; its length defines the dataset length.
  tokenizer : callable
    Called as ``tokenizer(list_of_texts, truncation=True, padding=True)`` and
    expected to return a mapping from field name to per-text values.
  """

  def __init__(self, encodings, labels, tokenizer):
    texts = list(encodings)
    self.encodings = tokenizer(texts, truncation=True, padding=True)
    self.labels = labels

  def __len__(self):
    return len(self.labels)

  def __getitem__(self, idx):
    # One tensor per tokenizer field, plus the label under the 'labels' key.
    sample = {}
    for field, values in self.encodings.items():
      sample[field] = tensor(values[idx])
    sample['labels'] = tensor(self.labels[idx])
    return sample

def softmax(vector):
  """Return the softmax of `vector`, normalised over all of its elements.

  The maximum is subtracted before exponentiating. This leaves the result
  unchanged mathematically but keeps `exp` from overflowing to inf (and the
  result from becoming NaN) for large logits.

  Parameters
  ----------
  vector : array-like of numbers

  Returns
  -------
  numpy.ndarray with the same shape as `vector`, summing to 1
  (an empty array for empty input).
  """
  vector = np.asarray(vector)
  if vector.size == 0:
    # max() of an empty array raises; an empty input has an empty softmax.
    return exp(vector)
  e = exp(vector - vector.max())
  return e / e.sum()

### Function: Get word weights using Integrated Gradients (IG)

In [None]:
def get_words_weights(text, model, tokenizer):
  """Return one averaged attribution weight per word of `text`.

  The explainer yields one (token, weight) pair per token. Sub-word pieces
  (tokens starting with "##") are glued back onto the word they continue, and
  the word's weight is the mean of the weights of its pieces.

  Parameters
  ----------
  text : str
    Sentence to explain.
  model, tokenizer
    Passed unchanged to SequenceClassificationExplainer.

  Returns
  -------
  pandas.DataFrame with columns ['Avg_Weight', 'Token'], one row per word.
  The index holds the position of each word's first token in the explainer
  output, so it has gaps wherever sub-word pieces were merged.
  """
  # get token weights from the explainer
  # NOTE(review): the explainer call is kept inside torch.no_grad() as before;
  # confirm the attribution method does not rely on the caller's grad mode.
  with torch.no_grad():
    cls_explainer = SequenceClassificationExplainer(
      model,
      tokenizer)
    word_attributions = cls_explainer(text, internal_batch_size=1)

  # Merge sub-word pieces into words in a single pass over plain lists. This
  # avoids chained-indexing assignment on the DataFrame, which warns on older
  # pandas and silently does nothing under Copy-on-Write.
  positions, words, weight_sums, piece_counts = [], [], [], []
  for position, (token, weight) in enumerate(word_attributions):
    if token.startswith("##") and words:
      words[-1] += token.replace("##", "")
      weight_sums[-1] += weight
      piece_counts[-1] += 1
    else:
      # A new word starts here (a leading "##" piece with nothing before it
      # is kept as its own word instead of being dropped).
      positions.append(position)
      words.append(token)
      weight_sums.append(weight)
      piece_counts.append(1)

  weights_df = pd.DataFrame(
    {'Token': words, 'Weight': weight_sums, 'Count': piece_counts},
    index=positions)
  weights_df['Avg_Weight'] = weights_df['Weight'] / weights_df['Count']

  return weights_df[['Avg_Weight','Token']]

### Function: Perform masking and return best sentence

In [None]:
def get_best_sentence(text, n, weights_df, indexes, initial_prob, label, trainertest, mask_filler):
  """Mask one word of `text` and pick the replacement that most weakens `label`.

  The word is `weights_df.Token[indexes[n]]`. Its first (case-insensitive)
  occurrence in `text` is replaced by "[MASK]", `mask_filler` proposes filled
  sentences, and each one is re-classified with `trainertest`.

  Selection rule:
    * as long as no candidate has changed the class, keep the candidate with
      the lowest probability for `label` (only if below `initial_prob`);
    * the first candidate that changes the class replaces it;
    * after that, only class-changing candidates with a higher probability
      for their predicted class replace the current best.

  Uses the notebook-level `tokenizer`, `myDataset` and `softmax`.

  Returns
  -------
  (best_i, best_label, best_prob, best_sent)
    best_i is the index of the chosen candidate, or -1 when no candidate
    improved on the input (then the other values are `label`, `initial_prob`
    and `text`). The same "no progress" result is returned when the word does
    not occur in `text`, so the caller can move on to the next word.
  """
  # indexes[n] is valid for any non-empty `indexes`, including length 1.
  idx = indexes[n]
  word_n = weights_df.Token[idx]

  # re.escape handles every regex metacharacter (".", "?", "|", ...), so the
  # token is always matched literally.
  pattern = re.compile(re.escape(word_n), re.IGNORECASE)
  text_n, n_masked = pattern.subn("[MASK]", text, count = 1)
  if n_masked == 0:
    # Nothing to mask: the fill-mask pipeline needs a mask token in its input.
    return -1, label, initial_prob, text
  cand_seqs = mask_filler(text_n, top_k=5)

  best_prob = initial_prob
  best_label = label
  best_i = -1
  best_sent = text

  # take candidate sentences (however many the pipeline returned)
  for i, candidate in enumerate(cand_seqs):

    one_sent = candidate['sequence']
    # new prediction using trained classifier
    test_dataset = myDataset(np.array([one_sent], dtype='object'), np.array([0]), tokenizer)
    with torch.no_grad():
      outputs = trainertest.predict(test_dataset)
    probabilities = softmax(outputs.predictions[0])
    pred_label = argmax(probabilities)

    # keep the sentence with the worst probability/closer to changing the class
    if (best_label == label) and (pred_label == label):
      if probabilities[pred_label] < best_prob:
        best_prob = probabilities[pred_label]
        best_i = i
        best_sent = one_sent
    elif (best_label == label) and (pred_label != label):
      best_label = pred_label
      best_prob = probabilities[pred_label]
      best_i = i
      best_sent = one_sent
    elif (best_label != label) and (pred_label != label):
      if probabilities[pred_label] > best_prob:
        # keep the reported label in step with the reported probability
        best_label = pred_label
        best_prob = probabilities[pred_label]
        best_i = i
        best_sent = one_sent

  return best_i, best_label, best_prob, best_sent

### Function: Change class of input text using masking

In [None]:
def change_class(text, label, initial_prob, weights_df, trainertest, mask_filler, model, N = 5):
  """Try to change the predicted class of `text` by replacing important words.

  The N highest-weighted words (first and last rows of `weights_df` excluded)
  are candidates for masking. In each round one word is masked and refilled
  via `get_best_sentence`:
    * if the class changes, the result is returned immediately;
    * if a replacement lowered the probability, it becomes the new working
      sentence, its word weights are recomputed and the search restarts at
      its most important word;
    * otherwise the next most important word is tried.
  At most N rounds are run.

  Uses the notebook-level `tokenizer` when recomputing word weights.

  Returns
  -------
  (best_label, best_prob, best_sent, n)
    Label, probability and sentence from the last round, and the number of
    completed rounds before returning. If no round could run (no candidate
    words or N == 0) this is (label, initial_prob, text, 0).
  """
  # .iloc makes the positional slice explicit: skip the first and last row.
  indexes = weights_df['Avg_Weight'].iloc[1:-1].nlargest(n=N).index.values

  # using the most important word
  n = 0
  examined_text = text
  examined_prob = initial_prob
  examined_word = 0

  # Result reported when the loop below never runs.
  best_label, best_prob, best_sent = label, initial_prob, text

  while n < N and examined_word < len(indexes):

    best_i, best_label, best_prob, best_sent = get_best_sentence(examined_text, examined_word, weights_df,
                                                                indexes, examined_prob, label, trainertest, mask_filler)

    # if class has changed, return
    if best_label != label:
      return best_label, best_prob, best_sent, n

    n += 1
    if best_i == -1:
      # no candidate sentence gets closer to a class change: try the next word
      examined_word += 1
    else:
      # progress without a class change: continue from the new best sentence
      weights_df = get_words_weights(best_sent, model, tokenizer)
      indexes = weights_df['Avg_Weight'].iloc[1:-1].nlargest(n=N).index.values
      examined_text = best_sent
      examined_prob = best_prob
      examined_word = 0

  return best_label, best_prob, best_sent, n

## Load classification model

In [None]:
# Load the sequence classifier from the configured model directory.
modeltest = DistilBertForSequenceClassification.from_pretrained(path_all)

# The Trainer is used only for `predict` in this notebook, so only the
# arguments that matter for inference are set. Training-only options
# (evaluation/save/logging strategy, epochs, warmup, weight decay,
# load_best_model_at_end) were dropped: they have no effect on `predict`, and
# an evaluation strategy without an eval dataset is rejected by recent
# transformers releases.
training_arguments = TrainingArguments(output_dir="./results",
                                       per_device_eval_batch_size=8,
                                       log_level='critical')

trainertest = Trainer(model=modeltest, args=training_arguments)

## Load masking model

In [None]:
# Fill-mask pipeline used to propose replacements for a [MASK] token;
# the model is loaded from path_plain (selected by `masking`).
mask_filler = pipeline("fill-mask", path_plain, accelerator="bettertransformer")

In [None]:
# Release cached GPU memory that PyTorch holds but is not currently using.
torch.cuda.empty_cache() 

## Run the workflow on the entire scientific test subset

In [None]:
# only for scientific text
scient_test = X_test[y_test == 0]

results_list = []
# (position in scient_test, error) for every sentence that could not be processed
failed_samples = []

for one_test, text in enumerate(scient_test):
  try:
    # classify the original sentence
    test_dataset = myDataset(np.array([text], dtype='object'), np.array([0]), tokenizer)
    outputs = trainertest.predict(test_dataset)
    predictions = outputs.predictions
    probabilities = softmax(predictions[0])
    pred_label = argmax(probabilities)

    # only sentences the classifier itself assigns to class 0 are attacked
    if pred_label != 0:
      continue

    text = text.replace("é", "e")
    weights_df = get_words_weights(text, modeltest, tokenizer)

    # Arguments follow change_class's signature:
    # (text, label, initial_prob, weights_df, trainertest, mask_filler, model, N)
    best_label, best_prob, best_sent, n = change_class(text, pred_label, probabilities[pred_label], weights_df,
                                                      trainertest, mask_filler, modeltest, N = N)

    # when the class changed, best_prob is the probability of the new class;
    # store the complement so both probability columns refer to the original class
    if best_label != pred_label:
      best_prob = 1 - best_prob
    results_list.append([text, best_sent, n, probabilities[pred_label], best_prob, best_label])
  except Exception as err:
    # Best effort: skip the sentence, but keep a record instead of silently
    # discarding the error (and let KeyboardInterrupt stop the loop).
    failed_samples.append((one_test, repr(err)))
    continue

if failed_samples:
  print(f"{len(failed_samples)} of {len(scient_test)} sentences failed; "
        f"first error: {failed_samples[0][1]}")

df = pd.DataFrame(results_list, columns=['Original', 'New', 'n', 'old_prob', 'new_prob', 'new_class'])
output_path = experiment+ "_" + str(N) + "_" + masking + ".csv"
df.to_csv(output_path)