In [None]:
import tensorflow as tf
import numpy as np
import torch
from sklearn.metrics import accuracy_score
from scipy.spatial.distance import cdist
import pickle
import os
# Implementation using LDAuCID algorithm
def load_weights(filename: str = "loaded.npz"):
    """Load classifier prototypes (random initialization)."""
    prototypes = np.random.rand(10, 256)  
    return {
        "prototypes": prototypes
    }

def save_weights(classifier, filename: str = "saved.npz"):
    """Save classifier prototypes."""
    weights = {
        "prototypes": classifier.prototypes
    }
    with open(filename, "wb") as f:
        pickle.dump(weights, f)
    print(f"Weights saved to {filename}")

# Step 1: Load the dataset
def load_dataset(filepath: str):
    """Loads a dataset from a given file path."""
    t = torch.load(filepath)
    data, targets = t['data'], t.get('targets')  
    return data, targets


# Step 2: Feature Extraction with MobileNet
class MobileNetFeatureExtractor:
    def __init__(self, input_shape=(224, 224, 3)):
        # Load MobileNet without the top classification layer
        self.model = tf.keras.applications.MobileNet(
            weights='imagenet', include_top=False, pooling='avg', input_shape=input_shape
        )

    def preprocess(self, data: np.ndarray):
        """Preprocess raw image data to match MobileNet input requirements."""
        data = data.astype(np.float32)
        resized_data = np.array([tf.image.resize(img, (224, 224)).numpy() for img in data])
        preprocessed_data = tf.keras.applications.mobilenet.preprocess_input(resized_data)
        return preprocessed_data

    def extract(self, data: np.ndarray):
        """Extract features using MobileNet."""
        preprocessed_data = self.preprocess(data)
        features = self.model.predict(preprocessed_data, batch_size=32, verbose=1)
        return features

# Step 4: LwP Classifier with Regularization and Adaptation
class LwPClassifier:
    def __init__(self, num_classes: int, reg=0.1, adt=0.1):
        self.num_classes = num_classes
        self.prototypes = None
        self.reg = reg  # Regularization strength
        self.adt = adt  # Adaptation strength 

    def fit(self, data: np.ndarray, labels: np.ndarray):
        """Initialize prototypes based on labeled data.""" 
        self.prototypes = []
        for cls in range(self.num_classes):
            cls_data = data[labels == cls]
            if len(cls_data) > 0:
                cls_prototype = cls_data.mean(axis=0)
                self.prototypes.append(cls_prototype)
        self.prototypes = np.array(self.prototypes)

    def predict(self, data: np.ndarray):
        """Predict labels for the given data."""
        distances = cdist(data, self.prototypes)
        return np.argmin(distances, axis=1)

    def update(self, data: np.ndarray, pseudo_labels: np.ndarray, alpha=0.7):
        """Update prototypes using pseudo-labeled data with regularization and adaptation."""
        for cls in range(self.num_classes):
            cls_data = data[pseudo_labels == cls]
            if len(cls_data) > 0:
                cls_mean = cls_data.mean(axis=0)
                # Regularized update
                self.prototypes[cls] = (alpha * self.prototypes[cls] + (1 - alpha) * cls_mean) - \
                                       self.reg * (self.prototypes[cls] - cls_mean)

    def lcud(self, old_prototypes, new_prototypes, alpha=0.5):
        """Adapt the prototypes to minimize shift using internal distributions."""
        adapted_prototypes = []
        for old, new in zip(old_prototypes, new_prototypes):
            adapted_proto = alpha * old + (1 - alpha) * new
            adapted_prototypes.append(adapted_proto)
        return np.array(adapted_prototypes)

# Step 5: LCAuCID Classifier
class LCAuCIDClassifier(LwPClassifier):
    def __init__(self, num_classes: int, reg=0.1, adt=0.1):
        super().__init__(num_classes, reg, adt)

    def update(self, data: np.ndarray, pseudo_labels: np.ndarray, alpha=0.7):
        """Update prototypes with adaptation and regularization."""
        super().update(data, pseudo_labels, alpha)
        # Perform adaptation 
        self.prototypes = self.lcud(self.prototypes, self.prototypes, self.adt)

# Train and evaluate function
def train_and_evaluate(tf: list, ef: list, num_classes=10):
    n = len(tf)  
    accuracies = np.zeros((n, n)) 
    
    # Mobilenet feature extraction
    feature_extractor = MobileNetFeatureExtractor()
    classifier = LCAuCIDClassifier(num_classes=num_classes)
    
    # Randomly initialize weights
    weights = load_weights()  
    classifier.prototypes = weights["prototypes"] 

    # features and targets 
    reduced_features_list = []
    targets_list = []

    for i in range(n):
       
        data, targets = load_dataset(tf[i])
        print(f"D{i+1}")
        
        # Check targets
        if targets is None:
            print(f"Dataset D{i+1} done")
        
       
        features = feature_extractor.extract(data)
        
        # Generate pseudo-labels
        if targets is None:
            pseudo_labels = classifier.predict(features)
            classifier.update(features, pseudo_labels)  # Update prototypes using pseudo-labels
        else:
            targets = np.array(targets)
            classifier.fit(features, targets)  # Fit with actual labels

        # Store features and targets/pseudo-labels 
        reduced_features_list.append(features)
        targets_list.append(targets if targets is not None else pseudo_labels)
        
  
        save_weights(classifier, filename="loaded.npz")
        
        # Evaluate on D1 to Di-1
        for j in range(i + 1):
            eval_features = reduced_features_list[j]
            eval_targets = targets_list[j]
            predictions = classifier.predict(eval_features)
            accuracies[i, j] = accuracy_score(eval_targets, predictions)
            print(f"Evaluated on dataset D{j+1}: Accuracy = {accuracies[i, j]:.4f}")
    
    return accuracies


tf = [f"part_one_dataset/train_data/{i}_train_data.tar.pth" for i in range(1, 11)]  
ef = [f"part_one_dataset/eval_data/{i}_eval_data.tar.pth" for i in range(1, 11)] 

# D11 to D20 
tf_1 = [f"part_two_dataset/train_data/{i-10}_train_data.tar.pth" for i in range(11, 21)]  
ef_1 = [f"part_two_dataset/eval_data/{i-10}_eval_data.tar.pth" for i in range(11, 21)] 

# Merge the two
tf.extend(tf_1)
ef.extend(ef_1)

accuracies = train_and_evaluate(tf, ef)
print("Accuracy matrix:")
print(accuracies)