## Imports

In [None]:
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import numpy as np
import torch
from transformers import BertTokenizer, BertModel
import torch.nn as nn
from transformers import BertModel, BertTokenizer
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score
from tqdm import tqdm


In [None]:
1

In [None]:
pip install datasets

## Let's download the dataset

IMDb Movie Reviews: A dataset containing movie reviews labeled as positive or negative sentiment. It's commonly used for sentiment analysis tasks.

In [None]:
from datasets import load_dataset
from sklearn.model_selection import train_test_split

# Fetch IMDb and carve a validation set out of its official training split.
imdb_dataset = load_dataset("imdb")
train_split = imdb_dataset["train"]
texts, labels = train_split["text"], train_split["label"]

# Hold out 20% of the reviews; the fixed seed keeps the split reproducible.
X_train, X_val, y_train, y_val = train_test_split(
    texts,
    labels,
    test_size=0.2,
    random_state=42,
)

print(f"Training set size: {len(X_train)}")
print(f"Validation set size: {len(X_val)}")


In [None]:
# Class balance of the training split: one line per distinct label.
unique_labels, label_counts = np.unique(y_train, return_counts=True)

for label, count in zip(unique_labels.tolist(), label_counts.tolist()):
    print(f"Label {label}: {count} samples")

In [None]:
# Class balance of the validation split: one line per distinct label.
unique_labels, label_counts = np.unique(y_val, return_counts=True)

for label, count in zip(unique_labels.tolist(), label_counts.tolist()):
    print(f"Label {label}: {count} samples")

In [None]:
y_train[10]

In [None]:
# Pretrained 'bert-base-uncased' checkpoint: matching tokenizer and encoder.
tokenizer, model = (
    loader.from_pretrained('bert-base-uncased')
    for loader in (BertTokenizer, BertModel)
)

In [None]:

# Turn the raw reviews into fixed-length token-id tensors and batch them.
max_length = 128  # Maximum sequence length for BERT
batch_size = 32


def _encode_fixed_length(texts, length):
    """Encode ``texts`` into token-id lists of exactly ``length`` entries.

    Each text is truncated to ``length`` tokens by the tokenizer and then
    right-padded with the tokenizer's own padding id, so the padding value
    stays correct if the checkpoint (and therefore the vocabulary) changes.
    """
    pad_id = tokenizer.pad_token_id
    encoded = []
    for text in texts:
        ids = tokenizer.encode(text, max_length=length, truncation=True)
        encoded.append(ids + [pad_id] * (length - len(ids)))
    return encoded


train_token_ids = _encode_fixed_length(X_train, max_length)
val_token_ids = _encode_fixed_length(X_val, max_length)

train_token_ids_tensor = torch.tensor(train_token_ids)
val_token_ids_tensor = torch.tensor(val_token_ids)
train_labels_tensor = torch.tensor(y_train)
val_labels_tensor = torch.tensor(y_val)

# Each dataset item is an (input_ids, label) pair.
train_dataset = TensorDataset(train_token_ids_tensor, train_labels_tensor)
val_dataset = TensorDataset(val_token_ids_tensor, val_labels_tensor)

# Only the training loader shuffles; validation keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
train_dataset

## Baseline

In [None]:
from sklearn.metrics import precision_score, recall_score
def epoch_validation(model, classifier, val_loader):
    """Evaluate ``classifier`` on top of ``model`` over ``val_loader``.

    The classifier is switched to eval mode and fed element ``[1]`` of the
    model output for every batch; predictions are the argmax over the logits.

    Returns:
        A ``(accuracy, precision, recall)`` tuple computed with scikit-learn
        over all validation samples.
    """
    classifier.eval()
    predicted, expected = [], []
    with torch.no_grad():
        for batch_inputs, batch_labels in val_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            pooled = model(batch_inputs)[1]
            batch_logits = classifier(pooled)
            predicted += torch.argmax(batch_logits, dim=1).tolist()
            expected += batch_labels.tolist()
    return (
        accuracy_score(expected, predicted),
        precision_score(expected, predicted),
        recall_score(expected, predicted),
    )

