In [6]:
import time
import itertools
import re
import random
import os
import pickle
import numpy as np

from sklearn.datasets import load_files
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import accuracy_score
from lime.lime_text import LimeTextExplainer

import numpy as np
import pennylane as qml
from concurrent.futures import ThreadPoolExecutor


from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from collections import defaultdict

import nltk  #This is to do lemmatization
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

nltk.download("punkt_tab")
nltk.download("wordnet")
nltk.download("omw-1.4")


import tensorflow as tf

[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\migue\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\migue\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\migue\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


# PART 0: DATA LOADING AND PREPROCESSING

In [7]:

def clean_text(text):
    cleaned = re.sub(r'<.*?>', '', text).lower()
        # Tokenize into words
    tokens = word_tokenize(cleaned)
    # Initialize the lemmatizer
    lemmatizer = WordNetLemmatizer()
    # Apply lemmatization to each token.
    # Here we use 'v' (verb) as the POS tag for simplicity.
    lemmatized_tokens = [lemmatizer.lemmatize(token, pos='v') for token in tokens]
    return ' '.join(lemmatized_tokens)

def load_imdb_subset(
    num_samples=5000, 
    min_df=1, 
    max_features=15, 
    stopwords_option=True,
    stop_words = 'english'
):
    
    data = load_files(
        r"C:/Users/migue/Downloads/aclImdb_v1/aclImdb/train",
        categories=['pos','neg'], 
        encoding="utf-8", 
        decode_error="replace"                  
    )
    X_text_all, y_all = data.data, data.target


    X_text_all = [clean_text(txt) for txt in X_text_all]

    # Shuffle & truncate to num_samples
    full_idx = np.arange(len(X_text_all))
    #np.random.shuffle(full_idx)
    subset_idx = full_idx[:num_samples]
    global X_text 
    X_text = [X_text_all[i] for i in subset_idx]
    y = y_all[subset_idx]

    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X_text, y, test_size=0.2, random_state=0
    )

    # Vectorizer: presence/absence
    if stopwords_option:
        vectorizer = CountVectorizer(
            binary=True, stop_words=stop_words, 
            min_df=min_df, max_features=max_features
        )
    else:
        vectorizer = CountVectorizer(
            binary=True, stop_words='english', 
            min_df=min_df, max_features=max_features
        )

    vectorizer.fit(X_train)
    return X_train, X_test, y_train, y_test, vectorizer



def train_NN_classifier(X_train, y_train, X_test, y_test, vectorizer):
    """
    Trains a neural network on the binary presence/absence of words.
    Returns the fitted model.
    """
    X_train_bow = vectorizer.transform(X_train)
    X_valid_bow = vectorizer.transform(X_test)
    input_dim = X_train_bow.shape[1]

    model = Sequential([
        Dense(64, activation='relu', input_shape=(input_dim,)),  # First hidden layer
        Dropout(0.3),  # Dropout with 30% probability
        Dense(32, activation='relu'),  # Second hidden layer
        Dropout(0.2),  # Dropout with 20% probability
        Dense(1, activation='sigmoid')  # Output layer for binary classification
    ])

    # Compile the model
    model.compile(optimizer=Adam(learning_rate = 0.00005), loss='binary_crossentropy', metrics=['accuracy'])
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

    model.fit(X_train_bow, y_train, epochs=100, batch_size=16, validation_data=(X_valid_bow, y_test), verbose=1, callbacks=[early_stopping])
    return model

def get_cached_NN(X_train, y_train, vectorizer, num_samples, max_features, stop_words, X_valid, y_valid):

    filename = f"cached_classifier_ns{num_samples}_mf{max_features}_sw{stop_words}_NN_classifier_seed0.pkl"
    if os.path.exists(filename):
        print("Loading cached logistic from", filename)
        with open(filename, 'rb') as f:
            clNN = pickle.load(f)
    else:
        print("No cached classifier found. Training a new one...")
        clNN = train_NN_classifier(X_train, y_train, X_valid, y_valid, vectorizer)
        with open(filename, 'wb') as f:
            pickle.dump(clNN, f)
        print("Cached classifier saved as", filename)
    return clNN

