In [5]:
import os
#os.system('pwd')

import torch
import torchhd
from torchhd.models import Centroid
from torchhd import embeddings
import torchvision
from torchvision.datasets import MNIST
from torchhd.datasets.isolet import ISOLET
from torchhd.datasets import EMGHandGestures
#from torch_geometric.datasets import TUDataset
import torchmetrics
from tqdm import tqdm
import numpy as np

import torch.nn as nn
import torch.nn.functional as F
import random

import matplotlib.pyplot as plt
import pandas as pd

from torch.utils.data import DataLoader, TensorDataset

In [6]:
# Batch size used by the DataLoaders created further down in this notebook.
BATCH_SIZE = 1

# Run on the GPU when torch reports one as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using {} device".format(device))

Using cpu device


In [7]:
# # classes = []
# # modelload = "MNISTmodels/MNIST"+"_"+ str(Encoder)[17:-2]+"_quantize_"+str(q)+"_"+str(d)
# # modelload1 = "./"+modelload+".pt"
# # modelood = Centroid(d, num_classes)
# # weights = torch.load( modelload1 , map_location=torch.device('cpu'))
# # #print(weights[0])
# # modelood.weight = weights
# # #modelood.normalize(quantize=q)
# # #print ("modelood.weight after normalize  ", modelood.weight[4])
# # #classes.append(modelood.weight.cpu().detach().numpy())
# # #print ("classes.  ", classes[0], len(classes) , len(classes[0]))
# # weights = modelood.weight
# # #print ("k.  ", k[0], len(k) , len(k[0]))
def extract_sparse_nonSparse_indexes(weights):
    """
    Split the column indexes of a 2D weight matrix into varying and constant columns.

    Args:
        weights: Rectangular 2D collection of numbers. Objects exposing
            ``.tolist()`` (for example a tensor or an array) are converted
            with it; any other iterable of rows is used directly.

    Returns:
        tuple: A tuple containing two lists:
            - nonSparse: Indexes of columns holding at least two distinct values.
            - sparse: Indexes of columns whose values are all identical.
        Both lists are empty when ``weights`` has no rows.
    """
    if hasattr(weights, "tolist"):
        rows = weights.tolist()
    else:
        rows = [list(row) for row in weights]

    nonSparse = []
    sparse = []
    # Without rows there is no first row to measure the width from.
    if not rows:
        return nonSparse, sparse

    # zip(*rows) walks the matrix column by column.
    for col_idx, column_values in enumerate(zip(*rows)):
        if len(set(column_values)) == 1:
            sparse.append(col_idx)
        else:
            nonSparse.append(col_idx)

    return nonSparse, sparse

In [None]:
class Encoder_rand(nn.Module):
    """Encoder that maps samples through a ``embeddings.Sinusoid`` layer and quantises the result.

    Args:
        out_features: Second argument given to ``embeddings.Sinusoid``
            (the hypervector dimensionality ``d`` at the call sites in this file).
        size: First argument given to ``embeddings.Sinusoid``
            (the number of input features at the call sites in this file).
        levels: Not used by this encoder; it is accepted so that the
            constructor can be called the same way as ``Encoder_base``.
    """

    def __init__(self, out_features, size, levels):
        super().__init__()
        self.project = embeddings.Sinusoid(size, out_features)
        # Label used by the training code to build output paths.
        self.name = "RandomProjectionEncoder"

    def forward(self, x):
        """Project ``x`` and return ``torchhd.hard_quantize`` of the projection."""
        projected = self.project(x)
        quantized = torchhd.hard_quantize(projected)
        return quantized

###### =====================================BASE LEVEL=============================================

class Encoder_base(nn.Module):
    """Encoder that binds per-position hypervectors with level hypervectors.

    Args:
        out_features: Hypervector dimensionality given to both embeddings.
        size: Number of entries of the ``embeddings.Random`` position table.
        levels: Number of entries of the ``embeddings.Level`` value table.
    """

    def __init__(self, out_features, size, levels):
        super().__init__()
        # Construction order is kept: position table first, then level table.
        self.position = embeddings.Random(size, out_features)
        self.value = embeddings.Level(levels, out_features)
        # Label used by the training code to build output paths.
        self.name = "BaseLevelEncoder"

    def forward(self, x):
        """Bind positions with the level embedding of ``x``, bundle, and hard-quantise."""
        level_hv = self.value(x)
        bound = torchhd.bind(self.position.weight, level_hv)
        bundled = torchhd.multiset(bound)
        return torchhd.hard_quantize(bundled)


