In [20]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig

# pick the model you want to use (this identifier is passed to `from_pretrained` below)
MODEL_NAME = "pranaydeeps/EXALT-Baseline"

# load the tokenizer and model
# both are loaded from the same MODEL_NAME so that the tokenizer matches the model
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)

In [21]:
from transformers_interpret import SequenceClassificationExplainer
import torch
from typing import Dict, List, Optional, Tuple, Union
from transformers import PreTrainedModel, PreTrainedTokenizer

class CustomSequenceClassificationExplainer(SequenceClassificationExplainer): # need custom explainer to handle xlm-roberta
    """Sequence-classification explainer with XLM-RoBERTa-specific token type ids.

    Behaves like `SequenceClassificationExplainer`, except that
    `_make_input_reference_token_type_pair` returns all-zero token type ids
    when `model.config.model_type == 'xlm-roberta'`.
    """

    def __init__(
        self,
        model: PreTrainedModel,
        tokenizer: PreTrainedTokenizer,
        attribution_type: str = "lig",
        custom_labels: Optional[List[str]] = None,
    ):
        """
        Args:
            model (PreTrainedModel): the sequence classification model to explain
            tokenizer (PreTrainedTokenizer): the tokenizer belonging to `model`
            attribution_type (str, optional): forwarded to the parent explainer. Defaults to "lig".
            custom_labels (List[str], optional): forwarded to the parent explainer. Defaults to None.
        """
        # Forward the optional arguments as well: they used to be accepted here
        # but were never handed to the parent, so callers' values were ignored.
        super().__init__(
            model,
            tokenizer,
            attribution_type=attribution_type,
            custom_labels=custom_labels,
        )

    def _make_input_reference_token_type_pair(self, input_ids: torch.Tensor, sep_idx: int = 0
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Returns two tensors indicating the corresponding token types for the `input_ids`
        and a corresponding all zero reference token type tensor.
        Args:
            input_ids (torch.Tensor): Tensor of text converted to `input_ids`;
                its second dimension is used as the sequence length
            sep_idx (int, optional): positions up to and including this index get
                token type 0, later positions get token type 1 (ignored for
                xlm-roberta models). Defaults to 0.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: (token_type_ids, ref_token_type_ids),
                both expanded to the shape of `input_ids`
        """
        seq_len = input_ids.size(1)

        if self.model.config.model_type == 'xlm-roberta':
            # xlm-roberta gets token type 0 at every position
            # NOTE(review): presumably because the model only has a single token type - confirm
            token_type_ids = torch.zeros(seq_len, dtype=torch.int, device=self.device).expand_as(input_ids)
        else:
            token_type_ids = torch.tensor([0 if i <= sep_idx else 1 for i in range(seq_len)], device=self.device).expand_as(
                input_ids
            )
        # the reference (baseline) token types are all zeros in both cases
        ref_token_type_ids = torch.zeros_like(token_type_ids, device=self.device).expand_as(input_ids)

        return (token_type_ids, ref_token_type_ids)

In [22]:
from transformers_interpret import SequenceClassificationExplainer

# prepare the explainer
cls_explainer = CustomSequenceClassificationExplainer(model, tokenizer)

# run the explainer on an example
example_sentence = "My parents hated this ridiculous movie."

# calling the explainer yields one (sub-token, attribution) pair per sub-token
word_attributions = cls_explainer(example_sentence)

# the explainer outputs the attributions for each sub-token in the input
print("Sub-token attributions:", word_attributions)

# the attributions specifically target the predicted class
print("Predicted class:",cls_explainer.predicted_class_name)

Sub-token attributions: [('<s>', 0.0), ('▁My', 0.0023662683176213025), ('▁parents', -0.0044839087437695165), ('▁hat', 0.13700055463100105), ('ed', 0.838185384402232), ('▁this', 0.4072319378334896), ('▁ridiculous', 0.2140027340738356), ('▁movie', 0.12540142859780076), ('▁', 0.1389692863864906), ('.', 0.1788222651295735), ('</s>', 0.0)]
Predicted class: LABEL_4


In [23]:
# you can separate the sub-tokens and attributions by zipping them
# (this gives two tuples of equal length: the sub-tokens and their scores)
sub_tokens, attributions = zip(*word_attributions)

# you can print them out together again like this
for sub_token, attribution in zip(sub_tokens, attributions):
    print(sub_token, attribution)

<s> 0.0
▁My 0.0023662683176213025
▁parents -0.0044839087437695165
▁hat 0.13700055463100105
ed 0.838185384402232
▁this 0.4072319378334896
▁ridiculous 0.2140027340738356
▁movie 0.12540142859780076
▁ 0.1389692863864906
. 0.1788222651295735
</s> 0.0


These attributions still need some work, as some of them are assigned to empty tokens.
In addition, some emojis or special characters are split into several sub-tokens.
For that purpose, we provide the following function, which merges the sub-token attributions back into one score per word.

In [24]:
def Clean_AttributionTokens(tokenized_text, attributions, offset_mapping=None):
    """Merge sub-token attributions into one score per space-separated word.

    Words are the pieces of `tokenized_text.split(" ")`. Every sub-token whose
    character span lies inside a word has its attribution added to that word.
    Sub-tokens with the span (0, 0) (the BoS/EoS tokens) are ignored, and so are
    sub-tokens whose span crosses a word boundary or ends beyond the text.

    Args:
        tokenized_text (str): the input text; words are delimited by single spaces
        attributions (sequence): one numerical attribution per sub-token, in the
            same order as the entries of the offset mapping
        offset_mapping (list, optional): (begin, end) character offsets, one per
            sub-token. When None, the offsets are computed by running the
            notebook-level `tokenizer` on `tokenized_text`. Defaults to None.

    Returns:
        tuple: (words, word_attributions); two lists of equal length holding the
            space-separated words and the summed attribution of each word
            (0 for a word that received no sub-token)

    Warns:
        UserWarning: when the number of attributions differs from the number of
            offsets; scores are then paired with offsets position by position
            and the surplus entries are ignored, so sums may be misattributed.
    """
    # local imports so that this cell does not depend on an import made elsewhere
    import bisect
    import warnings

    if offset_mapping is None:
        offset_mapping = tokenizer(tokenized_text, return_offsets_mapping=True)["offset_mapping"]

    # Scores are matched to offsets purely by position, so both sequences must
    # describe the same tokenization of the same text.
    if len(offset_mapping) != len(attributions):
        warnings.warn(
            "Clean_AttributionTokens: got %d attributions for %d sub-token offsets; "
            "the attributions were probably computed on a different tokenization "
            "of the text, so word scores may be misaligned."
            % (len(attributions), len(offset_mapping))
        )

    words = tokenized_text.split(" ")

    # Exclusive end position of every word: the index of each delimiting space
    # plus the end of the text for the final word. Only the literal " " counts
    # as a delimiter so that this list always lines up with `words`.
    word_ends = [char_index for char_index, character in enumerate(tokenized_text) if character == " "]
    word_ends.append(len(tokenized_text))

    word_attributions = [0] * len(words)
    for (begin_index, end_index), score in zip(offset_mapping, attributions):
        if begin_index == 0 and end_index == 0: # ignore BoS and EoS tokens
            continue
        # first word that ends at or after this sub-token's end
        word_index = bisect.bisect_left(word_ends, end_index)
        if word_index == len(word_ends): # span runs past the end of the text
            continue
        # the sub-token may start on the preceding space itself, but not earlier;
        # otherwise it overlaps the previous word and is not counted
        if word_index > 0 and begin_index < word_ends[word_index - 1]:
            continue
        word_attributions[word_index] += score

    return words, word_attributions


In [25]:
print('Before')
# you can print them out together again like this
for sub_token, attribution in zip(sub_tokens, attributions):
    print(sub_token, attribution)

# merge the sub-token scores into one score per space-separated word
# NOTE(review): in the recorded output below, the score of "movie." equals the sum of
# the "▁movie" and "▁" rows and leaves out the "." row - confirm that the sub-tokens
# of the explainer line up with the offsets used inside Clean_AttributionTokens
final_tokens, new_attributions = Clean_AttributionTokens(tokenized_text=example_sentence, attributions=attributions)
print("After")
for token, attribution in zip(final_tokens, new_attributions): 
    print(token, attribution)


Before
<s> 0.0
▁My 0.0023662683176213025
▁parents -0.0044839087437695165
▁hat 0.13700055463100105
ed 0.838185384402232
▁this 0.4072319378334896
▁ridiculous 0.2140027340738356
▁movie 0.12540142859780076
▁ 0.1389692863864906
. 0.1788222651295735
</s> 0.0
After
My 0.0023662683176213025
parents -0.0044839087437695165
hated 0.975185939033233
this 0.4072319378334896
ridiculous 0.2140027340738356
movie. 0.2643707149842913


As you can see, these attributions include negatives and positives. Negative attributions may make some sense for a binary classification task, but for multi-class classification, it is hard to know what that means.
Also, the total attribution for sentences does not necessarily add up to 1 or any specific value.
For our task, we normalize the scores per sentence so they have equal weight.

In [26]:
def Normalize_Attributions(attributions):
    """Rescale attribution scores so that the non-negative ones sum to 1.

    Negative scores are replaced by 0 before scaling, so they do not contribute.

    Args:
        attributions (list): numerical attribution scores (anything `float()` accepts)

    Returns:
        list: each clipped score divided by the total of the non-negative scores;
            a list of zeros of the same length when that total is 0
    """
    clipped_scores = []
    positive_total = 0
    for raw_score in attributions:
        value = float(raw_score)
        if value < 0:
            # negatives are ignored: they count as zero and add nothing to the total
            clipped_scores.append(0)
            continue
        clipped_scores.append(value)
        positive_total += value

    if positive_total == 0:
        # nothing to scale by; avoid dividing by zero
        return [0] * len(clipped_scores)
    return [value / positive_total for value in clipped_scores]

In [27]:
# using this function, we can normalize the attributions
# note: this overwrites `new_attributions` (the per-word scores from the previous cell)
new_attributions = Normalize_Attributions(attributions=new_attributions)
for token, attribution in zip(final_tokens, new_attributions): # the <s> and </s> tokens were already skipped by Clean_AttributionTokens
    print(token, attribution)


My 0.001270031222765881
parents 0.0
hated 0.5234049669479127
this 0.218570849343028
ridiculous 0.11486024302782911
movie. 0.1418939094584643


For one part of the Task, we evaluate on numerical values, meaning you do not need this step/function. However, for the other, we only take binary trigger word indicators.
This means that the numerical values have to be converted to binary values. This can be done with the following function:



In [28]:
def CreateBinaryVector(normalized_scores, threshold=0.2):
    """Turn numerical scores into binary trigger-word indicators.

    Args:
        normalized_scores (list): numerical scores, one per word (expected to be normalized already)
        threshold (float, optional): scores greater than or equal to this value become 1, all others become 0. Defaults to 0.2.

    Returns:
        list: a vector of 1s and 0s with one entry per input score
    """
    return [1 if score >= threshold else 0 for score in normalized_scores]



In [29]:
# convert the normalized per-word scores into 0/1 indicators (here with a threshold of 0.1)
binary_vector = CreateBinaryVector(new_attributions, threshold=0.1)
# `attribution` holds the binary indicator of each word in this loop
for token, attribution in zip(final_tokens, binary_vector):
    print(token, attribution)

My 0
parents 0
hated 1
this 1
ridiculous 1
movie. 1


In [30]:
# this can be combined into a single function
# BUT: beware that you may need additional cleaning steps or token merging depending on your tokenizer

def Vector_from_raw_attributions(inputstring, interpret_output, threshold=0.1):
    """Run the full pipeline from raw sub-token attributions to a binary vector.

    The raw scores are merged per word (Clean_AttributionTokens), normalized
    (Normalize_Attributions) and thresholded (CreateBinaryVector), in that order.

    Args:
        inputstring (string): the input text string (from the column ["Texts"])
        interpret_output (list): (sub-token, attribution) pairs as produced by the explainer
        threshold (float, optional): minimal normalized importance for a word to be marked with 1. Defaults to 0.1.

    Returns:
        tuple: (words, binary_vector); the space-separated words of `inputstring`
            and the 0/1 indicator computed for each of them
    """
    # only the scores are needed; the sub-token strings are discarded
    _sub_tokens, raw_scores = zip(*interpret_output)
    words, word_scores = Clean_AttributionTokens(inputstring, raw_scores)
    binary_vector = CreateBinaryVector(
        Normalize_Attributions(attributions=word_scores),
        threshold,
    )
    return words, binary_vector

# run the combined function on the example sentence
example_sentence = "My parents hated this ridiculous movie."
interpret_output = cls_explainer(example_sentence)

tokenized_sample, final_vector = Vector_from_raw_attributions(example_sentence, interpret_output)

for token, attribution in zip(tokenized_sample, final_vector): # one binary indicator per word; <s> and </s> were already skipped by Clean_AttributionTokens
    print(token, attribution)

My 0
parents 0
hated 1
this 1
ridiculous 1
movie. 1


In [33]:
# to run this on the entire dataset
import pandas as pd

# pick the model you want to use
MODEL_NAME = "pranaydeeps/EXALT-Baseline"

# load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
cls_explainer = CustomSequenceClassificationExplainer(model, tokenizer)

traindata = pd.read_csv("data/exalt_trigger_train.tsv", sep="\t")
print(traindata)

# number of rows (from the top of the file) to create predictions for;
# the same value is used for the loop and for the frame that is written out
MAX_ROWS = 2

predictions = []
for rowindex, row in traindata[:MAX_ROWS].iterrows():
    tweet_text = row["Texts"]
    print(rowindex, tweet_text)
    interpret_output = cls_explainer(tweet_text)
    final_tokens, final_vector = Vector_from_raw_attributions(tweet_text, interpret_output)
    print(final_vector)
    # sanity check: there must be exactly one label per space-separated token
    print(len(final_vector) == len(tweet_text.split(" ")))
    predictions.append(final_vector)


# take an explicit copy of the processed rows: assigning a column to a plain
# slice of the original frame triggers pandas' SettingWithCopyWarning
traindata = traindata.iloc[:MAX_ROWS].copy()
traindata["Labels"] = predictions # MAKE SURE TO NAME THIS COLUMN "Labels" FOR THE EVALUATION SCRIPT TO WORK
traindata.to_csv("data/exalt_trigger_train_predictions.tsv", sep="\t", index=False)




         ID                                              Texts  \
0      7168  @user I’m so happy you’ve found some success f...   
1     11762  @user Awww , thank you 😚 Well , nobody knows ....   
2     10854  @user Heheh . Me too actually ! Welcome ( back...   
3      7294  @user Hey hun just thought I’d show you how vi...   
4     10435  @user given the dreadful performance of my bat...   
...     ...                                                ...   
2995   7906  there isnt a day where im not deeply upset abo...   
2996   8809  @user Luckily , thanks to an old friend of min...   
2997   8031  I feel very lost and not sure about what Im doing   
2998   7456  Im gonna read the Captain Phasma novel and hop...   
2999   8783                      @user Ya its been tooooo long   

                                                 Labels  
0                        [0, 0, 1, 1, 0, 0, 0, 0, 0, 0]  
1         [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]  
2         [0, 0, 0, 0, 0, 0, 0, 1