# I don't need to create train_NN_classifier_filt no? both _filt and not _filt are the same, so it should be fine withouth it

def get_cached_NN_filt(X_train_filt, y_train_filt, vectorizer_filt, num_samples, max_features, X_valid_filt, y_valid_filt):
    filename = f"cached_classifier_ns{num_samples}_mf{max_features}_sw_LIME_filtered_NN_classifier_seed0.pkl"
    if os.path.exists(filename):
        print("Loading cached logistic from", filename)
        with open(filename, 'rb') as f:
            clNN_filt = pickle.load(f)
    else:
        print("No cached classifier found. Training a new one...")
        clNN_filt = train_NN_classifier(X_train_filt, y_train_filt, X_valid_filt, y_valid_filt, vectorizer_filt)
        with open(filename, 'wb') as f:
            pickle.dump(clNN_filt, f)
        print("Cached classifier saved as", filename)
    return clNN_filt

# I don't need to create train_NN_classifier_filt no? both _filt and not _filt are the same, so it should be fine withouth it

def q_get_cached_NN_filt(q_X_train_filt, q_y_train_filt, q_vectorizer_filt, num_samples, max_features, q_X_valid_filt, q_y_valid_filt):

    filename = f"cached_classifier_ns{num_samples}_mf{max_features}_sw_qLIME_filtered_NN_classifier_seed0.pkl"
    if os.path.exists(filename):
        print("Loading cached logistic from", filename)
        with open(filename, 'rb') as f:
            q_clNN_filt = pickle.load(f)
    else:
        print("No cached classifier found. Training a qlime new one...")
        q_clNN_filt = train_NN_classifier(q_X_train_filt, q_y_train_filt, q_X_valid_filt, q_y_valid_filt, q_vectorizer_filt)
        with open(filename, 'wb') as f:
            pickle.dump(q_clNN_filt, f)
        print("Cached classifier saved as", filename)
    return q_clNN_filt




# CLASSICAL LIME

In [8]:
def run_classical_lime(
    text_sample, clNN, vectorizer,  
    k_features=20, num_samples=50   
):
    
    """
    Runs classical LIME on a single text instance.
    Returns the top (word, weight) pairs.
    """
    class_names = ["negative", "positive"]
    explainer = LimeTextExplainer(class_names=class_names, feature_selection="auto")
    
    #@tf.function(reduce_retracing=True)
    def predict_proba(texts):
        bow = vectorizer.transform(texts) 
        #print('shaspe of box', bow.shape, ', text_sample:',text_sample, 'features: ', k_features, 'samples: ', num_samples)
        # print(bow)
        proba = clNN.predict(bow.toarray(), verbose=None)
        if proba.ndim == 1:  # If 1D, reshape to (num_samples, 1)
            proba = proba.reshape(-1, 1)
        #print('proba', proba, 'dimension', proba.shape, 'return', np.hstack((1 - proba, proba)))
        return np.column_stack((1 - proba, proba))  # Return probabilities for both classes
        
        

    explanation = explainer.explain_instance(
        text_sample,
        predict_proba,
        num_features=k_features,
        num_samples=num_samples 
    )
    return explanation.as_list() 

# Q-LIME pi (probably full of bugs)

In [9]:
def encode_and_flip(features, flip_index=None, shots=None):
    """
    Encode features -> quantum circuit.
    FLIP ONLY if bit == 1 at flip_index (1->0).
    """
    num_qubits = len(features)
    dev = qml.device("default.qubit", wires=num_qubits, shots=shots)

    @qml.qnode(dev)
    def circuit():
        for i, f in enumerate(features):
            if i == flip_index and f == 1:   #;;  This line is the original code commented out
                # 1->0 => RY(0),
                theta = 0
                #My suggestion: 
                #theta = f * (np.pi / 2)            
                #qml.PauliX(wires=i)
            else:
                theta = f * (np.pi / 2) 
 
            qml.RY(theta, wires=i)
            
        return qml.probs(wires=range(num_qubits))

    return circuit()