def inject_fault(classHyperVectors, bits, amount, mood=None):
    """
    Flip ``amount`` randomly chosen bits in selected columns of a weight matrix.

    Every weight in the selected columns is viewed as a ``bits``-wide
    two's-complement integer (most significant bit first). ``amount`` distinct
    bit positions are drawn uniformly without replacement from all bits of the
    selected columns and inverted. The input weights are not modified.

    For ``bits == 1`` the weights are treated as bipolar: values are passed
    through ReLU before the flip (so -1 is stored as bit 0 and +1 as bit 1),
    and the flipped bits are mapped back to -1 / +1. Only the selected columns
    are rewritten; every other column is returned unchanged.

    Args:
        classHyperVectors: Object whose ``weight.data`` is a 2D tensor
            (rows x columns).
        bits (int): Number of bits used to represent each weight.
        amount (int): Number of bit flips to inject.
        mood (list, optional): Column indices that may receive faults.
            ``None`` (the default) selects every column.

    Returns:
        torch.Tensor: Copy of the weights with the faults injected, same shape,
        dtype and device as ``classHyperVectors.weight.data``.

    Raises:
        ValueError: Raised by ``rng.choice`` when ``amount`` exceeds the number
            of bits available in the selected columns.
    """
    rng = np.random.default_rng()

    # Work on a copy so the caller's weights stay intact.
    result = classHyperVectors.weight.data.clone()
    rows, cols = result.shape

    # Restrict fault injection to the mood columns (all columns by default).
    if mood is None:
        mood_cols = torch.arange(cols, dtype=torch.long, device=result.device)
    else:
        mood_cols = torch.as_tensor(mood, dtype=torch.long, device=result.device)
    n_mood = mood_cols.numel()

    fault_index = rng.choice(n_mood * rows * bits, amount, replace=False)

    # Flatten only the mood columns (row-major) and convert to integers.
    mood_weights = result[:, mood_cols].flatten()
    if bits == 1:
        mood_weights = torch.relu(mood_weights)
    values = mood_weights.type(torch.int).cpu().numpy().astype(np.int64)
    values &= (1 << bits) - 1  # keep the low `bits` bits (two's-complement view)

    # Flat bit i belongs to weight i // bits; bits are numbered MSB first.
    weight_idx = fault_index // bits
    bit_pos = bits - 1 - (fault_index % bits)
    flip_mask = np.zeros_like(values)
    # Indices are unique, so XOR-accumulating builds the exact flip mask per weight.
    np.bitwise_xor.at(flip_mask, weight_idx, np.left_shift(np.int64(1), bit_pos))
    values ^= flip_mask

    # Reinterpret the unsigned bit patterns as signed two's-complement numbers.
    sign_bit = 1 << (bits - 1)
    values = np.where((values & sign_bit) != 0, values - (1 << bits), values)

    modified_weights = torch.from_numpy(values).reshape(rows, n_mood)
    if bits == 1:
        # One-bit two's complement yields 0 / -1; map to bipolar -1 / +1.
        modified_weights = modified_weights * -2 - 1

    # Write back into the mood columns only.
    result[:, mood_cols] = modified_weights.to(device=result.device, dtype=result.dtype)
    return result

def twos_comp(val, bits):
    """Interpret the unsigned integer ``val`` as a ``bits``-wide two's-complement number."""
    sign_mask = 1 << (bits - 1)
    if val & sign_mask == 0:
        # Sign bit clear: the value is already non-negative.
        return val
    # Sign bit set (e.g. 128-255 for 8 bits): shift down by 2**bits.
    return val - (sign_mask << 1)

