In [None]:
# import all modules here
import torch
import matplotlib.pyplot as plt
import numpy as np
import os
import torchvision.transforms as transforms
from torchvision import models

In [None]:
# os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments: True'

In [None]:
def load_data(number: int, data_type: str, part: str, X_only: bool = False):
    """
    Loads image and label data based on the provided number, type, and part.
    Parameters:
        number (int): A number from 1 to 10, specifying the dataset index.
        data_type (str): A string, either 'eval' or 'train', specifying the dataset type.
        part (str): A string, either 'one' or 'two', specifying the dataset part.
        X_only (bool): If True, return only the images even when labels are stored
                       in the file (default is False).
    Returns:
        The images alone when `X_only` is set or the file has no 'targets' entry;
        otherwise a tuple `(images, labels)`.
    Raises:
        ValueError: If `number` is not between 1 and 10, `data_type` is not 'eval' or 'train',
                    or `part` is not 'one' or 'two'.
    """

    # Check if inputs are valid
    if number not in range(1, 11):
        raise ValueError("Number must be between 1 and 10.")
    if data_type not in ("eval", "train"):
        raise ValueError("Type must be 'eval' or 'train'.")
    if part not in ("one", "two"):
        raise ValueError("Part must be 'one' or 'two'.")

    # Construct the path
    path = f'dataset/part_{part}_dataset/{data_type}_data/{number}_{data_type}_data.tar.pth'

    # Load data
    # NOTE(review): torch.load unpickles the file; only use it on trusted dataset files.
    data = torch.load(path)
    images = data.get('data')  # Expected shape (2500, 32, 32, 3)

    # Labels are returned only when requested and actually present in the file
    if X_only or 'targets' not in data:
        return images

    labels = data['targets']  # Expected shape (2500,)
    return images, labels

In [None]:
# Load training split 1 of part one; load_data returns (images, labels) when the file stores 'targets'
images, labels = load_data(1, "train", "one")

In [None]:
# Distinct label values present in this split (the values themselves, not their count)
np.unique(labels)

