In [31]:
import math
import torch
import numpy as np

import tensorflow as tf
from keras.layers import Input, AveragePooling2D

from keras.models import Model
from keras.applications.inception_v3 import InceptionV3

from keras.applications.inception_v3 import preprocess_input as incv3_preprocess_input

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from scipy.stats import multivariate_normal

import pandas as pd

import os

# Importing Data and feature extraction using Pretrained Inception V3

In [2]:
input_shape = (299,299,3)

def create_model_incv3():
    tf_input = Input(shape=input_shape)
    model = InceptionV3(input_tensor=tf_input, weights='imagenet', include_top=False)
    output_pooled = AveragePooling2D((8, 8), strides=(8, 8))(model.output)
    return Model(model.input, output_pooled)

create_model = create_model_incv3

batch_of_images_placeholder = tf.TensorSpec(shape=(None, 32, 32, 3), dtype=tf.uint8)

batch_size = 16

tf_resize_op = lambda images: tf.image.resize(tf.cast(images, tf.float32), input_shape[:2])

preprocess_input = incv3_preprocess_input

# Define generator
def data_generator(data, labels=None):
    def generator():
        n = data.shape[0]
        start = 0
        while start < n:
            end = min(start + batch_size, n)
            batch_of_images = data[start:end]
            batch_of_images_resized = tf_resize_op(batch_of_images)
            batch_of_images_preprocessed = preprocess_input(batch_of_images_resized)
            if labels is not None:
                batch_of_labels = labels[start:end]
            else:
                # Create dummy labels as a placeholder when no labels are provided
                batch_of_labels = np.zeros((batch_of_images_preprocessed.shape[0],))
            yield (batch_of_images_preprocessed, batch_of_labels)
            start += batch_size
    return generator

model_extract = create_model()

In [3]:
t_train_1 = torch.load('dataset/part_one_dataset/train_data/1_train_data.tar.pth')
x_train_1, y_train_1 = t_train_1['data'], t_train_1['targets'] # both numpy.ndarray
t_eval_1 = torch.load('dataset/part_one_dataset/eval_data/1_eval_data.tar.pth')
x_eval_1, y_eval_1 = t_eval_1['data'], t_eval_1['targets'] # both numpy.ndarray

x_train_1 = np.array(x_train_1)
x_eval_1 = np.array(x_eval_1)
y_train_1 = np.array(y_train_1)
y_eval_1 = np.array(y_eval_1)

n_training = x_train_1.shape[0]
n_evaling = y_eval_1.shape[0]

y_train_1 = y_train_1.flatten()
y_eval_1  = y_eval_1.flatten()

data_train_gen = data_generator(x_train_1, y_train_1)()
ftrs_training = model_extract.predict(data_train_gen, steps=math.ceil(n_training/batch_size), verbose=1)

data_eval_gen = data_generator(x_eval_1, y_eval_1)()
ftrs_evaling = model_extract.predict(data_eval_gen, steps=math.ceil(n_evaling/batch_size), verbose=1)

x_train_extracted_1 = np.array( [ftrs_training[i].flatten() for i in range(n_training)] )
x_eval_extracted_1  = np.array( [ftrs_evaling[i].flatten()  for i in range(n_evaling )] )

def extract_features(I, model_extract, data_generator, batch_size):
    # Load the training and evaluation datasets using `I`
    t_train = torch.load(f'dataset/part_one_dataset/train_data/{I}_train_data.tar.pth')
    x_train = t_train['data']  # numpy.ndarray
    t_eval = torch.load(f'dataset/part_one_dataset/eval_data/{I}_eval_data.tar.pth')
    x_eval, y_eval = t_eval['data'], t_eval['targets']  # numpy.ndarray

    # Convert data to numpy arrays
    x_train = np.array(x_train)
    x_eval = np.array(x_eval)
    y_eval = np.array(y_eval).flatten()  # Flatten y_eval for consistency

    # Get number of samples in training and evaluation sets
    n_training = x_train.shape[0]
    n_evaling = y_eval.shape[0]

    # Generate data for model extraction
    data_train_gen = data_generator(x_train, None)()
    ftrs_training = model_extract.predict(data_train_gen, steps=math.ceil(n_training / batch_size), verbose=1)

    data_eval_gen = data_generator(x_eval, None)()
    ftrs_evaling = model_extract.predict(data_eval_gen, steps=math.ceil(n_evaling / batch_size), verbose=1)

    # Extract features by flattening predictions
    x_train_extracted = np.array([ftrs_training[i].flatten() for i in range(n_training)])
    x_eval_extracted = np.array([ftrs_evaling[i].flatten() for i in range(n_evaling)])

    return x_train_extracted, x_eval_extracted, y_eval