def train_model(encoder, levels, d, size, num_classes, q, dataset, train_ld, test_ld, flatten = False, runs=10):
    """Train ``runs`` independent centroid models and keep the most accurate one.

    Each run builds a fresh encoder and a fresh ``Centroid`` model, accumulates
    the encoded training samples with ``model.add``, normalises the model with
    ``model.min_max_normalize(quantize=q)`` and evaluates it on ``test_ld``.
    Whenever a run beats the best test accuracy seen so far, its weights,
    encoder and evaluation arrays are written under
    ``./models/<dataset>/<encoder name>/`` (directories are created on demand).

    NOTE(review): ``min_max_normalize`` is assumed to be provided by the
    ``Centroid`` implementation in use -- confirm against the installed torchhd.

    Args:
        encoder (str): ``"RandomProjectionEncoder"`` or ``"BaseLevelEncoder"``.
        levels (int): Number of levels forwarded to the encoder constructor.
        d (int): Hypervector dimensionality.
        size (int): Number of input features per sample.
        num_classes (int): Number of target classes.
        q (int): Quantisation setting forwarded to ``min_max_normalize``.
        dataset (str): Dataset name used as a directory component.
        train_ld: Iterable yielding ``(samples, labels)`` training batches.
        test_ld: Iterable yielding ``(samples, labels)`` test batches.
        flatten (bool): When true, the last two sample axes are merged and the
            batch is reshaped to ``(batch, features)`` before encoding.
        runs (int): Number of independent training runs (at least 1).

    Returns:
        tuple: ``(best_model, best_encoder, accuracy)`` where ``accuracy`` is
        the ``torchmetrics.Accuracy`` object of the best run.

    Raises:
        ValueError: If ``encoder`` is not a known name or ``runs`` is below 1.
    """
    encoder_types = {
        "RandomProjectionEncoder": Encoder_rand,
        "BaseLevelEncoder": Encoder_base,
    }
    if encoder not in encoder_types:
        raise ValueError(
            "Unknown encoder {!r}; expected one of {}".format(encoder, sorted(encoder_types))
        )
    if runs < 1:
        raise ValueError("runs must be at least 1, got {}".format(runs))
    encoder_type = encoder_types[encoder]

    def _prepare(samples):
        # Move a batch to the target device and optionally flatten it to 2D.
        samples = samples.to(device)
        if flatten:
            samples = samples.flatten(start_dim=-2)
            samples = samples.reshape(samples.shape[0], samples.shape[-1])
        return samples

    # -inf guarantees that the first run always becomes the initial best.
    best_accuracy = float("-inf")
    best_metric = None
    best_encoder = None
    best_model = Centroid(d, num_classes)

    for _ in range(runs):
        encode = encoder_type(d, size, levels).to(device)
        model = Centroid(d, num_classes)
        model = model.to(device)

        with torch.no_grad():
            for samples, labels in tqdm(train_ld, desc="Training"):
                samples_hv = encode(_prepare(samples))
                model.add(samples_hv, labels.to(device))

        accuracy = torchmetrics.Accuracy("multiclass", num_classes=num_classes)
        dotsim = []          # highest similarity per test sample
        dotsim10 = []        # all class similarities per sample, sorted ascending
        classification = []  # predicted class per test sample
        lb = []              # ground-truth label per test sample
        with torch.no_grad():
            model.min_max_normalize(quantize = q)

            for samples, labels in tqdm(test_ld, desc="Testing"):
                samples_hv = encode(_prepare(samples))
                outputs = model(samples_hv, dot=True).cpu()
                accuracy.update(outputs, labels)
                dotsim.extend(torch.max(outputs, dim=1).values.tolist())
                dotsim10.extend(np.sort(outputs.detach().numpy()))
                classification.extend(torch.argmax(outputs, dim=1).tolist())
                lb.extend(labels.tolist())

        # Only keep the best model; the threshold must advance with every
        # improvement, otherwise later (worse) runs would overwrite it.
        current_accuracy = accuracy.compute().item() * 100
        if current_accuracy > best_accuracy:
            best_accuracy = current_accuracy
            best_metric = accuracy
            best_model.weight.data = model.weight.data.detach().clone()
            best_encoder = encode

            model_dir = "./models/"+ dataset +"/" + encode.name + "/"
            normal_prefix = model_dir + "normal/" + "quantize_"+str(q)+"_"+str(d)+"_"
            os.makedirs(model_dir + "normal/", exist_ok=True)

            np.save(normal_prefix+'dotsimbase10'+'.npy', np.array(dotsim10))
            np.save(normal_prefix+'dotsimbase'+'.npy', np.array(dotsim))
            np.save(normal_prefix+'classified'+'.npy', np.array(classification))
            np.save(normal_prefix+'labels'+'.npy', np.array(lb))
            torch.save(model.weight, model_dir + "quantize_" + str(q) + "_"+str(d) + ".pt")
            torch.save(encode, model_dir + "enc_" + "quantize_" + str(q) + "_"+str(d) + ".pt")
    return best_model, best_encoder, best_metric

In [None]:
# Hypervector dimensionalities swept by the experiment cells below.
DIMENSION = [
    1000,
    5000,
    10000,
]

