In [1]:
%%time 

import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import warnings

# Seed every RNG used here (stdlib, NumPy, PyTorch CPU and CUDA) with one
# shared value so repeated runs draw the same random numbers.
seed = 42
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_rng(seed)
torch.backends.cudnn.deterministic = True

# Silence every warning category from this point on.
warnings.filterwarnings('ignore')

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

CPU times: user 1.23 s, sys: 2.01 s, total: 3.24 s
Wall time: 726 ms


## DATASET

In [2]:
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm.notebook import tqdm
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from transformers import BertTokenizer
from torch.utils.data import DataLoader, Dataset

# Load and prepare data
class MovieReviewDataset(Dataset):
    """Dataset that tokenizes one review per item.

    Each row of ``dataframe`` must provide a ``'Content'`` text field and a
    ``'Sentiment'`` field. A sentiment equal to ``'Positive'`` becomes label 1;
    any other value becomes label 0.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``input_ids``, ``attention_mask`` and ``labels`` tensors for one row."""
        row = self.data.iloc[index]
        # Pad or truncate every review to exactly ``max_len`` tokens.
        encoded = self.tokenizer.encode_plus(
            row['Content'],
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=True,
            truncation=True
        )
        if row['Sentiment'] == 'Positive':
            label = 1
        else:
            label = 0

        def as_long(values):
            return torch.tensor(values, dtype=torch.long)

        return {
            'input_ids': as_long(encoded['input_ids']),
            'attention_mask': as_long(encoded['attention_mask']),
            'labels': as_long(label),
        }

# Example data loading and preparation
data = []
folder_path = 'movies/docs'  # Adjust the path to your dataset folder

# Read every .txt review; the filename prefix encodes the sentiment label.
# sorted() makes the read order independent of the filesystem's listing order.
for filename in sorted(os.listdir(folder_path)):
    if filename.endswith('.txt'):
        with open(os.path.join(folder_path, filename), 'r', encoding='utf-8') as file:
            # Flatten the review onto one line and drop apostrophes.
            content = file.read().replace('\n', ' ').replace("\'", "")
        sentiment = 'Negative' if filename.startswith('negR') else 'Positive'
        # Named doc_id rather than `id` so the builtin id() is not shadowed for
        # the rest of the notebook. The identifier is the sentiment initial
        # ('N' or 'P') followed by filename[5:8].
        doc_id = f"{sentiment[0]}{filename[5:8]}"
        data.append({'Content': content, 'Sentiment': sentiment, 'id': doc_id})

# Convert to DataFrame
df = pd.DataFrame(data)

# Sort the DataFrame by 'id'
df = df.sort_values(by='id').reset_index(drop=True)

# Separate the dataset: within each class, rows 0-799 go to training,
# 800-899 to validation and 900-999 to test.
negatives = df[df['Sentiment'] == 'Negative']
positives = df[df['Sentiment'] == 'Positive']

train_neg, val_neg, test_neg = negatives[:800], negatives[800:900], negatives[900:1000]
train_pos, val_pos, test_pos = positives[:800], positives[800:900], positives[900:1000]


def _shuffled(parts):
    # Stack the per-class parts, shuffle the rows and renumber the index.
    return pd.concat(parts).sample(frac=1).reset_index(drop=True)


# Concatenate the splits
train_df = _shuffled([train_neg, train_pos])
val_df = _shuffled([val_neg, val_pos])
test_df = _shuffled([test_neg, test_pos])

# Create datasets and dataloaders
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# All three datasets pad/truncate reviews to the same token length.
_MAX_LEN = 128
# The training frame is reshuffled once more, with a fixed seed, before wrapping.
train_dataset = MovieReviewDataset(train_df.sample(frac=1, random_state=200), tokenizer, max_len=_MAX_LEN)
val_dataset, test_dataset = (
    MovieReviewDataset(frame, tokenizer, max_len=_MAX_LEN) for frame in (val_df, test_df)
)

batch_size = 128
# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(dataset, batch_size=batch_size, shuffle=False) for dataset in (val_dataset, test_dataset)
)

## MODEL