def sample_state(probabilities):
    """
    Sample an integer state index from the distribution.
    """
    r = random.random()
    cumsum = 0.0
    for idx, p in enumerate(probabilities):
        cumsum += p
        if r <= cumsum:
            return idx
    return len(probabilities) - 1

def measure_and_map_to_classical(features, flip_index=None, shots=None):
    """
    Run the circuit, measure, return a binary array for the top-likelihood state.
    """
    probs = encode_and_flip(features, flip_index=flip_index, shots=shots)
    measured_state = sample_state(probs)
    num_qubits = len(features)
    bin_string = f"{measured_state:0{num_qubits}b}"
    return [int(bit) for bit in bin_string]


#@tf.function(reduce_retracing=True)
def predict_clNN(features, clNN):
    features = np.array(features).reshape(1, -1)
    proba = clNN.predict(features, verbose=None)  # Use your NN's prediction method.
    if proba.ndim == 1:
        proba = proba.reshape(-1, 1)
    # Assuming the network returns the probability of the positive class.
    return proba[0][0]

def quantum_lime_explanation(
    features, clNN, shots=None  
):
    """
    Flip only features that are 1 -> 0.
    Return array of shape (n_features,) with:
       Delta f_k = (original_pred - new_pred).
    """

    original_pred = predict_clNN(features, clNN)
    contributions = []

    def flip_and_predict(i):
        new_vec = measure_and_map_to_classical(features, flip_index=i, shots=None)
        new_pred = predict_clNN(new_vec, clNN)
        return original_pred - new_pred

    # Flip only bits that are 1
    with ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(flip_and_predict, i): i
            for i, val in enumerate(features) if val == 1 # ;  This is the original code commented out 
        }
        for future in futures:
            i = futures[future]
            score = future.result()
            contributions.append((i, score))

    return contributions

# EXPERIMENTAL ROUTINE

In [10]:

def run_experiment(
    num_samples=500,
    min_df=1,
    max_features=15,

    stopwords_option=True,
    lime_num_samples=300,
    stop_words = 'english',
    max_features_filt = 20


):

    X_train, X_test, y_train, y_test, vectorizer = load_imdb_subset(
        num_samples=num_samples,
        min_df=min_df,
        max_features=max_features,
        stopwords_option=stopwords_option,
        stop_words = stop_words
        )

    clNN = get_cached_NN(X_train, y_train, vectorizer, num_samples, max_features, stop_words, X_test, y_test)

    X_test_bow = vectorizer.transform(X_test)
    y_test = y_test.reshape(-1, 1)
    test_acc = accuracy_score(y_test, clNN.predict(X_test_bow, verbose=None) > 0.5)

    instance_local_accuracies = []
    q_instance_local_accuracies = []

    print("Shape of y_train:", y_train.shape)
    print("Shape of y_test:", y_test.shape)
    X_all = X_train + X_test
    y_all = np.concatenate([y_train, y_test.ravel()])



    word_weights_unf = defaultdict(list)
    q_word_weights_unf = defaultdict(list)


    for idx in range(len(X_all)):
        text_sample = X_all[idx]
        y_true = y_all[idx]


        explanation_lime_unfiltered = run_classical_lime(
            text_sample, clNN, vectorizer,
            k_features=max_features, num_samples=lime_num_samples
        )

        bow = vectorizer.transform([text_sample])
        ###bin_features = bow.toarray()[0] #This is used in qlime





        contributions_unfiltered_abs = [(word, abs(score)) for word, score in explanation_lime_unfiltered]
        # print('LSample', idx, "contributions", contributions_unfiltered_abs)

        for word, weight in contributions_unfiltered_abs:

            word_weights_unf[word].append(weight)

        global_avg_weights_unf = {word: sum(weights) / len(weights) for word, weights in word_weights_unf.items()}

        threshold = 0.005


        rubish_words = {word: avg for word, avg in global_avg_weights_unf.items() if avg <= threshold}
        rubish_words_cleaned = {str(word): round(avg, 4) for word, avg in rubish_words.items()}

        filtered_words = {word: avg for word, avg in global_avg_weights_unf.items() if avg >= threshold}
        filtered_words_cleaned = {str(word): round(avg, 4) for word, avg in filtered_words.items()}

        #qlime!!!!!!

        sample_vectorizer = CountVectorizer(
        stop_words=stop_words,
        max_features=max_features,
        binary=True  # Needed for QLIME
    )
        sample_vectorizer.fit([text_sample]) #Aparentlyu I have to fit the vectorizer for each sample, otherwise it will not work globally
        #Lime seems to do it automatically
        bow = sample_vectorizer.transform([text_sample])
        bin_features = bow.toarray()[0]

    # Get feature names for debugging
        all_qfeatures = sample_vectorizer.get_feature_names_out()
        # print(f"QSample {idx}: Vectorizer features:", all_qfeatures)

        explanation_qlime_unfiltered = quantum_lime_explanation(
            features=bin_features, clNN=clNN, shots=None)

        for idx, score in explanation_qlime_unfiltered:
            qword = all_qfeatures[idx]
            #print("QLIME Explanation (raw):", explanation_qlime_unfiltered)
            # print("QLIME Explanation (word):", qword, "Score:", score)
            q_word_weights_unf[qword].append(abs(score))


        global_avg_weights_qlime_unf = {qword: sum(qweights) / len(qweights) for qword, qweights in q_word_weights_unf.items()}

        qthreshold = 0.05
        q_rubish_words = {qword: avg for qword, avg in global_avg_weights_qlime_unf.items() if avg <= qthreshold}
        q_rubish_words_cleaned = {str(qword): round(avg, 4) for qword, avg in q_rubish_words.items()}

        q_filtered_words = {qword: avg for qword, avg in global_avg_weights_qlime_unf.items() if avg >= qthreshold}
        q_filtered_words_cleaned = {str(qword): round(avg, 4) for qword, avg in q_filtered_words.items()}






    #print('filtered_words', filtered_words_cleaned, 'rubish_words:', rubish_words_cleaned)

    features_filt = len(global_avg_weights_unf) - len(rubish_words_cleaned)


    #lime
    X_train_filt, X_test_filt, y_train_filt, y_test_filt, vectorizer_filt = load_imdb_subset(
        num_samples=num_samples,
        min_df=min_df,
        max_features= max_features,
        stopwords_option=stopwords_option,
        stop_words = stop_words + list(rubish_words.keys())
        )

    clNN_filtered = get_cached_NN_filt(X_train_filt, y_train_filt, vectorizer_filt, num_samples, features_filt, X_test_filt, y_test_filt)

    # X_train_bow_filt = vectorizer_filt.transform(X_train_filt)
    # input_dim = X_train_bow_filt.shape[1]
    # print("Shape of training data:", X_train_filt.shape)
    # print("Shape of X_test_bow_filt:", X_test_bow_filt.shape)
    # print("Shape expected by model:", clNN_filtered.input_shape)
    # print("Shape of y_test_filt:", y_test_filt.shape)
    # print("Shape of predictions:", clNN_filtered.predict(X_test_bow_filt.toarray()).shape)
    # print("Model summary:")
    # clNN_filtered.summary()

    X_all_filt = X_train_filt + X_test_filt

    q_X_train_filt, q_X_test_filt, q_y_train_filt, q_y_test_filt, q_vectorizer_filt = load_imdb_subset(
        num_samples=num_samples,
        min_df=min_df,
        max_features= max_features,
        stopwords_option=stopwords_option,
        stop_words = stop_words + list(q_rubish_words.keys())
        )

    qfeatures_filt = len(global_avg_weights_qlime_unf) - len(q_rubish_words_cleaned)

    q_clNN_filtered = q_get_cached_NN_filt(q_X_train_filt, q_y_train_filt, q_vectorizer_filt, num_samples, qfeatures_filt, q_X_test_filt, q_y_test_filt)

    q_X_all_filt = q_X_train_filt + q_X_test_filt


    #for idx in range(len(q_X_all_filt)):

    max_len = max(len(X_all_filt), len(q_X_all_filt))

    word_weights = defaultdict(list)
    q_word_weights = defaultdict(list)

    for idx in range(max_len):

        if idx < len(X_all_filt):
            text_sample_filt = X_all_filt[idx]

            explanation_lime_filtered = run_classical_lime(
                text_sample_filt, clNN_filtered, vectorizer_filt,
                k_features=features_filt, num_samples=lime_num_samples
            )

            bow_filt = vectorizer_filt.transform([text_sample_filt])

            y_pred = clNN_filtered.predict(bow_filt.toarray(), verbose=None)[0].item()
            y_pred_label = 1 if y_pred >= 0.5 else 0

            instance_accuracy = int(y_pred_label == y_true)

            instance_local_accuracies.append(instance_accuracy)

            contributions_filtered_abs = [(word, abs(score)) for word, score in explanation_lime_filtered]

            for word, weight in contributions_filtered_abs:
                word_weights[word].append(weight)

        if idx < len(q_X_all_filt):
            text_sample_qfilt = q_X_all_filt[idx]

            q_bow_filt = q_vectorizer_filt.transform([text_sample_qfilt])
            bin_features = bow.toarray()[0]

            explanation_qlime_filtered = quantum_lime_explanation(
                features=bin_features, clNN=q_clNN_filtered, shots=None
            )



            q_y_pred = q_clNN_filtered.predict(q_bow_filt.toarray(), verbose=None)[0].item()
            q_y_pred_label = 1 if q_y_pred >= 0.5 else 0

            q_instance_accuracy = int(q_y_pred_label == y_true)

            q_instance_local_accuracies.append(q_instance_accuracy)

            contributions_qlime_filtered_abs = [
            (q_vectorizer_filt.get_feature_names_out()[i], abs(score)) for i, score in explanation_qlime_filtered
        ]




            for qword, qweight in contributions_qlime_filtered_abs:
                q_word_weights[qword].append(qweight)

    q_X_test_bow_filt = q_vectorizer_filt.transform(q_X_test_filt)
    q_test_acc_filt = accuracy_score(q_y_test_filt, q_clNN_filtered.predict(q_X_test_bow_filt.toarray(), verbose=None) > 0.5)

    X_test_bow_filt = vectorizer_filt.transform(X_test_filt)
    test_acc_filt = accuracy_score(y_test_filt, clNN_filtered.predict(X_test_bow_filt.toarray(), verbose=None) > 0.5)




    results = {
        "global_acc": np.mean(test_acc), #Global accuracy before filtering
        "num_unf_words": len(word_weights_unf), #Lime

        "num_filt_words": len(filtered_words_cleaned), #Lime
        #"local_accuracy": np.mean(instance_local_accuracies), #Lime local accuracies after filtering
        "qnum_unf_words": len(q_word_weights_unf), #QLime
        "qnum_filt_words": len(q_filtered_words_cleaned), #QLime
        "q_local_accuracy": np.mean(q_instance_local_accuracies), #QLime local accuracies after filtering



        "global_acc_filtered": np.mean(test_acc_filt), #Lime global accuracy after filtering
        "q_global_acc_filtered": np.mean(q_test_acc_filt), #Qlime global accuracy after filtering

        #"list_deleted_words": rubish_words_cleaned,  #Words to delete Lime
        #"qlist_deleted_words": q_rubish_words_cleaned,  #Words to delete Qlime           #THIS HASNT BEEN ADDED YET
        #"list_filt_words": filtered_words_cleaned,  #Words to keep Lime
        #"q_list_filt_words": q_filtered_words_cleaned,  #Words to keep Qlime           #THIS HASNT BEEN ADDED YET
    }
    return results