# Bit widths swept by the experiment cells (used for quantisation and fault injection).
quantize = [
    1,
    8,
]

# Encoder names understood by train_model.
encoders = [
    "RandomProjectionEncoder",
    "BaseLevelEncoder",
]

# [start, stop) bounds of the repetition index used for each fault sweep.
averaging = [0, 10]

In [None]:
#ISOLET
# Trains one model per (dimension, bit width, encoder) combination and then
# measures test accuracy while a growing share of the weight bits is flipped.

# DIMENSIONS = 5000  # number of hypervector dimensions
NUM_LEVELS = 100


train_ds = ISOLET("data/", train=True, download=False)
train_ld = torch.utils.data.DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)

test_ds = ISOLET("data/", train=False, download=False)
test_ld = torch.utils.data.DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)

num_classes = len(train_ds.classes)

for d in DIMENSION:
    for q in quantize:
        for Encoder in encoders:

            model, encode, accuracy = train_model(Encoder,
                                        NUM_LEVELS,
                                        d,
                                        train_ds[0][0].size(-1),
                                        num_classes,
                                        q,
                                        "isolet",
                                        train_ld,
                                        test_ld)

            #inject fault
            print("quantize_"+str(q)+"_"+str(d))
            print(f"Testing accuracy of {(accuracy.compute().item() * 100):.3f}%")
            w = model.weight.data.detach().clone()

            total_bits = d*num_classes*q   # number of bits stored in the weight matrix
            all_columns = list(range(d))   # faults may land in any column
            faulty_dir = "./models/isolet/" + encode.name + "/faulty/"
            os.makedirs(faulty_dir, exist_ok=True)
            file_prefix = faulty_dir + "quantize_"+str(q)+"_"+str(d)+"_"

            for avg in range(averaging[0], averaging[1]):
                records = []
                faultyPoints = 0

                # Sweep the number of flipped bits in steps of 1% of all bits.
                while faultyPoints < total_bits:

                    dotsim = []
                    dotsim10 = []
                    classification = []
                    model_faulty = Centroid(d, num_classes)
                    model_faulty = model_faulty.to(device)
                    model_faulty.weight.data = w.clone()

                    # inject_fault needs the list of columns that may be corrupted.
                    model_faulty.weight.data = inject_fault(
                        model_faulty, amount=faultyPoints, bits=q, mood=all_columns
                    ).to(device)

                    accuracy = torchmetrics.Accuracy("multiclass", num_classes=num_classes)
                    with torch.no_grad():
                        # model.normalize(quantize = q)
                        for samples, labels in tqdm(test_ld, desc="Testing", disable= True):
                            samples = samples.to(device)
                            samples_hv = encode(samples)
                            outputs = model_faulty(samples_hv, dot=True).cpu()
                            accuracy.update(outputs, labels)
                            dotsim.extend(torch.max(outputs, dim=1).values.tolist())
                            dotsim10.extend(np.sort(outputs.detach().numpy()))
                            classification.extend(torch.argmax(outputs, dim=1).tolist())

                    dotsimbase = np.array(dotsim)
                    dotsimbase10 = np.array(dotsim10)
                    classification = np.array(classification)

                    # NOTE(review): the file names carry no fault level, so every
                    # iteration overwrites the previous one and only the arrays of
                    # the last fault level remain on disk -- confirm this is intended.
                    np.save(file_prefix+str(avg)+"_"+"dotsimbase10"+".npy", dotsimbase10)
                    np.save(file_prefix+str(avg)+"_"+"dotsimbase"+".npy", dotsimbase)
                    np.save(file_prefix+str(avg)+"_"+"classified"+".npy", classification)

                    records.append({
                        'Fault %': faultyPoints/total_bits,
                        'Accuracy': accuracy.compute().item() * 100,
                    })
                    faultyPoints = int(faultyPoints + total_bits/100)

                # Build the table once instead of concatenating row by row.
                df = pd.DataFrame(records, columns=['Fault %', 'Accuracy'])
                print(df)
                df.to_excel(file_prefix+str(avg)+"result.xlsx", index=False)
        

In [None]:
#UCI_HAR_Dataset
# Trains one model per (dimension, bit width, encoder) combination and then
# measures test accuracy while a growing share of the weight bits is flipped.