In [None]:
# Define a custom classifier with a linear layer on top of the pooled output
class Classifier(nn.Module):
    """Single linear layer mapping ``input_size`` features to ``num_classes`` logits."""

    def __init__(self, input_size, num_classes):
        super().__init__()
        # Kept under the attribute name ``linear`` so state-dict keys are stable.
        self.linear = nn.Linear(in_features=input_size, out_features=num_classes)

    def forward(self, x):
        """Return the raw (un-normalised) logits for ``x``."""
        logits = self.linear(x)
        return logits


# def epoch_validation(model, classifier, val_loader):

#     classifier.eval()
#     val_preds = []
#     val_labels = []
#     with torch.no_grad():
#         for inputs, labels in val_loader:
#             inputs, labels = inputs.to(device), labels.to(device)
#             outputs = model(inputs)[1]
#             logits = classifier(outputs)
#             val_preds.extend(torch.argmax(logits, dim=1).tolist())
#             val_labels.extend(labels.tolist())

#     val_acc = accuracy_score(val_labels, val_preds)
#     return val_acc


# Baseline approach
def train_baseline(model, classifier, train_loader, val_loader):
    """Train ``classifier`` on the frozen ``model``'s pooled output.

    All parameters of ``model`` are frozen, so only ``classifier`` learns.
    Relies on the notebook-level globals ``num_epochs``, ``device``,
    ``criterion`` and ``optimizer``. After every epoch the validation
    accuracy, precision and recall are printed together with the training
    loss summed over the epoch's batches.

    Args:
        model: Encoder whose output element ``[1]`` is fed to the classifier.
        classifier: Trainable head producing class logits.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
    """
    # Freeze the parameters of the BERT model
    for param in model.parameters():
        param.requires_grad = False

    for epoch in range(num_epochs):
        # epoch_validation() leaves the classifier in eval mode, so training
        # mode has to be restored at the start of every epoch, not just once.
        classifier.train()

        total_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)[1]  # Get pooled output from BERT
            logits = classifier(outputs)
            loss = criterion(logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Accumulate a plain float: summing the tensors themselves keeps
            # every batch's autograd graph reachable for the whole epoch.
            total_loss += loss.item()

        val_acc, precision, recall = epoch_validation(model, classifier, val_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Accuracy: {val_acc}")
        print("Validation Precision:", precision)
        print("Validation Recall:", recall)
        print("Train loss: ", total_loss)
        print("_______________________")

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Linear head with 768 inputs and 2 output classes, placed on the chosen device.
classifier = Classifier(input_size=768, num_classes=2).to(device)
model = model.to(device)

# Cross-entropy on the logits; Adam receives only the head's parameters.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=classifier.parameters(), lr=1e-3)

In [None]:
# Train the baseline head for five epochs (num_epochs is read as a global).
num_epochs = 5
print("device: ", device)
train_baseline(
    model=model,
    classifier=classifier,
    train_loader=train_loader,
    val_loader=val_loader,
)

## Attention map extraction

In [None]:
import matplotlib.pyplot as plt
from transformers import BertTokenizer, BertModel
import torch

In [None]:
# Reload BERT so that the forward pass also returns its attention weights.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased', output_attentions=True)

# Encode one short sentence (special tokens included) as PyTorch tensors.
input_text = "It was raining yesterday"
input_ids = tokenizer.encode(input_text, return_tensors="pt", add_special_tokens=True)

# Inference only, so gradient tracking is switched off.
with torch.no_grad():
    outputs = model(input_ids)

# Tuple with one tensor per layer, each of shape
# (batch_size, num_heads, sequence_length, sequence_length).
attention_matrices = outputs.attentions


In [None]:
# Print how many attention tensors were returned (the tuple holds one per layer)
print(len(attention_matrices))

In [None]:
input_ids

In [None]:
print(len(attention_matrices[0]))

https://stackoverflow.com/questions/77546636/understanding-output-attentions

In [None]:
attention_matrices[-1].shape # 12 heads   # lets take last layer attention

In [None]:


# Heatmap of a single attention head taken from the last layer.
attention_map = attention_matrices[-1]

# Heads are 0-indexed, so the 5th head is index 4. (The slice previously used
# index 2, which contradicted the variable name and the plot title.)
attention_map_5th_head = attention_map[:, 4, :, :]

# Drop the singleton batch dimension and convert to NumPy for matplotlib
attention_map_5th_head_np = attention_map_5th_head.squeeze().numpy()

# Plot the heatmap
plt.imshow(attention_map_5th_head_np, cmap='hot', interpolation='nearest')
plt.title('Attention Map for 5th Head')
plt.xlabel('Sequence Length')
plt.ylabel('Sequence Length')
plt.colorbar()  # Add color bar indicating the intensity of attention
plt.show()


In [None]:
# Show how the tokenizer splits a sample sentence into tokens.
input_text = "Hello My name is Kamil"
tokens = tokenizer.tokenize(text=input_text)
print(tokens)

In [None]:

# Draw one heatmap per attention head of the first layer.
attention_map = attention_matrices[0]

# Heads live on axis 1 of a layer's tensor. The tuple length is the number of
# LAYERS, which only coincidentally equals the head count for BERT-base.
num_heads = attention_map.shape[1]
n_cols = 4
n_rows = (num_heads + n_cols - 1) // n_cols  # ceil division

# squeeze=False keeps ``axes`` two-dimensional even when there is one row.
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 10), squeeze=False)
fig.suptitle('Attention Maps for All Heads')