#print(features_filt)

In [11]:
import pandas as pd
import sys, os

sys.path.append(os.getcwd())
sys.path.append(os.path.dirname(os.getcwd()))

if __name__ == "__main__":


    # Parameter grid to systematically vary certain settings
    param_grid = {
        "num_samples": [10],
        "max_features": [12],
        "stopwords_option": [True],
        "lime_num_samples": [100],
        "stop_words": [['english']],
           
    }

    combos = list(itertools.product(*param_grid.values()))
    all_results = []

    for combo in combos:
        (num_samples_, max_features_, stopwords_, lime_samps_, stop_words_) = combo
        
        print("\n==================================")
        print(f"Running experiment with: "
              f"num_samples={num_samples_}, "
              f"max_features={max_features_}, "
              f"stopwords={stopwords_}, "
              f"lime_num_samples={lime_samps_}, "
              f"stop_words={stop_words_},")
        
        res = run_experiment(
            num_samples=num_samples_,
            max_features=max_features_,
            stopwords_option=stopwords_,
            lime_num_samples=lime_samps_,
            stop_words=stop_words_,
        )
        res_row = {
            "num_samples": num_samples_,
            "max_features": max_features_,
            "stopwords": stopwords_,
            "lime_num_samples": lime_samps_,
            #"local_accuracy": res["local_accuracy"],
            "global_acc": res["global_acc"],
            "global_acc_filtered": res["global_acc_filtered"],
            "q_global_acc_filtered": res["q_global_acc_filtered"],
            "num_unf_words": res["num_unf_words"], #Lime words
            #"list_deleted_words": res["list_deleted_words"],  #Lime words
            "num_filt_words": res["num_filt_words"], #Lime words
            #"list_filt_words": res["list_filt_words"],  #Lime words
            "qnum_unf_words": res["qnum_unf_words"],
            "qnum_filt_words": res["qnum_filt_words"],
            "q_local_accuracy": res["q_local_accuracy"],

        }

        #print("Results =>", res_row)
        all_results.append(res_row)

    # Save results to CSV
    df = pd.DataFrame(all_results)

    df.to_csv("results_expanded_flips.csv", index=False)

    print("\nAll done! Saved results to 'results_expanded_flips.csv'.")