In [None]:
def visualize_images(images, labels, num_images=30):
    """
    Shows the leading images of a batch in a 6-column grid, each titled with its label.
    Parameters:
        images : ndarray of images, indexed along the first axis.
        labels : ndarray of labels, aligned with `images`.
        num_images (int): Upper bound on how many images are drawn (default is 30);
                          capped at the number of images available.
    """
    cols = 6
    # Never ask for more images than the batch contains
    num_images = min(num_images, len(images))
    # Ceiling division: number of grid rows needed for `num_images` cells
    rows = -(-num_images // cols)

    # Two inches of height per grid row
    plt.figure(figsize=(10, rows * 2))

    i = 0
    while i < num_images:
        plt.subplot(rows, cols, i + 1)  # subplot positions are 1-based
        plt.imshow(images[i], cmap= 'gray')
        plt.title(f"Label: {labels[i]}")
        plt.axis("off")
        i += 1

    plt.tight_layout()
    plt.show()

In [None]:
# Preview the first 10 images as loaded (before the grayscale conversion below)
visualize_images(images, labels, 10)

In [None]:
# def convert_to_grayscale(images):
#     """
#     Converts a batch of RGB images to grayscale.
#     Parameters: images -> (N, 32, 32, 3) Returns: ndarray -> (N, 32, 32)
#     """
#     N = len(images)
#     grayscale_images = np.zeros((N, 32, 32), dtype = np.uint8)

#     for k in range(N):
#         for i in range(len(images[k])):
#             for j in range(len(images[k][0])):
#                 grayscale_images[k][i][j] = 0.3 * images[k][i][j][0] + 0.59 * images[k][i][j][1] + 0.11 * images[k][i][j][2]
    
#     return grayscale_images

# The commented-out loop version above also converts to grayscale, but it is much slower than the vectorized version below.

def convert_to_grayscale(images):
    """
    Converts a batch of RGB images to grayscale with a weighted channel sum.
    Parameters:
        images (ndarray): RGB images -> (N, H, W, 3); channels 0, 1, 2 are read as R, G, B.
    Returns:
        ndarray: Grayscale images -> (N, H, W), dtype uint8 (the fractional part is truncated).
    """
    # Vectorized over the whole batch; no preallocation is needed because the
    # expression below creates the result array itself.
    weighted_sum = (0.2989 * images[:, :, :, 0] +
                    0.5870 * images[:, :, :, 1] +
                    0.1140 * images[:, :, :, 2])

    return weighted_sum.astype(np.uint8)

In [None]:
# NOTE: this rebinds `images` to its grayscale version, so the cell is not idempotent:
# re-running it on the converted array fails because convert_to_grayscale indexes a 4th axis.
images = convert_to_grayscale(images)
images.shape

In [None]:
# Same preview as before, now on the grayscale images
visualize_images(images, labels, 10)

In [None]:
 # Flatten each 32x32 image to a 1024-dimensional vector
# Keep the batch axis and collapse every remaining axis into one feature vector per image
N = images.shape[0]
flattened_images = images.reshape(N, -1) 

In [None]:
# Sanity check: expect (number of images, pixels per image)
flattened_images.shape

## LwP model with Euclidean distance

In [None]:
class LwP_euclidean:
    """
    Learning-with-Prototypes classifier using Euclidean distance.

    Every class is summarised by the mean of its samples (its prototype); a query
    is assigned the label of the nearest prototype. Prototypes can be refined
    incrementally with `update` without keeping the earlier data around.
    """

    def __init__(self, n_prototypes):
        """
        Parameters:
            n_prototypes (int): Number of distinct labels.
        """
        self.n_prototypes = n_prototypes
        self.prototypes = None
        self.labels = None
        self.class_counts = None  # Track the count of samples per class
        self.label_to_index = {}  # Maps each label to an index in the prototypes array

    def fit(self, X, y):
        """
        Trains the model by finding prototypes based on the training data.
        Parameters:
            X (ndarray) -> (N, D)
            y (ndarray): Labels -> (N,)
        Raises:
            ValueError: If `y` holds more distinct labels than `n_prototypes`.
        """
        X = np.asarray(X)
        y = np.asarray(y)  # a plain list would break the boolean masks below

        unique_labels = np.unique(y)
        if len(unique_labels) > self.n_prototypes:
            raise ValueError(
                f"Found {len(unique_labels)} distinct labels but the model was created "
                f"with n_prototypes={self.n_prototypes}."
            )

        self.labels = unique_labels
        self.prototypes = np.zeros((self.n_prototypes, X.shape[1]))  # Placeholder for prototypes
        self.class_counts = np.zeros(self.n_prototypes, dtype=int)

        # Create label-to-index mapping
        self.label_to_index = {label: idx for idx, label in enumerate(unique_labels)}

        # Calculate prototypes for each label (np.unique guarantees >= 1 sample per label)
        for label, idx in self.label_to_index.items():
            class_samples = X[y == label]
            self.prototypes[idx] = np.mean(class_samples, axis=0)
            self.class_counts[idx] = len(class_samples)

    def euclidean_distance(self, a, b):
        """
        Parameters:
            a (ndarray): First vector.
            b (ndarray): Second vector.
        Returns:
            float: Euclidean distance between a and b.
        """
        return np.sqrt(np.sum((a - b) ** 2))

    def update(self, X_new, y_new):
        """
        Updates the model with new training examples.
        Parameters:
            X_new (ndarray): New samples -> (M, D)
            y_new (ndarray): New labels -> (M,)
        Raises:
            ValueError: If `y_new` contains a label that was not seen in `fit`.
                        The model is left untouched in that case.
        """
        X_new = np.asarray(X_new)
        y_new = np.asarray(y_new)
        new_labels = np.unique(y_new)

        # Validate every label first so a bad label cannot leave the model half-updated
        for label in new_labels:
            if label not in self.label_to_index:
                raise ValueError(f"Label {label} not found in the model. Ensure that all labels are initialized in fit.")

        for label in new_labels:
            new_samples = X_new[y_new == label]
            n_new = len(new_samples)
            idx = self.label_to_index[label]

            current_count = self.class_counts[idx]
            total_count = current_count + n_new
            new_mean = np.mean(new_samples, axis=0)

            # Update prototype as a weighted mean of the old prototype and the new batch mean
            self.prototypes[idx] = (current_count * self.prototypes[idx] + n_new * new_mean) / total_count
            self.class_counts[idx] = total_count

    def predict(self, X):
        """
        Assigns each sample the label of its nearest prototype.
        Parameters:
            X (ndarray): Samples -> (N, D)
        Returns:
            ndarray: Predicted labels -> (N,), stored as floats.
        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.prototypes is None or self.labels is None:
            raise RuntimeError("The model must be fitted before calling predict.")

        X = np.asarray(X, dtype=float)
        # Only the first len(labels) rows hold real prototypes; the remaining rows are
        # zero placeholders for classes that were never seen and must not compete.
        active_prototypes = self.prototypes[:len(self.labels)]

        # One vectorised pass per prototype keeps memory at O(N * D).
        squared_distances = np.empty((X.shape[0], len(active_prototypes)))
        for j, prototype in enumerate(active_prototypes):
            squared_distances[:, j] = np.sum((X - prototype) ** 2, axis=1)

        # The square root is monotonic, so the nearest prototype is the same without it
        nearest = np.argmin(squared_distances, axis=1)
        return self.labels[nearest].astype(float)

In [None]:
# Fit the Euclidean LwP classifier on the flattened grayscale training images
model = LwP_euclidean(n_prototypes=10)
model.fit(flattened_images, labels)

# Load the matching eval split and apply the same preprocessing: grayscale, then flatten
X_eval, y_eval = load_data(1, "eval", "one")
X_eval = convert_to_grayscale(X_eval).reshape(len(X_eval), -1)

# Accuracy = fraction of eval samples whose predicted label equals the true label
predictions = model.predict(X_eval)
is_correct = predictions == np.asarray(y_eval)
accuracy = np.mean(is_correct)
print(f'Accuracy: {accuracy * 100:.2f}%')


### LwP model with Mahalanobis distance

In [None]:
class LwP_Mahalanobis:
    """
    Learning-with-Prototypes classifier using Mahalanobis distance.

    Every class is summarised by the mean of its samples (its prototype). Distances
    are measured with the inverse of one covariance matrix shared by all classes,
    estimated from every sample seen so far in `fit` and `update`.
    """

    def __init__(self, n_prototypes):
        """
        Parameters:
            n_prototypes (int): Number of distinct labels.
        """
        self.n_prototypes = n_prototypes
        self.prototypes = None
        self.labels = None
        self.inv_cov_matrix = None
        self.class_counts = None  # Track the count of samples per class
        self.label_to_index = {}  # Maps each label to an index in the prototypes array

        # Running statistics over ALL samples seen so far. They let `update` refresh
        # the covariance exactly without keeping the earlier data in memory.
        self._n_seen = 0       # number of samples seen
        self._mean = None      # mean of those samples -> (D,)
        self._scatter = None   # sum of outer products of centred samples -> (D, D)

    def _refresh_inverse_covariance(self):
        """Recomputes `inv_cov_matrix` from the running scatter matrix."""
        # Unbiased estimate (divide by n - 1), matching np.cov's default
        covariance_matrix = self._scatter / max(self._n_seen - 1, 1)
        # The pseudo-inverse equals the inverse when the covariance is well conditioned,
        # and stays defined when it is singular (e.g. a feature that never varies).
        self.inv_cov_matrix = np.linalg.pinv(covariance_matrix, hermitian=True)

    def fit(self, X, y):
        """
        Trains the model by finding prototypes based on the training data.
        Parameters:
            X (ndarray) -> (N, D)
            y (ndarray): Labels -> (N,)
        Raises:
            ValueError: If `y` holds more distinct labels than `n_prototypes`.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y)  # a plain list would break the boolean masks below

        unique_labels = np.unique(y)
        if len(unique_labels) > self.n_prototypes:
            raise ValueError(
                f"Found {len(unique_labels)} distinct labels but the model was created "
                f"with n_prototypes={self.n_prototypes}."
            )

        self.labels = unique_labels
        self.prototypes = np.zeros((self.n_prototypes, X.shape[1]))  # Placeholder for prototypes
        self.class_counts = np.zeros(self.n_prototypes, dtype=int)

        # Create label-to-index mapping
        self.label_to_index = {label: idx for idx, label in enumerate(unique_labels)}

        # Calculate prototypes for each label (np.unique guarantees >= 1 sample per label)
        for label, idx in self.label_to_index.items():
            class_samples = X[y == label]
            self.prototypes[idx] = np.mean(class_samples, axis=0)
            self.class_counts[idx] = len(class_samples)

        # Initialise the running statistics and derive the inverse covariance from them
        self._n_seen = X.shape[0]
        self._mean = X.mean(axis=0)
        centered = X - self._mean
        self._scatter = centered.T @ centered
        self._refresh_inverse_covariance()

    def mahalanobis_distance(self, a, b):
        """
        Parameters:
            a (ndarray): First vector.
            b (ndarray): Second vector.
        Returns:
            float: Mahalanobis distance between a and b.
        """
        diff = np.asarray(a, dtype=float) - np.asarray(b, dtype=float)
        squared = np.dot(np.dot(diff, self.inv_cov_matrix), diff.T)
        # Round-off can push the quadratic form slightly below zero; clamp to avoid NaN
        return np.sqrt(max(squared, 0.0))

    def update(self, X_new, y_new):
        """
        Updates the model with new training examples.
        Parameters:
            X_new (ndarray): New samples -> (M, D)
            y_new (ndarray): New labels -> (M,)
        Raises:
            ValueError: If `y_new` contains a label that was not seen in `fit`.
                        The model is left untouched in that case.
        """
        X_new = np.asarray(X_new, dtype=float)
        y_new = np.asarray(y_new)
        new_labels = np.unique(y_new)

        # Validate every label first so a bad label cannot leave the model half-updated
        for label in new_labels:
            if label not in self.label_to_index:
                raise ValueError(f"Label {label} not found in the model. Ensure that all labels are initialized in fit.")

        n_batch = X_new.shape[0]
        if n_batch == 0:
            return  # nothing to learn from an empty batch

        for label in new_labels:
            new_samples = X_new[y_new == label]
            n_new = len(new_samples)
            idx = self.label_to_index[label]

            current_count = self.class_counts[idx]
            total_count = current_count + n_new
            new_mean = np.mean(new_samples, axis=0)

            # Update prototype as a weighted mean of the old prototype and the new batch mean
            self.prototypes[idx] = (current_count * self.prototypes[idx] + n_new * new_mean) / total_count
            self.class_counts[idx] = total_count

        # Merge the batch into the running mean/scatter (pairwise-merge formula), so the
        # covariance reflects all data seen so far rather than only prototypes + X_new.
        batch_mean = X_new.mean(axis=0)
        batch_centered = X_new - batch_mean
        delta = batch_mean - self._mean
        n_total = self._n_seen + n_batch

        self._scatter = (
            self._scatter
            + batch_centered.T @ batch_centered
            + np.outer(delta, delta) * (self._n_seen * n_batch / n_total)
        )
        self._mean = self._mean + delta * (n_batch / n_total)
        self._n_seen = n_total
        self._refresh_inverse_covariance()

    def predict(self, X):
        """
        Assigns each sample the label of its nearest prototype.
        Parameters:
            X (ndarray): Samples -> (N, D)
        Returns:
            ndarray: Predicted labels -> (N,), stored as floats.
        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.prototypes is None or self.labels is None or self.inv_cov_matrix is None:
            raise RuntimeError("The model must be fitted before calling predict.")

        X = np.asarray(X, dtype=float)
        # Only the first len(labels) rows hold real prototypes; the remaining rows are
        # zero placeholders for classes that were never seen and must not compete.
        active_prototypes = self.prototypes[:len(self.labels)]

        squared_distances = np.empty((X.shape[0], len(active_prototypes)))
        for j, prototype in enumerate(active_prototypes):
            diff = X - prototype
            # Row-wise quadratic form diff_i^T * inv_cov * diff_i
            squared_distances[:, j] = np.sum((diff @ self.inv_cov_matrix) * diff, axis=1)

        # The square root is monotonic, so the nearest prototype is the same without it
        nearest = np.argmin(squared_distances, axis=1)
        return self.labels[nearest].astype(float)


In [None]:
# Fit the Mahalanobis LwP classifier on the flattened grayscale training images
model = LwP_Mahalanobis(n_prototypes=10)
model.fit(flattened_images, labels)

# Load the matching eval split and apply the same preprocessing: grayscale, then flatten
X_eval, y_eval = load_data(1, "eval", "one")
X_eval = convert_to_grayscale(X_eval).reshape(len(X_eval), -1)

# Accuracy = fraction of eval samples whose predicted label equals the true label
predictions = model.predict(X_eval)
is_correct = predictions == np.asarray(y_eval)
accuracy = np.mean(is_correct)
print(f'Accuracy: {accuracy * 100:.2f}%')

## Trying a new approach: extracting features with a pretrained neural network

In [None]:
def extract_features(images):
    """
    Extract features from a batch of images using an ImageNet-pretrained ResNet-34
    whose final classification layer has been removed.

    Args:
        images (numpy.ndarray): A 4D array of shape (N, 32, 32, 3), where N is the number of images.

    Returns:
        list[torch.Tensor]: One 2D tensor per mini-batch, in input order. Each tensor has
        one row per image (at most `batch_size` rows) and lives on the device the model
        ran on, so callers must move it to the CPU before converting to NumPy.
        An empty input yields an empty list.
    """

    batch_size = 5

    # torch.stack cannot stack an empty list, so handle "no images" up front
    if images.shape[0] == 0:
        return []

    # Step 1: Convert the input images to a tensor and apply transformations
    transform = transforms.Compose([
        transforms.ToPILImage(),  # Convert to PIL Image
        transforms.Resize(224),   # Resize to 224x224
        transforms.ToTensor(),     # Convert to tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize
        # transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[0.247, 0.243, 0.261]), 
        # NOTE(review): a random flip makes the extracted features differ between calls,
        # including for eval data; consider dropping it if reproducible features are needed.
        transforms.RandomHorizontalFlip(0.1),
    ])

    # Transform each image and create a tensor (kept on the CPU; batches are moved on demand)
    transformed_images = torch.stack([transform(images[i]) for i in range(images.shape[0])])

    # Step 2: Load pre-trained ResNet-34 model
    backbone = models.resnet34(pretrained=True)
    backbone = torch.nn.Sequential(*(list(backbone.children())[:-1]))  # Remove the classification layer
    backbone.eval()  # Set the model to evaluation mode

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    use_cuda = device.type == "cuda"
    backbone.to(device)

    # CUDA-only call: it raises on machines without a GPU, so it must be guarded
    if use_cuda:
        torch.cuda.set_per_process_memory_fraction(1.0)

    # Step 3: Feature extraction
    features_list = []
    with torch.no_grad():
        for start in range(0, transformed_images.size(0), batch_size):
            if use_cuda:
                torch.cuda.empty_cache()
            # Move only the current mini-batch to the device to keep GPU memory low
            batch_images = transformed_images[start : start + batch_size].to(device)
            batch_features = backbone(batch_images)
            # Flatten everything after the batch axis into one feature vector per image
            features_list.append(batch_features.view(batch_features.size(0), -1))

    return features_list

In [None]:
# Labeled training split D1: load it, then replace the raw images by their network features
X_train, y_train = load_data(1, "train", "one")
# extract_features yields one tensor per mini-batch; copy each to host memory and stack the rows
X_train = np.vstack([batch.cpu().numpy() for batch in extract_features(X_train)])

In [None]:
def get_from_gpu(df):
    """
    Copies a sequence of tensors to host memory and stacks them row-wise.
    Parameters:
        df: Iterable of torch tensors (e.g. the per-batch output of extract_features).
    Returns:
        ndarray: All tensors converted to NumPy and concatenated along the first axis.
    """
    host_arrays = []
    for tensor in df:
        host_arrays.append(tensor.cpu().numpy())
    return np.vstack(host_arrays)

In [None]:
# `LwP` is not defined anywhere in this notebook, so this cell raised NameError on a fresh
# kernel; use the Euclidean prototype classifier defined above.
model = LwP_euclidean(n_prototypes=10)
model.fit(X_train, y_train)

In [None]:
# Evaluate the feature-based model on the D1 eval split
X_eval, y_eval = load_data(1, "eval", "one")
# Same feature extraction as for the training data
X_eval = get_from_gpu(extract_features(X_eval))

y_pred = model.predict(X_eval)
matches = (y_eval == y_pred)
acc = np.mean(matches)
print(f'Accuracy on eval dataset is {acc * 100:.2f}%')

In [None]:
"""
TODO
Train on D1