# Paths to the dataset
train_data_path = 'data/UCI_HAR_Dataset/train/X_train.txt'
train_labels_path = 'data/UCI_HAR_Dataset/train/y_train.txt'
test_data_path = 'data/UCI_HAR_Dataset/test/X_test.txt'
test_labels_path = 'data/UCI_HAR_Dataset/test/y_test.txt'

# Load the dataset (whitespace-separated text; sep=r"\s+" replaces the
# deprecated delim_whitespace=True)
train_data = pd.read_csv(train_data_path, sep=r"\s+", header=None)
train_labels = pd.read_csv(train_labels_path, sep=r"\s+", header=None)
test_data = pd.read_csv(test_data_path, sep=r"\s+", header=None)
test_labels = pd.read_csv(test_labels_path, sep=r"\s+", header=None)

# Convert data to PyTorch tensors; labels are shifted by -1 so they start at 0
train_data_tensor = torch.tensor(train_data.values, dtype=torch.float32)
train_labels_tensor = torch.tensor(train_labels.values.squeeze(), dtype=torch.long) - 1
test_data_tensor = torch.tensor(test_data.values, dtype=torch.float32)
test_labels_tensor = torch.tensor(test_labels.values.squeeze(), dtype=torch.long) - 1

# Create TensorDatasets
train_dataset = TensorDataset(train_data_tensor, train_labels_tensor)
test_dataset = TensorDataset(test_data_tensor, test_labels_tensor)

# Create DataLoaders
train_ld = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_ld = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

NUM_LEVELS = 1000
IMG_SIZE = 561
# Derive the class count from the labels instead of hard-coding it, so that no
# never-trained class vector is created and included in the fault-injection space.
num_classes = int(torch.cat([train_labels_tensor, test_labels_tensor]).max().item()) + 1
print(num_classes)

for d in DIMENSION:
    for q in quantize:
        for Encoder in encoders:

            model, encode, accuracy = train_model(Encoder,
                                        NUM_LEVELS,
                                        d,
                                        IMG_SIZE,
                                        num_classes,
                                        q,
                                        "ucihar",
                                        train_ld,
                                        test_ld)

            print("quantize_"+str(q)+"_"+str(d))
            print(f"Testing accuracy of {(accuracy.compute().item() * 100):.3f}%")

            w = model.weight.data.detach().clone()

            total_bits = d*num_classes*q   # number of bits stored in the weight matrix
            all_columns = list(range(d))   # faults may land in any column
            faulty_dir = "./models/ucihar/" + encode.name + "/faulty/"
            os.makedirs(faulty_dir, exist_ok=True)
            file_prefix = faulty_dir + "quantize_"+str(q)+"_"+str(d)+"_"

            for avg in range(averaging[0], averaging[1]):
                records = []
                faultyPoints = 0

                # Sweep the number of flipped bits in steps of 1% of all bits.
                while faultyPoints < total_bits:

                    dotsim = []
                    dotsim10 = []
                    classification = []
                    model_faulty = Centroid(d, num_classes)
                    model_faulty = model_faulty.to(device)
                    model_faulty.weight.data = w.clone()

                    # inject_fault needs the list of columns that may be corrupted.
                    model_faulty.weight.data = inject_fault(
                        model_faulty, amount=faultyPoints, bits=q, mood=all_columns
                    ).to(device)

                    accuracy = torchmetrics.Accuracy("multiclass", num_classes=num_classes)
                    with torch.no_grad():
                        # model.normalize(quantize = q)
                        for samples, labels in tqdm(test_ld, desc="Testing", disable= True):
                            samples = samples.to(device)
                            samples_hv = encode(samples)
                            outputs = model_faulty(samples_hv, dot=True).cpu()
                            accuracy.update(outputs, labels)
                            dotsim.extend(torch.max(outputs, dim=1).values.tolist())
                            dotsim10.extend(np.sort(outputs.detach().numpy()))
                            classification.extend(torch.argmax(outputs, dim=1).tolist())

                    dotsimbase = np.array(dotsim)
                    dotsimbase10 = np.array(dotsim10)
                    classification = np.array(classification)

                    # NOTE(review): the file names carry no fault level, so every
                    # iteration overwrites the previous one and only the arrays of
                    # the last fault level remain on disk -- confirm this is intended.
                    np.save(file_prefix+str(avg)+"_"+"dotsimbase10"+".npy", dotsimbase10)
                    np.save(file_prefix+str(avg)+"_"+"dotsimbase"+".npy", dotsimbase)
                    np.save(file_prefix+str(avg)+"_"+"classified"+".npy", classification)

                    records.append({
                        'Fault %': faultyPoints/total_bits,
                        'Accuracy': accuracy.compute().item() * 100,
                    })
                    faultyPoints = int(faultyPoints + total_bits/100)

                # Build the table once instead of concatenating row by row.
                df = pd.DataFrame(records, columns=['Fault %', 'Accuracy'])
                print(df)
                df.to_excel(file_prefix+str(avg)+"result.xlsx", index=False)
        