Running experiment with: num_samples=10, max_features=12, stopwords=True, lime_num_samples=100, stop_words=['english'],
Loading cached logistic from cached_classifier_ns10_mf12_sw['english']_NN_classifier_seed0.pkl
Shape of y_train: (8,)
Shape of y_test: (2, 1)
No cached classifier found. Training a new one...


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step - accuracy: 0.3750 - loss: 0.8246 - val_accuracy: 0.5000 - val_loss: 0.7168
Epoch 2/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 331ms/step - accuracy: 0.6250 - loss: 0.6503 - val_accuracy: 0.5000 - val_loss: 0.7165
Epoch 3/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 309ms/step - accuracy: 0.5000 - loss: 0.6861 - val_accuracy: 0.5000 - val_loss: 0.7161
Epoch 4/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 313ms/step - accuracy: 0.2500 - loss: 0.7965 - val_accuracy: 0.5000 - val_loss: 0.7157
Epoch 5/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 308ms/step - accuracy: 0.2500 - loss: 0.7748 - val_accuracy: 0.5000 - val_loss: 0.7153
Epoch 6/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 310ms/step - accuracy: 0.3750 - loss: 0.8135 - val_accuracy: 0.5000 - val_loss: 0.7149
Epoch 7/100
[1m1/1[0m [32m━━━━━━━━