In [None]:
class CNN(nn.Module):
    """Text classifier: embedding, parallel convolutions, max-over-time pooling, linear head.

    One ``Conv2d`` is created per entry of ``filter_sizes``. Each kernel covers
    ``fs`` consecutive tokens across the full embedding width. The pooled
    outputs of all convolutions are concatenated, passed through dropout and
    mapped to ``output_dim`` values.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        conv_layers = []
        for fs in filter_sizes:
            conv_layers.append(nn.Conv2d(1, n_filters, (fs, embedding_dim)))
        self.convs = nn.ModuleList(conv_layers)
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Map a batch of token ids to ``output_dim`` raw scores per example."""
        # [batch size, sent len, emb dim] -> [batch size, 1, sent len, emb dim]
        channels = self.embedding(text).unsqueeze(1)
        features = []
        for conv in self.convs:
            # The kernel spans the whole embedding width, so the last axis is 1 and can be dropped.
            activated = F.relu(conv(channels)).squeeze(3)
            # Max over all remaining positions keeps one value per filter.
            features.append(F.max_pool1d(activated, activated.shape[2]).squeeze(2))
        return self.fc(self.dropout(torch.cat(features, dim=1)))

# Create the CNN instance
INPUT_DIM = len(tokenizer.get_vocab())  # Use tokenizer vocabulary size
EMBEDDING_DIM = 512
N_FILTERS = 100
FILTER_SIZES = [2, 3, 4]
OUTPUT_DIM = 1  # Binary classification
DROPOUT = 0.5

# Build the network and place it on the selected device in one step.
model = CNN(
    vocab_size=INPUT_DIM,
    embedding_dim=EMBEDDING_DIM,
    n_filters=N_FILTERS,
    filter_sizes=FILTER_SIZES,
    output_dim=OUTPUT_DIM,
    dropout=DROPOUT,
).to(device)

import torch.optim as optim

# Adam with its default hyper-parameters; the loss consumes raw logits.
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss().to(device)

def train(model, iterator, optimizer, criterion):
    """Run one optimisation pass over ``iterator`` and return mean loss and accuracy.

    Args:
        model: Network called on each batch's ``'input_ids'``; its output is
            squeezed on dim 1 before being scored.
        iterator: Iterable of batches; each batch is a mapping with
            ``'input_ids'`` and ``'labels'`` tensors.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(predictions, labels.float())``.
            The weighting below assumes it returns the batch mean, as the
            default ``BCEWithLogitsLoss`` reduction does.

    Returns:
        Tuple ``(loss, accuracy)`` averaged over samples.

    Raises:
        ZeroDivisionError: If ``iterator`` yields no samples.
    """
    total_loss = 0.0
    total_acc = 0.0
    n_samples = 0
    model.train()

    for batch in iterator:
        texts = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        predictions = model(texts).squeeze(1)

        loss = criterion(predictions, labels.float())
        acc = binary_accuracy(predictions, labels)
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a smaller final batch does not count
        # as much as a full one in the epoch averages.
        batch_n = labels.size(0)
        total_loss += loss.item() * batch_n
        total_acc += acc.item() * batch_n
        n_samples += batch_n

    return total_loss / n_samples, total_acc / n_samples

def evaluate(model, iterator, criterion):
    """Score ``model`` on ``iterator`` without gradient tracking.

    Args:
        model: Network called on each batch's ``'input_ids'``; it is switched
            to eval mode and left in that mode.
        iterator: Iterable of batches; each batch is a mapping with
            ``'input_ids'`` and ``'labels'`` tensors.
        criterion: Loss called as ``criterion(predictions, labels.float())``.
            The weighting below assumes it returns the batch mean, as the
            default ``BCEWithLogitsLoss`` reduction does.

    Returns:
        Tuple ``(loss, accuracy)`` averaged over samples.

    Raises:
        ZeroDivisionError: If ``iterator`` yields no samples.
    """
    total_loss = 0.0
    total_acc = 0.0
    n_samples = 0
    model.eval()

    with torch.no_grad():
        for batch in iterator:
            texts = batch['input_ids'].to(device)
            labels = batch['labels'].to(device)
            predictions = model(texts).squeeze(1)
            loss = criterion(predictions, labels.float())
            acc = binary_accuracy(predictions, labels)

            # Weight each batch by its size so a smaller final batch does not
            # count as much as a full one in the reported averages.
            batch_n = labels.size(0)
            total_loss += loss.item() * batch_n
            total_acc += acc.item() * batch_n
            n_samples += batch_n

    return total_loss / n_samples, total_acc / n_samples

