## Dependencies

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModel
# import adam
from torch.optim import Adam
import evaluate

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Hub identifier of the pretrained checkpoint; the tokenizer below is loaded from it.
checkpoint = "yangheng/deberta-v3-base-absa"

## Data Loading

In [2]:
# Load a dataset from the Hugging Face Hub by name.
# NOTE(review): the recorded output of this cell says `trust_remote_code=True` will become
# mandatory for this dataset in a future `datasets` release — confirm against the installed version.
# NOTE(review): `dataset` is rebound later from 'sentiment.csv'; this load looks unrelated to the
# rest of the notebook — confirm it is still needed.
dataset = load_dataset('knowledgator/events_classification_biotech') 

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


In [3]:
# Display one raw record (index 1000) of the "train" split for inspection.
dataset["train"][1000]

{'title': 'Ultra Health Initiates Cannabis Coverage for Behavioral',
 'content': 'Ultra Health Initiates Cannabis Coverage for Behavioral Health Services\nNew Mexicos #1 cannabis company requests insurer confirmation to eliminate cannabis cost-sharing\nFebruary 21, 2022 12:47 ET\n| Source:\nUltra Health\nScottsdale, Arizona, UNITED STATES\nALBUQUERQUE, N.M., Feb.  21, 2022  (GLOBE NEWSWIRE) -- Ultra Health, New Mexicos #1 Cannabis Company, recently sent a\nletter\nto New Mexicos prominent health insurers and New Mexico state departments to seek confirmation from insurers for cannabis coverage as a behavioral health service.\nThe communication is a response to a recent law that eliminated all cost-sharing and any out-of-pocket costs for behavioral health services and medications.\nOn January 1, 2022,\nSenate Bill 317\nbecame effective to make mental and behavioral health services more affordable for New Mexicans.\nThe legislation expanded the definition of behavioral health services to 

In [None]:
# NOTE(review): `data` is first assigned in the NEXT cell (pd.read_csv); on a fresh kernel with
# Restart & Run All this cell raises NameError. Move or delete this cell.
data

In [None]:
# Read the annotated reviews into a DataFrame; the 'Aspects' column is used below to build the
# aspect vocabulary.
data = pd.read_csv('Data/data.csv')
# Preview the first rows.
data.head()

In [None]:
# Token cap passed as `max_length` to the tokenizer in the inference cells further down.
MAX_LENGTH = 100
# Tokenizer matching the pretrained checkpoint named in `checkpoint`.
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

In [None]:
# "['food' , 'place']"

def string2array(string):
    """Parse a stringified list such as "['food' , 'place']" into a list of bare names.

    The first and last characters (the brackets) are dropped, surrounding whitespace is
    trimmed, every space and single quote is removed, and the remainder is split on commas.
    Because all spaces are removed, multi-word entries are collapsed ("fast food" -> "fastfood"),
    and an empty list literal "[]" yields [""].

    Args:
        string: Text of the form "[...]" holding comma-separated, optionally quoted names.

    Returns:
        List of str, one entry per comma-separated field.
    """
    inner = string[1:-1].strip()
    # Drop spaces and single quotes in one pass.
    cleaned = "".join(ch for ch in inner if ch not in (" ", "'"))
    return cleaned.split(",")

In [None]:
# Smoke check: the parser should return a list whose elements are strings.
out = string2array("['food' , 'place']")
print(type(out))
print(type(out[0]))

In [None]:
# Build the aspect vocabulary from every stringified list in the 'Aspects' column.
aspect_terms = data['Aspects'].tolist()
aspect_labels = []
# print(aspect_terms)
for item in aspect_terms:
    arr = string2array(item)
    for aspect in arr:
        aspect_labels.append(aspect)

# De-duplicate and SORT. Iteration order of a set of strings can change between interpreter
# sessions (string hash randomization), and the position of each aspect in this list fixes the
# class ids used for the labels and for decoding predictions. Sorting keeps those ids stable
# between the session that trains a model and a later session that loads it.
aspect_labels = sorted(set(aspect_labels))
print(aspect_labels)

In [None]:
# Sentiment vocabulary. The position of each entry is used as its offset inside an aspect's
# group of label slots, so the order matters.
sentiment_labels = ['negative', 'positive', 'neutral']

In [None]:
def oneHot(aspect, sentiment):
    """Encode one (aspect, sentiment) pair as a multi-hot row vector.

    Every aspect owns ``len(sentiment_labels) + 1`` consecutive slots: the first flags the
    aspect itself, the following ones flag its sentiment in ``sentiment_labels`` order.
    The vector width is derived from the label lists instead of being fixed at 28, so the
    encoding stays valid when the number of aspects found in the data is not seven.

    Args:
        aspect: Aspect name; must be an element of ``aspect_labels``.
        sentiment: Sentiment name; must be an element of ``sentiment_labels``.

    Returns:
        Float numpy array of shape ``(1, len(aspect_labels) * (len(sentiment_labels) + 1))``
        with exactly two entries set to 1.

    Raises:
        ValueError: If ``aspect`` or ``sentiment`` is not a known label (from ``list.index``).
    """
    group_size = len(sentiment_labels) + 1
    encoding = np.zeros((1, len(aspect_labels) * group_size))
    aspect_index = aspect_labels.index(aspect) * group_size
    encoding[0][aspect_index] = 1
    encoding[0][aspect_index + 1 + sentiment_labels.index(sentiment)] = 1
    return encoding

In [None]:
# Example encoding. NOTE(review): assumes "place" is one of the aspects found in the data.
oneHot("place", "neutral")

In [None]:
def multiOneHot(aspects, sentiments):
    """Encode several (aspect, sentiment) pairs into a single multi-hot row vector.

    Uses the same slot layout as the single-pair encoder: each aspect owns
    ``len(sentiment_labels) + 1`` consecutive slots (aspect flag first, then one slot per
    sentiment). The width is derived from the label lists rather than hard-coded to 28.

    Args:
        aspects: Sequence of aspect names, each an element of ``aspect_labels``.
        sentiments: Sequence of sentiment names, parallel to ``aspects``.

    Returns:
        Float numpy array of shape ``(1, len(aspect_labels) * (len(sentiment_labels) + 1))``.

    Raises:
        AssertionError: If the two sequences differ in length.
        ValueError: If a name is not a known label (from ``list.index``).
    """
    assert len(aspects) == len(sentiments)
    group_size = len(sentiment_labels) + 1
    encoding = np.zeros((1, len(aspect_labels) * group_size))
    for aspect, sentiment in zip(aspects, sentiments):
        aspect_index = aspect_labels.index(aspect) * group_size
        encoding[0][aspect_index] = 1
        encoding[0][aspect_index + 1 + sentiment_labels.index(sentiment)] = 1
    return encoding

In [None]:
# Example with two pairs. NOTE(review): assumes "price" and "place" are aspects found in the data.
multiOneHot(["price","place"],["positive","negative"])

## Label Making

In [None]:
# Flat class list: for every aspect, the aspect name itself followed by one
# "<aspect>_<sentiment>" entry per sentiment label.
classes = []
for aspect in aspect_labels:
    classes += [aspect]
    for sentiment in sentiment_labels:
        classes += [aspect + "_" + sentiment]

print(classes)
# Lookup tables between class names and their integer positions in `classes`.
class2id = {name: idx for idx, name in enumerate(classes)}
id2class = dict(enumerate(classes))
print(class2id)
print(id2class)

In [None]:
def vectorizedString2Array(arr):
    """Apply ``string2array`` to every element of ``arr``.

    Turns a list of stringified lists such as ["['food' , 'place']", "['food' , 'place']"]
    into a list of lists such as [['food', 'place'], ['food', 'place']].

    Args:
        arr: Iterable of stringified lists.

    Returns:
        List with one parsed list per input element, in input order.
    """
    return [string2array(entry) for entry in arr]

In [None]:
def combineClassesVectorized(aspects, sentiments):
    """Build per-example class-name lists from parallel aspect / sentiment lists.

    For each example, every aspect contributes two names in order: the aspect itself and
    "<aspect>_<sentiment>" using the sentiment at the same position.

    Args:
        aspects: list (checked by assertion) with one sequence of aspect names per example.
        sentiments: Indexable parallel to ``aspects``; each entry is indexable parallel to
            the corresponding aspect sequence.

    Returns:
        List of lists of class names, one inner list per example.

    Raises:
        AssertionError: If ``aspects`` is not exactly a ``list``.
    """
    assert type(aspects) == list
    combined = []
    for example_idx, example_aspects in enumerate(aspects):
        example_sentiments = sentiments[example_idx]
        row = []
        for position, name in enumerate(example_aspects):
            row.extend((name, name + "_" + example_sentiments[position]))
        combined.append(row)
    return combined

In [None]:
def combineClasses(aspects, sentiments):
    """Build the class-name list for one example.

    Each aspect contributes two names in order: the aspect itself and
    "<aspect>_<sentiment>" using the sentiment at the same position.

    Args:
        aspects: Sequence of aspect names.
        sentiments: Sequence of sentiment names, same length as ``aspects``.

    Returns:
        Flat list of class names, two per aspect.

    Raises:
        AssertionError: If the two sequences differ in length.
    """
    assert len(aspects) == len(sentiments)
    combined = []
    for position, name in enumerate(aspects):
        combined += [name, name + "_" + sentiments[position]]
    return combined

In [None]:
# Example: two identical examples, each with two (aspect, sentiment) pairs.
out = combineClassesVectorized([["price","place"],["price","place"]],[["positive","negative"],["positive","negative"]])
print(out)

### Dataset Pre-Processing

In [None]:
# ds = SentimentDataset('sentiment.csv')
# Load the CSV as a single "train" split. This rebinds `dataset`, replacing the hub dataset
# loaded near the top of the notebook.
# NOTE(review): the DataFrame above was read from 'Data/data.csv' but this reads
# 'sentiment.csv' — confirm both point at the same data.
dataset = load_dataset('csv', data_files='sentiment.csv', split='train')
# create data splits with 0.8 and 0.2
# (fixed seed so the split is reproducible; the result has "train" and "test" splits)
dataset = dataset.train_test_split(test_size=0.2, seed=42)


In [None]:
# Display the train/test split summary.
dataset

In [None]:
# Worked example on the first training row: parse its 'Aspects' and 'Sentiment' strings and
# combine them into class names.
out_classes = combineClasses(string2array(dataset["train"][0]["Aspects"]), string2array(dataset["train"][0]["Sentiment"]))
print(out_classes)

In [None]:
# All-zero multi-hot vector with one float slot per class.
labels = [0.0] * len(classes)
print(labels)

In [None]:
# Switch on the slot of every class name produced for the example row.
for label in out_classes:
    label_id = class2id[label]
    labels[label_id] = 1.0

print(labels)

In [None]:
# def preprocess_function(examples):
#     combined_classes = combineClasses(vectorizedString2Array(examples["Aspects"]),vectorizedString2Array(examples["Sentiment"]))

#     for example in examples:
#         example["combined_classes"] = combined_classes
#         labels = [0. for i in range(len(classes))]
#         for label in example["combined_classes"]:
#             label_id = class2id[label]
#             labels[label_id] = 1.
#         example["labels"] = labels
        
#     # tokenize the examples
#     return tokenizer(examples["Reviews"], padding="max_length", truncation=True, max_length=MAX_LENGTH)
#     # return examples


In [None]:
def preprocess_function(example):
    """Tokenize one review and attach its multi-hot label vector.

    The previous version stored the labels on ``example`` and then rebound ``example`` to the
    tokenizer output, so the returned mapping did not contain "labels". Here the labels are
    written onto the tokenizer output that is actually returned.

    Args:
        example: Mapping with a "Review" text plus "Aspects" and "Sentiment" stringified lists.

    Returns:
        The tokenizer output for the review with an added "labels" entry: a list of floats,
        one per class, set to 1.0 for every class name derived from the example.

    Raises:
        KeyError: If a derived class name is missing from ``class2id``.
    """
    combined_classes = combineClasses(string2array(example["Aspects"]), string2array(example["Sentiment"]))
    labels = [0.0] * len(classes)
    for label in combined_classes:
        labels[class2id[label]] = 1.0

    # No padding here; examples are tokenized one at a time at their own length.
    encoding = tokenizer(example["Review"], truncation=True)
    encoding["labels"] = labels
    return encoding
    # return tokenizer(example["Review"], padding="max_length", truncation=True, max_length=MAX_LENGTH)

In [None]:
from transformers import DataCollatorWithPadding
# Run preprocess_function over every example of both splits, one example per call.
tokenized_dataset = dataset.map(preprocess_function, batched=False)

# Collator built from the tokenizer, intended for the (currently commented-out) Trainer.
# NOTE(review): presumably pads each batch to its longest sequence — confirm in the library docs.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
import evaluate
import numpy as np

# Bundle four metrics so a single compute() call reports all of them.
clf_metrics = evaluate.combine(["accuracy", "f1", "precision", "recall"])

def sigmoid(x):
    """Element-wise logistic function 1 / (1 + exp(-x)) for scalars or numpy arrays."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator

def compute_metrics(eval_pred):
    """Score multi-label predictions by flattening them into one binary problem.

    Args:
        eval_pred: Pair ``(predictions, labels)``; predictions are raw scores that are passed
            through ``sigmoid`` and thresholded at 0.5, labels are cast to int.
            Both are flattened to 1-D before scoring, so every (example, class) cell counts
            as one binary decision.

    Returns:
        Whatever ``clf_metrics.compute`` returns for the flattened predictions/references.
    """
    raw_scores, references = eval_pred
    probabilities = sigmoid(raw_scores)
    flat_predictions = (probabilities > 0.5).astype(int).reshape(-1)
    flat_references = references.astype(int).reshape(-1)
    return clf_metrics.compute(predictions=flat_predictions, references=flat_references)



## Training

### Model Instantiation

In [None]:
# from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

# model = AutoModelForSequenceClassification.from_pretrained(
#    checkpoint, num_labels=len(classes),
#            id2label=id2class, label2id=class2id,
#                        problem_type = "multi_label_classification")


In [None]:
# import torchinfo
# torchinfo.summary(model)

In [None]:
# exp = 1
# base_dir = "runs/exp_" + str(exp)
# training_args = TrainingArguments(

#    output_dir=base_dir+"/model",
#    learning_rate=2e-5,
#    per_device_train_batch_size=3,
#    per_device_eval_batch_size=3,
#    num_train_epochs=2,
#    weight_decay=0.01,
#    evaluation_strategy="epoch",
#    save_strategy="epoch",
#    load_best_model_at_end=True,
#    logging_dir=base_dir+"/logs",
# )

# trainer = Trainer(

#    model=model,
#    args=training_args,
#    train_dataset=tokenized_dataset["train"],
#    eval_dataset=tokenized_dataset["test"],
#    tokenizer=tokenizer,
#    data_collator=data_collator,
#    compute_metrics=compute_metrics,
# )

# trainer.train()


### Save Trained Model

In [None]:
# save trained model

## Final Inference

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Directory of a previously saved fine-tuned checkpoint.
# NOTE(review): the commented-out training cell writes under "runs/exp_<n>/model" — confirm
# this path matches where the checkpoint was actually saved.
model_path = "runs/checkpoint-4102"
model = AutoModelForSequenceClassification.from_pretrained(model_path)
# Rebinds `tokenizer`, replacing the one loaded from the base checkpoint earlier.
tokenizer = AutoTokenizer.from_pretrained(model_path)

In [None]:
# Sample review used to try the model by hand.
text = "The final blow was when the waiter brought us the check before we had even finished dessert--never mind that the only reason we were taking a long time to finish the meal was because of the extreme delay in the service of our food."
# Tokenize into PyTorch tensors.
inputs = tokenizer(text, return_tensors="pt")

In [None]:
# Forward pass without gradient tracking; keep only the raw per-class scores.
with torch.no_grad():
    logits = model(**inputs).logits

print(logits)

In [None]:
# A class counts as predicted when its sigmoid score exceeds this cut-off.
threshold = 0.3
predictions = (torch.sigmoid(logits) > threshold).int()
print(predictions)
# Names of the predicted classes for the first (only) row of the batch.
print([id2class[i] for i in range(len(classes)) if predictions[0][i] == 1])


In [None]:
def predict(text, model=model, threshold=0.3):
    """Predict aspects and one sentiment per aspect for a review.

    The previous cleanup loop had no effect: it iterated over the batch dimension (length 1)
    and reassigning the loop variable does not skip iterations. The cleanup described by its
    comments is implemented here, so the two returned lists always have the same length.

    Args:
        text: Review text handed to the tokenizer. Only the first row of the resulting batch
            is decoded.
        model: Sequence-classification model whose output exposes ``.logits``.
        threshold: Sigmoid score above which a class counts as predicted.

    Returns:
        Tuple ``(aspects, sentiments)`` of two equally long lists of str; ``sentiments[k]``
        belongs to ``aspects[k]``.
    """
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=MAX_LENGTH)
    with torch.no_grad():
        logits = model(**inputs).logits
    probabilities = torch.sigmoid(logits)[0]
    predictions = (probabilities > threshold).int()

    # Output cleanup. The class list is laid out in groups of four consecutive ids:
    # the aspect flag followed by its three sentiment flags.
    group_size = 4
    for aspect_idx in range(0, len(classes), group_size):
        first_sentiment = aspect_idx + 1
        group_end = aspect_idx + group_size
        if predictions[aspect_idx] == 0:
            # if any aspect is not predicted, its sentiments should not be predicted
            predictions[first_sentiment:group_end] = 0
        else:
            # only one sentiment should be predicted if aspect is predicted:
            # keep the highest-scoring one, even if it is below the threshold
            best = first_sentiment + int(torch.argmax(probabilities[first_sentiment:group_end]))
            predictions[first_sentiment:group_end] = 0
            predictions[best] = 1

    outputs = [id2class[i] for i in range(len(classes)) if predictions[i] == 1]
    aspects = [item for item in outputs if "_" not in item]
    sentiments = [item.split("_")[1] for item in outputs if "_" in item]
    return aspects, sentiments

In [None]:
# Display the first held-out example.
dataset["test"][0]

In [None]:
# Run the prediction helper on the sample review defined above.
aspects, sentiments = predict(text)

In [None]:
# Show the predicted aspects and the sentiments decoded alongside them.
print(aspects)
print(sentiments)

## Submission Dataset

In [None]:
def pred_without_tokenizer(tokens, model=model, threshold=0.3):
    """Predict aspects and one sentiment per aspect from already-tokenized input.

    The previous cleanup loop had no effect: it iterated over the batch dimension (length 1)
    and reassigning the loop variable does not skip iterations. The cleanup described by its
    comments is implemented here, so the two returned lists always have the same length.

    Args:
        tokens: Mapping of model inputs (unpacked as keyword arguments into ``model``).
            Only the first row of the batch is decoded.
        model: Sequence-classification model whose output exposes ``.logits``.
        threshold: Sigmoid score above which a class counts as predicted.

    Returns:
        Tuple ``(aspects, sentiments)`` of two equally long lists of str; ``sentiments[k]``
        belongs to ``aspects[k]``.
    """
    with torch.no_grad():
        logits = model(**tokens).logits
    probabilities = torch.sigmoid(logits)[0]
    predictions = (probabilities > threshold).int()

    # Output cleanup. The class list is laid out in groups of four consecutive ids:
    # the aspect flag followed by its three sentiment flags.
    group_size = 4
    for aspect_idx in range(0, len(classes), group_size):
        first_sentiment = aspect_idx + 1
        group_end = aspect_idx + group_size
        if predictions[aspect_idx] == 0:
            # if any aspect is not predicted, its sentiments should not be predicted
            predictions[first_sentiment:group_end] = 0
        else:
            # only one sentiment should be predicted if aspect is predicted:
            # keep the highest-scoring one, even if it is below the threshold
            best = first_sentiment + int(torch.argmax(probabilities[first_sentiment:group_end]))
            predictions[first_sentiment:group_end] = 0
            predictions[best] = 1

    outputs = [id2class[i] for i in range(len(classes)) if predictions[i] == 1]
    aspects = [item for item in outputs if "_" not in item]
    sentiments = [item.split("_")[1] for item in outputs if "_" in item]
    return aspects, sentiments

In [None]:
# Submission template as a DataFrame; its 'Aspects' and 'Sentiment' columns are filled below.
submission = pd.read_csv('sample_submission.csv')
submission.head()

In [None]:
# The same CSV loaded as a `datasets` object, used to iterate over the 'Review' texts.
sub_dataset = load_dataset('csv', data_files='sample_submission.csv', split='train')
sub_dataset

In [None]:
# Predict aspects/sentiments for every submission review and store them in `submission`.
# Results are collected first and assigned as whole columns: the previous per-cell
# `submission["Aspects"][i] = ...` is chained indexing, which pandas warns about and which
# may write to a temporary copy instead of the frame.
warnings = 0  # number of rows whose aspect/sentiment counts disagreed
all_aspects = []
all_sentiments = []
for i in range(len(sub_dataset)):
    text = sub_dataset[i]["Review"]
    tokens = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=MAX_LENGTH)
    aspects, sentiments = pred_without_tokenizer(tokens, threshold=0.2)
    # print(text)
    # if length of aspects and sentiments are not same, raise a warning
    if len(aspects) != len(sentiments):
        print("Warning: Length of aspects and sentiments are not same, truncating")
        warnings += 1
        # cut both lists to the shorter length; dropping a single trailing item (as before)
        # leaves them unequal whenever they differ by more than one
        common_length = min(len(aspects), len(sentiments))
        aspects = aspects[:common_length]
        sentiments = sentiments[:common_length]
        print(aspects)
        print(sentiments)

    all_aspects.append(aspects)
    all_sentiments.append(sentiments)

# One list per row; building a Series keeps each list as a single cell value.
submission["Aspects"] = pd.Series(all_aspects, index=submission.index)
submission["Sentiment"] = pd.Series(all_sentiments, index=submission.index)

print(warnings)

In [None]:
# Preview the filled-in predictions.
submission.head()

In [None]:
# add blank lists to the empty cells
submission.fillna(value="["",""]", inplace=True)
submission.head()

In [None]:
# Write the final submission file without the DataFrame index column.
submission.to_csv('submission.csv', index=False)

## Transformer Interpret

In [None]:
from transformers_interpret import MultiLabelClassificationExplainer
# Wrap the loaded model and tokenizer in a multi-label explainer and run it on the first
# submission review.
cls_explainer = MultiLabelClassificationExplainer(model, tokenizer)
word_attributions = cls_explainer(sub_dataset[0]["Review"])


In [None]:
# Dump the explainer's result for inspection.
print(word_attributions)

In [None]:
import json

# Serialize once, before the file is opened, so a non-serializable value raises without
# leaving a truncated file behind. The previous cell serialized twice and discarded the
# first result.
serialized_attributions = json.dumps(word_attributions)
# save json to file
with open('word_attributions.json', 'w') as f:
    f.write(serialized_attributions)

## Occlusion Model

In [None]:
def make_occluded_samples(x, y, emb_dim=768):
    """Expand each sample into itself plus one copy per chunk with that chunk zeroed out.

    For a sample with ``m`` chunks the output holds ``m + 1`` entries: the original sample
    object followed by ``m`` new lists in which chunk ``j`` is replaced by a fresh zero
    vector and all other chunks are the original objects. Every entry carries the sample's
    label.

    Args:
        x: Sequence of samples; each sample is a sequence of chunk embeddings.
        y: Sequence of labels, indexable parallel to ``x``.
        emb_dim: Length of the zero vector substituted for an occluded chunk. Defaults to
            768, the value that was previously hard-coded.

    Returns:
        Tuple ``(new_test_x, new_test_y)`` of two equally long lists.
    """
    new_test_x = []
    new_test_y = []
    for sample_idx, full_emb in enumerate(x):
        label = y[sample_idx]
        # the unmodified sample comes first
        new_test_x.append(full_emb)
        new_test_y.append(label)
        for occluded_idx in range(len(full_emb)):
            # samples with one of chunks occluded
            emb_occluded = [
                np.zeros(emb_dim) if chunk_idx == occluded_idx else chunk
                for chunk_idx, chunk in enumerate(full_emb)
            ]
            new_test_x.append(emb_occluded)
            new_test_y.append(label)
    return new_test_x, new_test_y

In [None]:
# NOTE(review): at this point `dataset` is the train/test split created earlier, so indexing it
# with "Review" or "labels" looks wrong; "labels" is only added on `tokenized_dataset`, and
# the occlusion helper substitutes 768-long zero vectors, which suggests it expects chunk
# embeddings rather than raw review text. Confirm the intended inputs before running.
new_test_x, new_test_y = make_occluded_samples(dataset["Review"], dataset["labels"])