for I in range(2,11):
    globals()[f"x_train_extracted_{I}"], globals()[f"x_eval_extracted_{I}"], globals()[f"y_eval_{I}"] = extract_features(
        I=I,
        model_extract=model_extract,
        data_generator=data_generator,
        batch_size=batch_size
    )

  t_train_1 = torch.load('dataset/part_one_dataset/train_data/1_train_data.tar.pth')
  t_eval_1 = torch.load('dataset/part_one_dataset/eval_data/1_eval_data.tar.pth')


[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m132s[0m 830ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m126s[0m 802ms/step


  t_train = torch.load(f'dataset/part_one_dataset/train_data/{I}_train_data.tar.pth')
  t_eval = torch.load(f'dataset/part_one_dataset/eval_data/{I}_eval_data.tar.pth')


[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m136s[0m 859ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m124s[0m 791ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m126s[0m 803ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m128s[0m 813ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m133s[0m 848ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m133s[0m 850ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m135s[0m 862ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m127s[0m 807ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m128s[0m 815ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m139s[0m 884ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m131s[0m 834ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m133s[0m 846ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[

# Saving extracted features and labels

In [37]:
for i in range(1, 11):
    feature = eval(f"x_train_extracted_{i}")
    folder_path = "extracted_features"
    os.makedirs(folder_path, exist_ok=True)
    file_name = f"x_train_extracted_{i}.npy"
    file_path = os.path.join(folder_path, file_name)
    np.save(file_path, feature)

for i in range(1, 11):
    feature = eval(f"x_eval_extracted_{i}")
    folder_path = "extracted_features"
    os.makedirs(folder_path, exist_ok=True)
    file_name = f"x_eval_extracted_{i}.npy"
    file_path = os.path.join(folder_path, file_name)
    np.save(file_path, feature)

for i in range(1, 2):
    feature = eval(f"y_train_{i}")
    folder_path = "extracted_features"
    os.makedirs(folder_path, exist_ok=True)
    file_name = f"y_train_{i}.npy"
    file_path = os.path.join(folder_path, file_name)
    np.save(file_path, feature)

for i in range(1, 11):
    feature = eval(f"y_eval_{i}")
    folder_path = "extracted_features"
    os.makedirs(folder_path, exist_ok=True)
    file_name = f"y_eval_{i}.npy"
    file_path = os.path.join(folder_path, file_name)
    np.save(file_path, feature)

# Loading Saved Extracted features

In [41]:
for i in range(1,11):
    globals()[f'x_eval_extracted_{i}'] = np.load(f'extracted_features/x_eval_extracted_{i}.npy')
    globals()[f'x_train_extracted_{i}'] = np.load(f'extracted_features/x_train_extracted_{i}.npy')
    globals()[f'y_eval_{i}'] = np.load(f'extracted_features/y_eval_{i}.npy')

y_train_1 = np.load('y_train_1.npy')

# Generative Classifier Continual Model

In [42]:
class GenerativeClassifierContinual:
    def __init__(self, n_classes, n_features, alpha=0.5):
        self.n_classes = n_classes
        self.n_features = n_features
        self.means = np.zeros((n_classes, n_features))
        self.covariances = np.array([np.eye(n_features) for _ in range(n_classes)])
        self.alpha = alpha  # Regularization weight

    def fit(self, X_train, y_train, prev_means=None, prev_covariances=None):
        new_means = np.zeros((self.n_classes, self.n_features))
        new_covariances = np.array([np.eye(self.n_features) for _ in range(self.n_classes)])

        # Calculate Gaussian parameters for current data
        for c in range(self.n_classes):
            class_samples = X_train[y_train == c]
            if len(class_samples) > 0:
                new_means[c] = np.mean(class_samples, axis=0)
                if len(class_samples) > 1:
                    new_covariances[c] = np.cov(class_samples, rowvar=False) + 1e-6 * np.eye(self.n_features)

        # Regularize means and covariances with the previous model's parameters
        if prev_means is not None and prev_covariances is not None:
            self.means = self.alpha * prev_means + (1 - self.alpha) * new_means
            self.covariances = self.alpha * prev_covariances + (1 - self.alpha) * new_covariances
        else:
            self.means = new_means
            self.covariances = new_covariances

    def generate_replay_data(self, num_samples_per_class=50):
        # Generate synthetic data based on current Gaussian parameters
        replay_data = []
        replay_labels = []
        for c in range(self.n_classes):
            samples = np.random.multivariate_normal(self.means[c], self.covariances[c], num_samples_per_class)
            replay_data.append(samples)
            replay_labels.append(np.full(num_samples_per_class, c))
        return np.vstack(replay_data), np.concatenate(replay_labels)

    def predict(self, X):
        # Calculate log probabilities for each class
        log_probs = np.zeros((X.shape[0], self.n_classes))
        for c in range(self.n_classes):
            distribution = multivariate_normal(mean=self.means[c], cov=self.covariances[c])
            log_probs[:, c] = distribution.logpdf(X)

        # Assign each sample to the class with the highest log probability
        predictions = np.argmax(log_probs, axis=1)
        return predictions

    def score(self, X, y):
        predictions = self.predict(X)
        accuracy = np.mean(predictions == y)
        return accuracy

scaler = StandardScaler()
pca_components = 40  # Change this to any number of components

# Initialize PCA with the chosen number of components
pca = PCA(n_components=pca_components)

# Apply standardization and PCA to D1
x_train_1_scaled = scaler.fit_transform(x_train_extracted_1)
x_train_1_pca = pca.fit_transform(x_train_1_scaled)

# Initialize the first model for D1 with dynamic n_features
n_classes = np.unique(y_train_1).size
model_list = [GenerativeClassifierContinual(n_classes, n_features=pca_components)]
model_list[0].fit(x_train_1_pca, y_train_1)

# Continual learning with Gaussian class-conditionals, prototype regularization, and generative replay
for i in range(2, 11):
    # Standardize and apply PCA to the current dataset
    x_train_scaled = scaler.transform(globals()[f'x_train_extracted_{i}'])
    x_train_pca = pca.transform(x_train_scaled)

    # Generate replay data using the previous model
    replay_data, replay_labels = model_list[-1].generate_replay_data(num_samples_per_class=50)

    # Combine replay data with current data
    x_combined = np.vstack([x_train_pca, replay_data])
    y_combined = np.concatenate([model_list[-1].predict(x_train_pca), replay_labels])

    # Train a new model with regularized Gaussian parameters
    model = GenerativeClassifierContinual(n_classes, n_features=pca_components, alpha=0.7)
    model.fit(x_combined, y_combined,
              prev_means=model_list[-1].means, prev_covariances=model_list[-1].covariances)

    # Append the new model to the model list
    model_list.append(model)

# Accuracy checking of different Models with different datasets

In [43]:
accuracy_matrix = np.zeros((10, 10))

for i, model in enumerate(model_list):
    for j in range(10):
        # Standardize and apply PCA to the evaluation dataset
        x_eval_scaled = scaler.transform(globals()[f'x_eval_extracted_{j+1}'])
        x_eval_pca = pca.transform(x_eval_scaled)

        eval_labels = globals()[f'y_eval_{j+1}']
        accuracy_matrix[i, j] = model.score(x_eval_pca, eval_labels)*100

print("Accuracy Matrix:")

accuracy_matrix = pd.DataFrame(
    accuracy_matrix,
    index=[f"Model {i+1}" for i in range(10)],
    columns=[f"Eval Dataset {j+1}" for j in range(10)]
)

print(accuracy_matrix)


Accuracy Matrix:
          Eval Dataset 1  Eval Dataset 2  Eval Dataset 3  Eval Dataset 4  \
Model 1            80.52           81.04           80.00           80.04   
Model 2            80.40           81.88           80.44           80.12   
Model 3            80.68           81.16           79.92           79.32   
Model 4            80.44           80.60           79.68           79.16   
Model 5            80.32           80.64           79.52           78.92   
Model 6            80.24           80.16           78.76           78.72   
Model 7            79.76           79.92           78.72           78.64   
Model 8            79.32           79.40           78.32           78.56   
Model 9            78.44           78.84           77.52           78.00   
Model 10           78.28           78.44           77.16           77.72   

          Eval Dataset 5  Eval Dataset 6  Eval Dataset 7  Eval Dataset 8  \
Model 1            79.16           80.48           80.32           80.

# Saving the models and transformation for next task

In [44]:
import joblib

# Save model_list, scaler, and pca
joblib.dump(model_list, 'model_list.joblib')
joblib.dump(scaler, 'scaler.joblib')
joblib.dump(pca, 'pca.joblib')


['pca.joblib']