def binary_accuracy(preds, y):
    """Return the fraction of predictions whose rounded sigmoid equals the target.

    Args:
        preds: Tensor of raw logits.
        y: Tensor of targets compared element-wise with the rounded sigmoid.

    Returns:
        Scalar tensor: number of matches divided by ``len`` of the comparison.
    """
    hits = torch.sigmoid(preds).round().eq(y).float()
    return hits.sum() / len(hits)

NUM_EPOCHS = 20

# Each epoch runs one training pass and one validation pass, then reports both.
for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, val_loader, criterion)

    print(
        f'Epoch: {epoch+1:02}',
        f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%',
        f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%',
        sep='\n',
    )


# Persist the weights as they stand after the final epoch.
torch.save(model.state_dict(), 'model.pth')

## XAI TECHNIQUES

In [None]:
# Tokenize the example text
text = "I love this movie."

inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
# Move the tensors to the device the model was placed on. On a CUDA machine the
# model is on the GPU and cannot consume CPU-resident ids.
input_ids = inputs['input_ids'].to(device)
attention_mask = inputs['attention_mask'].to(device)

In [None]:
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap

def visualize_attributions(attributions, input_ids):
    """Draw each token on a background coloured by its attribution.

    Attributions are summed over the last dimension, scaled by their L2 norm
    and mapped through a red / light-grey / green colormap normalised to
    [-1, 1]. The first and last tokens are dropped before plotting.

    Args:
        attributions: Attribution tensor; assumed to be
            (1, tokens, embedding dim) -- TODO confirm against callers.
        input_ids: Token-id tensor for the same text; it is flattened before
            being converted back to tokens with the notebook-level ``tokenizer``.

    Raises:
        ValueError: If the number of tokens and attribution weights differ.
    """
    # Sum the attributions across embedding dimensions and normalize them
    token_scores = attributions.sum(dim=-1).squeeze(0)
    scale = torch.norm(token_scores)
    # An all-zero attribution map has zero norm; dividing would turn every
    # weight into NaN, so leave the zeros untouched in that case.
    if scale > 0:
        token_scores = token_scores / scale
    # .cpu() is needed before .numpy() when the attributions live on the GPU.
    weights = token_scores.detach().cpu().numpy()

    # Convert input IDs to tokens. reshape(-1) always yields a list of ids,
    # even for a single-token input.
    words = tokenizer.convert_ids_to_tokens(input_ids.reshape(-1).tolist())

    # Skip the first and last token (usually [CLS] and [SEP])
    words = words[1:-1]
    weights = weights[1:-1]

    # Check if the number of weights matches the number of words
    if len(words) != len(weights):
        raise ValueError("The number of weights must match the number of words in the text.")

    # Create a custom color map
    colors = ["red", "lightgrey", "green"]
    cmap = LinearSegmentedColormap.from_list("custom_cmap", colors, N=256)

    # Normalize weights to be between -1 and 1
    norm = plt.Normalize(-1, 1)

    # Create a figure and axis; keep the width positive when no tokens remain.
    fig, ax = plt.subplots(figsize=(max(len(words), 1), 2))
    ax.axis('off')

    # Plot each word with its corresponding background color
    x_pos = 0
    for word, weight in zip(words, weights):
        color = cmap(norm(weight))
        ax.text(x_pos, 0.5, word, fontsize=12, weight='bold', color='black',
                bbox=dict(facecolor=color, edgecolor='none', boxstyle='round,pad=0.5'))
        x_pos += len(word) * 0.5  # Adjust spacing between words

    # Adjust the plot
    ax.set_xlim(-0.5, x_pos)
    ax.set_ylim(0, 1)
    plt.show()