for i = 2 to 10
    update train on Di
        for j: i to 1
            predict Dj with ith model
"""

In [None]:
# Feature caches for part-one datasets D2..D10: index 0 holds D2, index 8 holds D10
data_eval, data_train = [], []

for i in range(2, 11):
    X_e, y_e = load_data(i, 'eval', 'one')
    X_t = load_data(i, 'train', 'one')  # only images are unpacked here (no labels)

    # Eval split: keep the features together with the true labels
    X_e = get_from_gpu(extract_features(X_e))
    data_eval += [(X_e, y_e)]

    # Train split: features only
    X_t = get_from_gpu(extract_features(X_t))
    data_train += [X_t]

In [None]:
# data_eval = np.array(data_eval)
# data_train = np.array(data_train)

In [None]:
def predict(j):
    """
    Scores the global `model` on the eval split stored at `data_eval[j]`.
    Parameters:
        j (int): Index into the global `data_eval` list of (features, labels) pairs.
    Returns:
        str: Accuracy as a percentage, formatted with two decimals.
    """
    features, targets = data_eval[j]
    accuracy = np.mean(model.predict(features) == targets)
    return f'{accuracy * 100:.2f}'

In [None]:
# res[k] holds the accuracies of the model after it has absorbed D_(k+2),
# listed for D_i, D_(i-1), ..., D_1 in that order.
res = []

for i in range(2, 11):
    X = data_train[i - 2]  # data_train[0] holds D2, so D_i lives at index i - 2

    # D_i is unlabeled: pseudo-label it with the current model, then fold it in
    y_label = model.predict(X)
    model.update(X, y_label)

    row = []
    # data_eval only holds D2..D10. Stop at j = 2: index j - 2 == -1 would silently
    # wrap around to the LAST dataset instead of D1.
    for j in range(i, 1, -1):
        row.append(predict(j - 2))

    # D1's eval features/labels are the globals X_eval / y_eval from the evaluation cell above
    d1_accuracy = np.mean(model.predict(X_eval) == np.asarray(y_eval))
    row.append(f'{d1_accuracy * 100:.2f}')

    res.append(row)

Observation: accuracy drops as more datasets are absorbed. We need higher accuracy on the initial data.

After changing model - found a sweet spot

# sufy @ Nov 7 midnight - 1.2

In [None]:
# One output line per update step; every accuracy is followed by a single space
for l1 in res:
    print(''.join(str(l2) + ' ' for l2 in l1))

What I want to do here: for every new input dataset, learn the model that gives the best accuracy on the validation data.

The problem this runs into is computational. With 10 candidate labels for each of the 2500 samples, there are $10^{2500}$ possible labelings (and hence models) right off the bat. Even if I restrict each sample to the top 2 labels under the current model, that still leaves $2^{2500} \approx 10^{753}$ possibilities.

In [None]:
# lets do a naive run first
res = []

for i in range(1, 11):
    X_e, y_e = load_data(i, 'eval', 'two')
    X_t = load_data(i, 'train', 'two')

    X_e = get_from_gpu(extract_features(X_e))
    data_eval.append( (X_e, y_e) )

    X_t = get_from_gpu(extract_features(X_t))
    data_train.append( X_t )

for i in range(11, 21):
    X = data_train[i - 2]
    res.append([])

    y_label = model.predict(X)
    model.update(X, y_label)

    for j in range(i, 0, -1):
        acc = predict(j - 2)
        res[-1].append(acc)


In [None]:
# One output line per update step; every accuracy is followed by a single space
for l1 in res:
    print(''.join(str(l2) + ' ' for l2 in l1))

The naive approach is subpar. We need a better update rule for these datasets.

In [None]:
def save_extracted_feature():
    """
    Extracts features for every dataset file and saves them to disk.

    Iterates over both parts ('one', 'two'), both types ('eval', 'train') and dataset
    numbers 1..10, and writes one tensor per dataset to
    `extracted_feature/part_{part}_feature/{data_type}_feature/{number}_{data_type}_feature.tar.pth`.
    Missing output directories are created.
    """
    parts = ["one", "two"]
    data_types = ["eval", "train"]
    numbers = range(1, 11)

    for part in parts:
        for data_type in data_types:
            save_dir = f'extracted_feature/part_{part}_feature/{data_type}_feature'
            os.makedirs(save_dir, exist_ok=True)  # torch.save does not create directories

            for number in numbers:
                images = load_data(number, data_type, part, True)

                # extract_features returns a list of per-batch tensors that may live on
                # the GPU; move each to the CPU and join them into one tensor.
                batches = extract_features(images)
                features = torch.cat([batch.cpu() for batch in batches], dim=0)

                save_path = f'{save_dir}/{number}_{data_type}_feature.tar.pth'
                torch.save(features, save_path)

save_extracted_feature()

# Task 1.2

In [None]:
# Note from amir: "mann nahi kar raha" (not in the mood) - Task 1.2 has not been started yet.