for i in range(num_heads):
    attention = attention_map[:, i, :, :]
    ax = axes[i // n_cols, i % n_cols]  # Get the appropriate subplot
    ax.set_title(f'Head {i+1}')
    # Draw the image once and reuse the handle for the colour bar.
    image = ax.imshow(attention.squeeze().numpy(), cmap='hot', interpolation='nearest')
    ax.set_xlabel('Sequence Length')
    ax.set_ylabel('Sequence Length')
    plt.colorbar(image, ax=ax)  # Color bar indicating the intensity of attention

# Hide grid cells that have no head to show.
for unused_ax in axes.flat[num_heads:]:
    unused_ax.set_visible(False)

# Adjust layout
plt.tight_layout()
plt.show()


## Feature extraction

In [None]:
!pip install giotto-tda

In [None]:
import numpy as np
from numpy.random import default_rng
rng = default_rng(42)  # Create a random number generator

from scipy.spatial.distance import pdist, squareform
from scipy.sparse import coo_matrix

from gtda.graphs import GraphGeodesicDistance
from gtda.homology import VietorisRipsPersistence, SparseRipsPersistence, FlagserPersistence

from igraph import Graph

from IPython.display import SVG, display

In [None]:
# Toy example: one random weighted adjacency matrix of a fully connected graph.
n_vertices = 10
x = rng.random(size=(n_vertices, n_vertices))
np.fill_diagonal(x, 0)  # zero the diagonal (not always necessary, see below)

# The transformer expects a collection of matrices; this one holds only x.
X = [x]

# metric="precomputed": entries of x are used directly as pairwise distances.
VR = VietorisRipsPersistence(metric="precomputed")

# One persistence diagram per entry of X (a single one here).
diagrams = VR.fit_transform(X)

print(f"diagrams.shape: {diagrams.shape} ({diagrams.shape[1]} topological features)")

In [None]:
diagrams

If we make one scatter plot per available homology dimension, and plot births and deaths as x- and y-coordinates of points in 2D, we end up with a 2D representation of diagrams[i], and the reason why it is called a persistence diagram:

In the diagram, each point indicates a topological feature in the data which appears at a certain “birth” scale and remains present all the way up to a later “death” scale. A point’s distance from the diagonal is directly proportional to the difference between the point’s “death” and its “birth”. Hence, this distance visually communicates how “persistent” the associated topological feature is. Topological features are partitioned by dimension using colors: above, features in dimension 0 are red while those in dimension 1 are green. In dimension 0, the diagram describes connectivity structure in the data in a very similar way to linkage clustering: we see three points along the vertical axis, which are in one-to-one correspondence with “merge” events in the sense of hierarchical clustering. In dimension 1, the diagram describes the presence of “independent” one-dimensional holes in the data: as expected, there are only two significant points, corresponding to the two “persistent” circles.

In [None]:
from gtda.plotting import plot_diagram

plot_diagram(diagrams[0])

In [None]:
# For every head of the first layer: compute its persistence diagram and draw
# the head's attention heatmap.
attention_map = attention_matrices[0]

# Heads live on axis 1 of a layer's tensor. The tuple length is the number of
# LAYERS, which only coincidentally equals the head count for BERT-base.
num_heads = attention_map.shape[1]
n_cols = 4
n_rows = (num_heads + n_cols - 1) // n_cols  # ceil division

# squeeze=False keeps ``axes`` two-dimensional even when there is one row.
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 10), squeeze=False)
fig.suptitle('Attention Maps for All Heads')