In [None]:
# Define a model wrapper for Captum
class CNNModelWrapper(nn.Module):
    """Re-run the wrapped CNN's layers step by step as a standalone module.

    The wrapped model must expose ``embedding``, ``convs``, ``dropout`` and
    ``fc`` attributes.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, input_ids):
        """Apply embedding, convolutions, max pooling, dropout and the linear head in order."""
        net = self.model
        # After unsqueeze: [batch size, 1, sent len, emb dim]
        channels = net.embedding(input_ids).unsqueeze(1)
        pooled = []
        for conv in net.convs:
            activated = F.relu(conv(channels)).squeeze(3)
            pooled.append(F.max_pool1d(activated, activated.shape[2]).squeeze(2))
        return net.fc(net.dropout(torch.cat(pooled, dim=1)))

# Prepare model and wrapper
# .eval() returns the module itself and also switches the wrapped model to eval
# mode. Setting it here keeps dropout off during attribution regardless of
# whether train() or evaluate() was the last cell to run.
wrapper = CNNModelWrapper(model).eval()

In [None]:
from captum.attr import configure_interpretable_embedding_layer, remove_interpretable_embedding_layer

# Configure interpretable embedding layer for Captum.
# The returned object is used by the attribution cells to turn token ids into
# embeddings (`interpretable_embedding.indices_to_embeddings`) and is handed
# back to `remove_interpretable_embedding_layer` in the final cell.
interpretable_embedding = configure_interpretable_embedding_layer(model, 'embedding')


# Since the model outputs a single logit for binary classification, set target to 0 (for the logit itself)
target_index = 0

#### Deep Lift

In [None]:
%%time 

from captum.attr import DeepLift

def xai_deeplift(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Captum's DeepLift.

    Args:
        model: Module handed to ``DeepLift``; it is called on embeddings, not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``DeepLift.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = DeepLift(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_deeplift(wrapper, input_ids, target_index, visual=True)

#### Saliency

In [None]:
%%time 

from captum.attr import Saliency

def xai_saliency(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Captum's Saliency.

    Args:
        model: Module handed to ``Saliency``; it is called on embeddings, not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``Saliency.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = Saliency(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_saliency(wrapper, input_ids, target_index, visual=True)

#### Integrated Gradients

In [None]:
%%time 

from captum.attr import IntegratedGradients

def xai_integratedgradients(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Integrated Gradients.

    Args:
        model: Module handed to ``IntegratedGradients``; it is called on
            embeddings, not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``IntegratedGradients.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = IntegratedGradients(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_integratedgradients(wrapper, input_ids, target_index, visual=True)

#### Input X Gradient

In [None]:
%%time 

from captum.attr import InputXGradient

def xai_inputgradient(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Input X Gradient.

    Args:
        model: Module handed to ``InputXGradient``; it is called on embeddings,
            not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``InputXGradient.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = InputXGradient(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_inputgradient(wrapper, input_ids, target_index, visual=True)

#### Shapley Values

In [None]:
%%time 

from captum.attr import ShapleyValueSampling

def xai_shapley(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Shapley value sampling.

    Args:
        model: Module handed to ``ShapleyValueSampling``; it is called on
            embeddings, not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``ShapleyValueSampling.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = ShapleyValueSampling(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_shapley(wrapper, input_ids, target_index, visual=True)

#### Guided Backpropagation

In [None]:
%%time 

from captum.attr import GuidedBackprop

def xai_guidedbackprop(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Guided Backpropagation.

    Args:
        model: Module handed to ``GuidedBackprop``; it is called on embeddings,
            not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``GuidedBackprop.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = GuidedBackprop(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_guidedbackprop(wrapper, input_ids, target_index, visual=True)

#### Deconvolution

In [None]:
%%time 

from captum.attr import Deconvolution

def xai_deconvolution(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Deconvolution.

    Args:
        model: Module handed to ``Deconvolution``; it is called on embeddings,
            not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``Deconvolution.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = Deconvolution(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_deconvolution(wrapper, input_ids, target_index, visual=True)

#### Lime

In [None]:
%%time 

from captum.attr import Lime

def xai_lime(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Captum's Lime.

    Args:
        model: Module handed to ``Lime``; it is called on embeddings, not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``Lime.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = Lime(model)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_lime(wrapper, input_ids, target_index, visual=True)

#### Guided GradCAM

In [None]:
%%time 

from captum.attr import GuidedGradCam

def xai_gradcam(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Guided GradCAM.

    Args:
        model: Module handed to ``GuidedGradCam``. It must expose the wrapped
            network as ``model.model``, whose ``embedding`` layer is used as
            the GradCAM layer.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``GuidedGradCam.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Take both the forward function and the layer from the `model` argument
    # instead of the global `wrapper`.
    xai = GuidedGradCam(model, model.model.embedding)
    attributions = xai.attribute(input_embeddings, target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_gradcam(wrapper, input_ids, target_index, visual=True)

#### SHAP

In [None]:
%%time 

from captum.attr import GradientShap

def xai_shap(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Captum's GradientShap.

    Args:
        model: Module handed to ``GradientShap``; it is called on embeddings,
            not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``GradientShap.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = GradientShap(model)
    # Baselines are standard-normal noise with the embeddings' shape. `target`
    # is the function argument, not the notebook-level `target_index`.
    attributions = xai.attribute(
        input_embeddings,
        baselines=torch.randn_like(input_embeddings),
        target=target,
    )

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_shap(wrapper, input_ids, target_index, visual=True)

#### Occlusion

In [None]:
%%time 

from captum.attr import Occlusion

def xai_occlusion(model, input, target, visual=False):
    """Attribute the model output to the input embeddings with Captum's Occlusion.

    Args:
        model: Module handed to ``Occlusion``; it is called on embeddings, not token ids.
        input: Token-id tensor, converted to embeddings through the
            notebook-level ``interpretable_embedding``.
        target: Output index passed to ``attribute``.
        visual: When true, also plot the result with ``visualize_attributions``.

    Returns:
        The attributions returned by ``Occlusion.attribute``.
    """
    input_embeddings = interpretable_embedding.indices_to_embeddings(input)
    # Explain the module the caller passed instead of the global `wrapper`.
    xai = Occlusion(model)
    # `target` is the function argument, not the notebook-level `target_index`.
    # NOTE(review): the (1, 3) window presumably covers one token and three
    # embedding dimensions at a time -- confirm this is the intended granularity.
    attributions = xai.attribute(input_embeddings, sliding_window_shapes=(1, 3), target=target)

    if visual:
        visualize_attributions(attributions, input)

    return attributions

temp = xai_occlusion(wrapper, input_ids, target_index, visual=True)

## XAI METRICS

In [None]:
from captum.metrics import infidelity, sensitivity_max 

# define a perturbation function for the input
def perturb_fn(inputs):
    """Return Gaussian noise and the inputs with that noise subtracted.

    Args:
        inputs: Tensor to perturb.

    Returns:
        Tuple ``(noise, inputs - noise)``. ``noise`` is a float32 tensor drawn
        from a normal distribution (mean 0, standard deviation 0.2) with the
        shape of ``inputs``, placed on the same device as ``inputs``.
    """
    noise = torch.tensor(np.random.normal(0, 0.2, inputs.shape)).float()
    # The noise is created on the CPU; move it next to `inputs` so the
    # subtraction also works for tensors that live on the GPU.
    noise = noise.to(inputs.device)
    return noise, inputs - noise

In [None]:
%%time 

xai_methods = [
    ("Deep Lift", xai_deeplift),
    ("Saliency", xai_saliency),
    ("Integrated Gradients", xai_integratedgradients),
    ("Input X Gradient", xai_inputgradient),
    #("Shapley Values", xai_shapley),
    ("Guided Backpropagation", xai_guidedbackprop),
    ("Deconvolution", xai_deconvolution),
    ("Lime", xai_lime),
    ("GradCAM", xai_gradcam),
    ("SHAP", xai_shap),
    ("Occlusion", xai_occlusion)
]

# Collect one {method name: infidelity} record per test review and build the
# DataFrame once at the end. Concatenating inside the loop re-copies every
# earlier row on each iteration.
infidelity_rows = []

# Read the review text by column name rather than by column position.
for text in test_df['Content']:
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    # Keep the ids on the model's device so the loop also runs on a GPU.
    input_ids = inputs['input_ids'].to(device)
    input_embeddings = interpretable_embedding.indices_to_embeddings(input_ids)

    # Dictionary to store the infidelity values for the current example
    infidelities = {}

    # Iterate over the XAI methods and compute infidelities
    for method_name, xai_function in xai_methods:
        attributions = xai_function(wrapper, input_ids, target_index)
        infid = np.round(infidelity(wrapper, perturb_fn, input_embeddings, attributions).item(), 5)

        # Store the infidelity value in the dictionary
        infidelities[method_name] = infid

    infidelity_rows.append(infidelities)

df_infidelity = pd.DataFrame(infidelity_rows)

df_infidelity.head()

In [None]:
import seaborn as sns

# Create a box plot using Seaborn: one box per attribution method (column).
fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(data=df_infidelity, ax=ax)
ax.set_title('Box Plot of Infidelity Values by Method')
ax.set_ylabel('Infidelity Value')
ax.set_xlabel('Method')
# Tilt the method names so long labels do not overlap.
plt.xticks(rotation=45)

plt.show()

In [None]:
# Remove interpretable embedding layer after attribution.
# Run this only after all attribution and infidelity cells have finished: they
# call `interpretable_embedding.indices_to_embeddings`, which relies on the
# layer configured earlier with `configure_interpretable_embedding_layer`.
remove_interpretable_embedding_layer(model, interpretable_embedding)