In [None]:
#MNIST dataset
# Trains one model per (dimension, bit width, encoder) combination and then
# measures test accuracy while a growing share of the weight bits is flipped.

transform = torchvision.transforms.ToTensor()


train_ds = MNIST("data", train=True, transform=transform, download=True)
train_ld = torch.utils.data.DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_ds = MNIST("data", train=False, transform=transform, download=True)
test_ld = torch.utils.data.DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)



NUM_LEVELS = 256
IMG_SIZE = 28*28
num_classes = 10

for d in DIMENSION:
    for q in quantize:
        for enc in encoders:
            print("quantize_"+str(q)+"_"+str(d))
            model, encode, accuracy = train_model(enc,
                                        NUM_LEVELS,
                                        d,
                                        IMG_SIZE,
                                        num_classes,
                                        q,
                                        "mnist",
                                        train_ld,
                                        test_ld,
                                        flatten=True,runs=10)

            print(f"Testing accuracy of {(accuracy.compute().item() * 100):.3f}%")

            w = model.weight.data.detach().clone()

            total_bits = d*num_classes*q   # number of bits stored in the weight matrix
            all_columns = list(range(d))   # faults may land in any column
            faulty_dir = "./models/mnist/" + encode.name + "/faulty/"
            os.makedirs(faulty_dir, exist_ok=True)
            file_prefix = faulty_dir + "quantize_"+str(q)+"_"+str(d)+"_"

            #inject fault
            for avg in range(averaging[0], averaging[1]):
                faultyPoints = 0
                records = []

                # Sweep the number of flipped bits in steps of 1% of all bits.
                while faultyPoints < total_bits:

                    dotsim = []
                    dotsim10 = []
                    classification = []
                    model_faulty = Centroid(d, num_classes)
                    model_faulty = model_faulty.to(device)
                    model_faulty.weight.data = w.clone()
                    # inject_fault needs the list of columns that may be corrupted.
                    model_faulty.weight.data = inject_fault(
                        model_faulty, amount=faultyPoints, bits=q, mood=all_columns
                    ).to(device)

                    accuracy = torchmetrics.Accuracy("multiclass", num_classes=num_classes)
                    with torch.no_grad():
                        # model.normalize(quantize = q)
                        for samples, labels in tqdm(test_ld, desc="Testing", disable= True):
                            samples = samples.to(device)
                            # Flatten each image to one feature row, (batch, features),
                            # as train_model does; keeping the channel axis would make
                            # the dim=1 reductions below run over the wrong axis.
                            samples = samples.flatten(start_dim=1)
                            samples_hv = encode(samples)
                            outputs = model_faulty(samples_hv, dot=True).cpu()
                            accuracy.update(outputs, labels)
                            dotsim.extend(torch.max(outputs, dim=1).values.tolist())
                            dotsim10.extend(np.sort(outputs.detach().numpy()))
                            classification.extend(torch.argmax(outputs, dim=1).tolist())

                    dotsimbase = np.array(dotsim)
                    dotsimbase10 = np.array(dotsim10)
                    classification = np.array(classification)

                    # NOTE(review): the file names carry no fault level, so every
                    # iteration overwrites the previous one and only the arrays of
                    # the last fault level remain on disk -- confirm this is intended.
                    np.save(file_prefix+str(avg)+"_"+"dotsimbase10"+".npy", dotsimbase10)
                    np.save(file_prefix+str(avg)+"_"+"dotsimbase"+".npy", dotsimbase)
                    np.save(file_prefix+str(avg)+"_"+"classified"+".npy", classification)

                    records.append({
                        'Fault %': faultyPoints/total_bits,
                        'Accuracy': accuracy.compute().item() * 100,
                    })
                    faultyPoints = int(faultyPoints + total_bits/100)

                # Build the table once instead of concatenating row by row.
                df = pd.DataFrame(records, columns=['Fault %', 'Accuracy'])
                print(df)
                df.to_excel(file_prefix+str(avg)+"result.xlsx", index=False)
        