# The transformer is configuration-only here, so one instance serves all heads.
VR = VietorisRipsPersistence(metric="precomputed")

for i in range(num_heads):
    attention = attention_map[:, i, :, :]
    X = [attention[0].numpy()]
    diagrams = VR.fit_transform(X)
    print("Head: ", i + 1)
    print(f"diagrams.shape: {diagrams.shape} ({diagrams.shape[1]} topological features)")
    print("__________________________________________________")
    ax = axes[i // n_cols, i % n_cols]
    ax.set_title(f'Head {i+1}')
    # Draw the image once and reuse the handle for the colour bar.
    image = ax.imshow(attention.squeeze().numpy(), cmap='hot', interpolation='nearest')
    ax.set_xlabel('Sequence Length')
    ax.set_ylabel('Sequence Length')
    plt.colorbar(image, ax=ax)  # Color bar indicating the intensity of attention

# Hide grid cells that have no head to show.
for unused_ax in axes.flat[num_heads:]:
    unused_ax.set_visible(False)

plt.tight_layout()
plt.show()

In [None]:
# Persistence diagram of a single head (index 9) of the first layer.
attention_map = attention_matrices[0]
attention = attention_map[:, 9]
X = [attention[0].numpy()]

VR = VietorisRipsPersistence(metric="precomputed")
diagrams = VR.fit_transform(X)

print(f"diagrams.shape: {diagrams.shape} ({diagrams.shape[1]} topological features)")
print(diagrams[0])
plot_diagram(diagrams[0])

• The sum of lengths of bars;

• The mean of lengths of bars;

• The variance of lengths of bars;

• The number of bars with time of birth/death
greater/lower than threshold;

• The time of birth/death of the longest bar (excluding infinite);

• The overall number of bars;

• The entropy of the barcode.

In [None]:
diagrams

In [None]:
len(diagrams[0])

## Feature extraction pipeline

In [None]:
import numpy as np

def histogram_entropy(hist):
    """Return the Shannon entropy, in bits, of the histogram ``hist``.

    ``hist`` is normalised by its sum to obtain probabilities. An empty or
    all-zero histogram carries no information and yields ``0.0`` (dividing by
    the zero total would otherwise produce NaN).

    Args:
        hist: Array-like of non-negative weights.

    Returns:
        The entropy ``-sum(p * log2(p))`` as a float.
    """
    hist = np.asarray(hist, dtype=float)
    total = np.sum(hist)
    if total == 0:
        return 0.0
    probabilities = hist / total
    # The small offset keeps log2 finite for zero-probability bins.
    return -np.sum(probabilities * np.log2(probabilities + 1e-10))

In [None]:
import numpy as np

def histogram_entropy(hist):
    """Return the Shannon entropy, in bits, of the histogram ``hist``.

    The weights are divided by their sum to form a probability distribution.
    When the sum is zero (empty or all-zero input) the result is ``0.0``
    instead of the NaN a division by zero would give.

    Args:
        hist: Array-like of non-negative weights.

    Returns:
        The entropy ``-sum(p * log2(p))`` as a float.
    """
    hist = np.asarray(hist, dtype=float)
    total = np.sum(hist)
    if total == 0:
        return 0.0
    probabilities = hist / total
    # Adding a small value avoids log(0) for empty bins.
    return -np.sum(probabilities * np.log2(probabilities + 1e-10))

# Try the entropy helper on a hand-made histogram (10, 20, ..., 100) and then
# on the most recently computed ``diagrams`` array.
histogram_data = np.arange(10, 101, 10)

entropy = histogram_entropy(histogram_data)
print("Entropy:", entropy)
print(histogram_entropy(diagrams))

In [None]:
def _bar_statistics(bars):
    """Summarise the bars of one homology dimension.

    Args:
        bars: Sequence of rows whose first two entries are birth and death.

    Returns:
        Dict with the bar ``lengths`` (death - birth), their ``count``, and the
        statistics ``entropy``, ``birth_longest``, ``death_longest``, ``sum``,
        ``mean`` and ``var``. Every statistic is 0 when there are no bars.
    """
    lengths = [bar[1] - bar[0] for bar in bars]
    stats = {
        'lengths': lengths,
        'count': len(lengths),
        'entropy': 0,
        'birth_longest': 0,
        'death_longest': 0,
        'sum': 0,
        'mean': 0,
        'var': 0,
    }
    if not lengths:
        # Nothing to measure; max()/mean() on an empty list would fail or
        # return NaN, so the zero defaults are returned as they are.
        return stats

    lengths_arr = np.asarray(lengths, dtype=float)
    stats['sum'] = sum(lengths)
    stats['mean'] = np.mean(lengths_arr)
    stats['var'] = np.var(lengths_arr)

    # Longest bar and entropy are taken over finite bars only.
    finite_idx = np.flatnonzero(np.isfinite(lengths_arr))
    if finite_idx.size:
        finite_lengths = lengths_arr[finite_idx]
        # argmax returns the first maximum, matching list.index(max(...)).
        longest = bars[finite_idx[np.argmax(finite_lengths)]]
        stats['birth_longest'] = longest[0]
        stats['death_longest'] = longest[1]

        total = finite_lengths.sum()
        if total > 0:
            # Entropy of the barcode: Shannon entropy of the normalised bar
            # lengths (same formula and offset as histogram_entropy).
            probabilities = finite_lengths / total
            stats['entropy'] = -np.sum(
                probabilities * np.log2(probabilities + 1e-10)
            )
    return stats


def compute_features(diagrams):
    """Compute barcode summary features for the first diagram in ``diagrams``.

    Only ``diagrams[0]`` is used. Each of its rows is read as
    ``(birth, death, homology_dimension)``; rows of dimension 0 (H0) and
    dimension 1 (H1) are summarised separately, other dimensions are ignored.

    Compared with the previous implementation:
      * a diagram without H0 or without H1 bars no longer raises (the
        "no bars" fallbacks used to run only after ``max()`` had already been
        called on the empty list);
      * entropy is computed from bar lengths instead of from the raw
        birth/death/dimension values;
      * the longest bar is chosen among finite bars only.

    Args:
        diagrams: Indexable collection of persistence diagrams.

    Returns:
        Dict with the individual statistics, the raw bar lengths
        (``length_h0`` / ``length_h1``, not features themselves) and
        ``features``: the 14 statistics as a flat list in the order entropy,
        number of bars, birth of longest bar, death of longest bar, sum, mean
        and variance of lengths, each as an (H0, H1) pair.
    """
    diagram = diagrams[0]
    h0 = _bar_statistics([bar for bar in diagram if bar[2] == 0])
    h1 = _bar_statistics([bar for bar in diagram if bar[2] == 1])

    feature_list = [
        h0['entropy'], h1['entropy'],
        h0['count'], h1['count'],
        h0['birth_longest'], h1['birth_longest'],
        h0['death_longest'], h1['death_longest'],
        h0['sum'], h1['sum'],
        h0['mean'], h1['mean'],
        h0['var'], h1['var'],
    ]
    return {
        'length_h0': h0['lengths'],  # not a feature
        'length_h1': h1['lengths'],  # not a feature
        'entropy_h0': h0['entropy'],
        'entropy_h1': h1['entropy'],
        'number_of_bars_h0': h0['count'],
        'number_of_bars_h1': h1['count'],
        'time_of_birth_longest_h0': h0['birth_longest'],
        'time_of_birth_longest_h1': h1['birth_longest'],
        'time_of_death_longest_h0': h0['death_longest'],
        'time_of_death_longest_h1': h1['death_longest'],
        'sum_of_lenghts_h0': h0['sum'],
        'sum_of_lenghts_h1': h1['sum'],
        'mean_of_lenghts_h0': h0['mean'],
        'mean_of_lenghts_h1': h1['mean'],
        'var_of_lenghts_h0': h0['var'],
        'var_of_lenghts_h1': h1['var'],
        'features': feature_list,
    }


In [None]:
compute_features(diagrams)

In [None]:
# Same analysis for head index 4 of the current ``attention_map``.
attention = attention_map[:, 4].squeeze().numpy()
X = [attention]

VR = VietorisRipsPersistence(metric="precomputed")
diagrams = VR.fit_transform(X)

print(f"diagrams.shape: {diagrams.shape} ({diagrams.shape[1]} topological features)")
print(diagrams)
compute_features(diagrams)

In [None]:
import warnings
warnings.filterwarnings("ignore")


In [None]:
# Use CUDA when PyTorch reports it as available, the CPU otherwise.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# from tqdm import tqdm
# print("device:", device)
# tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# model = BertModel.from_pretrained('bert-base-uncased', output_attentions=True)
# model = model.to(device)
# all_features = list()

# for data, labels in tqdm(train_loader):
#     data, labels = data.to(device), labels.to(device)
#     #print(data.shape)

#     # print(data[31].numpy())
#     # print(data[31].numpy()[10])
#     with torch.no_grad():
#         outputs = model(data)

#     attention_matrices = outputs.attentions  #Tuple of torch.FloatTensor (one for each layer) of shape (batch_size, num_heads, sequence_length, sequence_length).
#     attention_map = attention_matrices[-1] # from the last layer
#     #print(attention_map.shape)
#     #print(attention_map[31].shape)

#     for i in range(batch_size):
#         list_features = list()
#         attention_for_sample = attention_map[i]
#         for head in range(12):
#             #print(attention_for_sample.shape)
#             attention = attention_for_sample[head, :, :].cpu().numpy()
#             #print(attention.shape)
#             X = [attention]
#             VR = VietorisRipsPersistence(metric="precomputed")
#             diagrams = VR.fit_transform(X)
#             #print(f"diagrams.shape: {diagrams.shape} ({diagrams.shape[1]} topological features)")
#             #print(diagrams)
#             features = compute_features(diagrams)['features']
#             list_features.append(features)
#         all_features.append(np.array(list_features))
#     break


In [None]:
from tqdm import tqdm
# Extract one topological feature vector per training sample from the last
# head of BERT's last attention layer.
print("device:", device)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased', output_attentions=True)
model = model.to(device)

# The features are later paired with ``y_train`` purely by position, so the
# samples must be visited in dataset order. ``train_loader`` shuffles, which
# would silently assign the wrong label to almost every feature vector.
feature_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

# The transformer is configuration-only here, so one instance serves all samples.
VR = VietorisRipsPersistence(metric="precomputed")

all_features = list()
for data, _ in tqdm(feature_loader):
    data = data.to(device)

    with torch.no_grad():
        outputs = model(data)

    # Tuple of torch.FloatTensor (one for each layer) of shape
    # (batch_size, num_heads, sequence_length, sequence_length).
    attention_matrices = outputs.attentions
    attention_map = attention_matrices[-1]  # from the last layer

    # Iterate over the samples actually present: the final batch can be
    # shorter than ``batch_size``.
    for attention_for_sample in attention_map:
        attention = attention_for_sample[-1, :, :].cpu().numpy()  # last head
        X = [attention]
        diagrams = VR.fit_transform(X)
        features = compute_features(diagrams)['features']
        all_features.append(np.array(features))

In [None]:
import numpy as np

# Write the collected feature vectors to a .npy file (relative path).
file_path = 'features.npy'
np.save(file=file_path, arr=np.array(all_features))


In [None]:
from google.colab import drive

# Mount Google Drive to /content/drive
drive.mount('/content/drive')


In [None]:
import numpy as np
# Write the same array to the mounted Google Drive folder.
file_path = '/content/drive/My Drive/features_array.npy'
np.save(file=file_path, arr=np.array(all_features))



In [None]:
all_features[2].shape

In [None]:
all_features[0]

In [None]:
# Stack the per-sample feature vectors into a single array and show its shape.
# NOTE(review): the name ``all`` shadows Python's builtin ``all()`` for the
# rest of the session. The logistic-regression cell below reads this name, so
# a rename has to be applied in both places at once.
all = np.array(all_features)
all.shape

In [None]:
len(y_train)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
# Logistic-regression probe on the topological features.
# Positive-infinite feature values are replaced in place by a large constant.
all[np.isposinf(all)] = 1000
X_train2, X_test2, y_train2, y_test2 = train_test_split(
    all,
    y_train,
    test_size=0.2,
    random_state=42,
)

# Min-max scale, then fill remaining missing values with the column mean.
# Both transformers are fitted on the training portion only.
scaler = MinMaxScaler()
imputer = SimpleImputer(strategy='mean')
X_train2_scaled = scaler.fit_transform(X_train2)
X_train2_imputed = imputer.fit_transform(X_train2_scaled)

LogReg = LogisticRegression()
LogReg.fit(X_train2_imputed, y_train2)

# The held-out portion goes through the already-fitted scaler and imputer.
X_test2_scaled = scaler.transform(X_test2)
X_test2_imputed = imputer.transform(X_test2_scaled)
y_pred2 = LogReg.predict(X_test2_imputed)

# Overall accuracy followed by the per-class report.
accuracy = accuracy_score(y_test2, y_pred2)
print("Accuracy:", accuracy)
print(classification_report(y_test2, y_pred2))


## Attention-based approach

In [None]:
def val_attention(classifier, model, val_loader):
    """Return the accuracy of ``classifier`` on attention-derived features.

    For every validation batch the model's ``attentions`` output is passed
    through ``derive_features_from_attention`` and the resulting features are
    classified; predictions are the argmax over the logits.
    """
    classifier.eval()
    predicted, expected = [], []
    with torch.no_grad():
        for batch_inputs, batch_labels in val_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            attention_weights = model(batch_inputs).attentions
            features = derive_features_from_attention(attention_weights)
            batch_logits = classifier(features)
            predicted.extend(batch_logits.argmax(dim=1).tolist())
            expected.extend(batch_labels.tolist())

    return accuracy_score(expected, predicted)


In [None]:
# Attention-based approach
def train_attention(classifier,model,train_loader, val_loader):
    """Train ``classifier`` on features derived from ``model``'s attention maps.

    Relies on the notebook-level globals ``num_epochs``, ``device``,
    ``criterion``, ``optimizer`` and on ``derive_features_from_attention``.
    After each epoch the validation accuracy (computed by ``val_attention``,
    i.e. on the same attention-derived features used for training) and the
    training loss summed over the epoch's batches are printed.

    Args:
        classifier: Trainable head producing class logits from the features.
        model: Encoder whose output exposes ``.attentions``.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
    """
    for epoch in range(num_epochs):
        # Validation switches the classifier to eval mode, so training mode
        # is restored at the start of every epoch.
        classifier.train()
        total_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            # Extract attention maps from BERT
            outputs = model(inputs)
            attention_weights = outputs.attentions
            features = derive_features_from_attention(attention_weights)
            logits = classifier(features)
            loss = criterion(logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Accumulate a plain float rather than a graph-attached tensor.
            total_loss += loss.item()

        # Validate on the attention features. epoch_validation() would feed
        # the pooled output to this classifier and return a 3-tuple instead
        # of a single accuracy.
        val_acc = val_attention(classifier, model, val_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Accuracy: {val_acc}")
        print("Train loss: ", total_loss)
        print("_______________________")




In [None]:
# Define the classifier
# Fresh linear head (768 inputs, 2 classes) on the chosen device.
classifier = Classifier(input_size=768, num_classes=2).to(device)
model = model.to(device)

# Cross-entropy on the logits; Adam receives only the head's parameters.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=classifier.parameters(), lr=1e-3)

In [None]:
# Train the attention-feature classifier for five epochs.
# train_attention expects (classifier, model, train_loader, val_loader); the
# previous call passed only the two loaders and raised a TypeError.
num_epochs = 5
train_attention(classifier, model, train_loader, val_loader)