# Second Kaggle Competition Report - IFT6390B
**Students:** Nadia Gonzalez Fernandez and Yorguin José Mantilla Ramos

**Team:** "Nadia + Yorguin"


## Milestone 1

### Feature Design
#### PCA

*Resource*: https://www.askpython.com/python/examples/principal-component-analysis

This code implements a custom Principal Component Analysis (PCA) function from scratch. This allows for dimensionality reduction by transforming the original features into a new, smaller set of uncorrelated features that capture the most variance in the data.

In [1]:
def pca(X, n_components):
    X_centered = X - np.mean(X, axis=0)
    n_samples = X.shape[0]
    cov_matrix = np.dot(X_centered.T, X_centered) / (n_samples - 1)

    # Compute eigenvectors and eigenvalues
    eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

    # Sort eigenvalues and eigenvectors in descending order
    idx = np.argsort(eigenvalues)[::-1]
    eigenvalues = eigenvalues[idx]
    eigenvectors = eigenvectors[:, idx]

    # Select top components
    components = eigenvectors[:, :n_components]

    X_reduced = np.dot(X_centered, components)

    return X_reduced, components

#### Stratified Sampling

This function implements a stratified sampling technique designed to handle imbalanced datasets while maintaining the proportional representation of classes. It calculates the number of samples to draw from each class based on their original distribution, ensuring that the sampling reflects the original class ratios. The method distributes the total desired sample size across classes, taking into account the available samples in each class and avoiding over-sampling.

In [2]:
def stratified_sampling(X, y, sample_size, random_seed=42):
    np.random.seed(random_seed)
    classes, class_counts = np.unique(y, return_counts=True)
    n_classes = len(classes)

    # Calculate target samples per class while accounting for class sizes
    total_samples = len(y)
    class_ratios = class_counts / total_samples
    target_samples_per_class = np.floor(sample_size * class_ratios).astype(int)

    # Distribute any remaining samples
    remaining_samples = sample_size - np.sum(target_samples_per_class)
    if remaining_samples > 0:
        sorted_class_indices = np.argsort(class_counts)[::-1]
        for idx in sorted_class_indices:
            if remaining_samples <= 0:
                break
            available_samples = class_counts[idx] - target_samples_per_class[idx]
            samples_to_add = min(remaining_samples, available_samples)
            target_samples_per_class[idx] += samples_to_add
            remaining_samples -= samples_to_add

    # Collect samples from each class
    indices = []
    for i, cls in enumerate(classes):
        cls_indices = np.where(y == cls)[0]
        n_samples = min(target_samples_per_class[i], len(cls_indices))
        sampled_indices = np.random.choice(cls_indices, size=n_samples, replace=False)
        indices.extend(sampled_indices)

    # Shuffle
    indices = np.array(indices)
    np.random.shuffle(indices)

    return X[indices], y[indices]

## SVM

*Reference:* https://towardsdatascience.com/implement-multiclass-svm-from-scratch-in-python-b141e43dc084

In [3]:
import numpy as np
from scipy.spatial.distance import cdist


class LabelBinarizer:
    def __init__(self):
        self.classes_ = None

    def fit_transform(self, y):
        self.classes_ = np.unique(y)
        n_samples = len(y)
        n_classes = len(self.classes_)
        Y = np.zeros((n_samples, n_classes))

        for i, cls in enumerate(self.classes_):
            Y[:, i] = (y == cls)
        return Y

    def inverse_transform(self, Y):
        return self.classes_[np.argmax(Y, axis=1)]


class BinarySVM:
    def __init__(self, kernel, C=1, tol=1e-3, max_iter=1000, class_weight=None):
        self.kernel = kernel
        self.C = C
        self.tol = tol
        self.max_iter = max_iter
        self.class_weight = class_weight
        self.alpha = None
        self.b = 0
        self.support_vectors = None
        self.support_vector_labels = None

    def _compute_sample_weights(self, y):
        if self.class_weight is None:
            return np.ones(len(y))
        elif self.class_weight == 'balanced':
            # Compute balanced weights
            unique_classes = np.unique(y)
            class_weights = {}
            n_samples = len(y)
            for cls in unique_classes:
                class_weights[cls] = n_samples / (len(unique_classes) * np.sum(y == cls))
        else:
            class_weights = self.class_weight

        sample_weights = np.ones(len(y))
        for cls, weight in class_weights.items():
            sample_weights[y == cls] = weight
        return sample_weights

    def fit(self, X, y):
        n_samples = X.shape[0]
        self.alpha = np.zeros(n_samples)
        self.b = 0
        sample_weights = self._compute_sample_weights(y)
        K = self.kernel(X, X)
        y = y.astype(float)
        
        for iteration in range(self.max_iter):
            alpha_prev = self.alpha.copy()
            predictions = np.dot(K, self.alpha * y) + self.b
            margins = y * predictions
            mask = margins < 1
            updates = np.zeros(n_samples)
            updates[mask] = self.C * sample_weights[mask] * (1 - margins[mask])
            self.alpha += updates
            self.alpha = np.minimum(np.maximum(self.alpha, 0), self.C * sample_weights)
            weight_sum = np.sum(sample_weights)
            self.b = np.sum(sample_weights * (y - np.dot(K, self.alpha * y))) / weight_sum
            diff = np.linalg.norm(self.alpha - alpha_prev)
            
            if diff < self.tol:
                print(f"Converged in {iteration} iterations")
                break

        # Store support vectors
        sv_mask = self.alpha > 1e-5
        self.support_vectors = X[sv_mask]
        self.support_vector_labels = y[sv_mask]
        self.alpha = self.alpha[sv_mask]
        return self

    def predict(self, X):
        if self.support_vectors is None:
            raise ValueError("Model not trained yet!")
        K = self.kernel(X, self.support_vectors)
        return np.sign(np.dot(K, self.alpha * self.support_vector_labels) + self.b)


class MulticlassSVM:
    def __init__(self, kernel='linear', C=1, tol=1e-3, max_iter=500, class_weight=None):
        self.kernel_name = kernel
        self.kernel = self._get_kernel(kernel)
        self.C = C
        self.tol = tol
        self.max_iter = max_iter
        self.class_weight = class_weight
        self.label_binarizer = LabelBinarizer()
        self.classifiers = []

    def _get_kernel(self, kernel_name):
        if kernel_name == 'linear':
            return lambda x, y: np.dot(x, y.T)
        elif kernel_name == 'polynomial':
            return lambda x, y, Q=3: (1 + np.dot(x, y.T)) ** Q
        elif kernel_name == 'rbf':
            return lambda x, y, γ=1: np.exp(-γ * cdist(x, y, 'sqeuclidean'))
        else:
            raise ValueError(f"Invalid kernel: {kernel_name}")

    def fit(self, X, y):
        # Convert to binary problems all at once
        y_binary = self.label_binarizer.fit_transform(y)
        n_classes = y_binary.shape[1]

        # Calculate class weights for multiclass case if 'balanced'
        if self.class_weight == 'balanced':
            n_samples = len(y)
            class_weights = {}
            for i, cls in enumerate(self.label_binarizer.classes_):
                class_weights[1] = n_samples / (2 * np.sum(y == cls))  # weight for positive class
                class_weights[-1] = n_samples / (2 * np.sum(y != cls))  # weight for negative class
        else:
            class_weights = self.class_weight

        # Train all binary classifiers
        self.classifiers = []
        for i in range(n_classes):
            print(f"Training classifier for class {i}")
            clf = BinarySVM(
                kernel=self.kernel,
                C=self.C,
                tol=self.tol,
                max_iter=self.max_iter,
                class_weight=class_weights
            )
            clf.fit(X, 2 * y_binary[:, i] - 1)  # Convert to {-1, 1}
            self.classifiers.append(clf)
        return self

    def predict(self, X):
        n_samples = X.shape[0]
        n_classes = len(self.classifiers)
        predictions = np.zeros((n_samples, n_classes))

        for i, clf in enumerate(self.classifiers):
            predictions[:, i] = clf.predict(X)

        return self.label_binarizer.inverse_transform(predictions > 0)

    def score(self, X, y):
        return np.mean(self.predict(X) == y)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pickle
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from svm2 import MulticlassSVM



def stratified_sampling(X, y, sample_percentage, random_seed=42):
    np.random.seed(random_seed)
    classes, class_counts = np.unique(y, return_counts=True)

    indices = []
    for cls in classes:
        cls_indices = np.where(y == cls)[0]
        # Calculate the number of samples for the current class
        cls_sample_count = max(1, int(len(cls_indices) * sample_percentage))
        # Sample indices for the current class
        sampled_indices = np.random.choice(cls_indices, size=cls_sample_count, replace=False)
        indices.extend(sampled_indices)

    indices = np.array(indices, dtype=int)  # Ensure indices are integers
    np.random.shuffle(indices)

    return X[indices], y[indices]


def test_hyperparameters(X_train, y_train, X_val, y_val, hyperparams):
    results = []
    for params in hyperparams:
        print(f"Testing with hyperparameters: {params}")

        # Perform sampling if sampling percentage is specified
        if 'sample_percentage' in params:
            X_sampled, y_sampled = stratified_sampling(X_train, y_train, params['sample_percentage'])
        else:
            X_sampled, y_sampled = X_train, y_train

        # Initialize the model with given hyperparameters
        model = MulticlassSVM(kernel=params['kernel'], C=params['C'], tol=params['tol'], max_iter=params['max_iter'])
        model.fit(X_sampled, y_sampled)

        # Evaluate on validation set
        y_val_pred = model.predict(X_val)
        accuracy = accuracy_score(y_val, y_val_pred)
        print(f"Validation Accuracy: {accuracy * 100:.2f}%")

        # Store results
        results.append({'params': params, 'accuracy': accuracy})
    return results


def plot_results(results):
    hyperparams = [res['params'] for res in results]
    accuracy_score = [res['accuracy'] for res in results]

    x_labels = [
        f"{params['kernel'].upper()} (C={params['C']})" for params in hyperparams
    ]
    plt.figure(figsize=(10, 6))
    plt.plot(x_labels, accuracy_score, marker='o', linestyle='-', linewidth=2, markersize=8)
    plt.title('SVM Hyperparameter Performance', fontsize=16)
    plt.xlabel('Hyperparameter Configuration', fontsize=12)
    plt.ylabel('Accuracy (%)', fontsize=12)
    plt.xticks(rotation=45, ha='right')
    for i, accuracy in enumerate(accuracy_score):
        plt.text(i, accuracy, f'{accuracy}%',
                 ha='center', va='bottom', fontweight='bold')
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()

print("Loading data...")
with open('train_data.pkl', 'rb') as f:
    train_data = pickle.load(f)

X = np.array(train_data['images']).reshape(len(train_data['images']), -1)
y = np.array(train_data['labels'])

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

hyperparams = [
    {'kernel': 'linear', 'C': 0.1, 'tol': 1e-4, 'max_iter': 1000, 'sample_percentage': 0.3},
    {'kernel': 'linear', 'C': 1.0, 'tol': 1e-4, 'max_iter': 1000, 'sample_percentage': 0.3},
    {'kernel': 'rbf', 'C': 1.0, 'tol': 1e-3, 'max_iter': 2000, 'sample_percentage': 0.3},
    {'kernel': 'rbf', 'C': 10.0, 'tol': 1e-4, 'max_iter': 2000, 'sample_percentage': 0.3}
]

print("Testing hyperparameters...")
results = test_hyperparameters(X_train, y_train, X_val, y_val, hyperparams)
print("Plotting results...")
plot_results(results)



Loading data...
Testing hyperparameters...
Testing with hyperparameters: {'kernel': 'linear', 'C': 0.1, 'tol': 0.0001, 'max_iter': 1000}
Training classifier for class 0


# CNN

In [4]:
import numpy as np
import pickle
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

In [5]:
class SimpleCNN:
    def __init__(self, input_shape, num_classes, lr=0.01, epochs=50, batch_size=32):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.params = self._initialize_weights()

    def _initialize_weights(self):
        np.random.seed(42)
        filter_size = 3
        num_filters = 8
        hidden_units = 64

        # Calculate the size of the flattened layer after pooling
        conv_output_height = self.input_shape[0] - filter_size + 1  # After convolution
        conv_output_width = self.input_shape[1] - filter_size + 1
        pool_output_height = conv_output_height // 2  # After pooling
        pool_output_width = conv_output_width // 2
        flattened_size = num_filters * pool_output_height * pool_output_width

        weights = {
            "conv_filters": np.random.randn(num_filters, filter_size, filter_size) * 0.1,
            "conv_bias": np.zeros(num_filters),
            "fc_weights": np.random.randn(flattened_size, hidden_units) * 0.1,
            "fc_bias": np.zeros(hidden_units),
            "out_weights": np.random.randn(hidden_units, self.num_classes) * 0.1,
            "out_bias": np.zeros(self.num_classes)
        }
        return weights

    def _relu(self, x):
        return np.maximum(0, x)

    def _relu_derivative(self, x):
        return (x > 0).astype(float)

    def _softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def _conv_forward(self, X):
        n_samples, height, width = X.shape
        num_filters, filter_size, _ = self.params["conv_filters"].shape
        output_size = height - filter_size + 1
        conv_output = np.zeros((n_samples, num_filters, output_size, output_size))

        for i in range(output_size):
            for j in range(output_size):
                region = X[:, i:i + filter_size, j:j + filter_size]
                conv_output[:, :, i, j] = np.tensordot(region, self.params["conv_filters"], axes=([1, 2], [1, 2])) + self.params["conv_bias"]
        return self._relu(conv_output)

    def _pool_forward(self, X):
        n_samples, num_filters, height, width = X.shape
        output_size = height // 2
        pool_output = np.zeros((n_samples, num_filters, output_size, output_size))

        for i in range(output_size):
            for j in range(output_size):
                region = X[:, :, i * 2:i * 2 + 2, j * 2:j * 2 + 2]
                pool_output[:, :, i, j] = np.max(region, axis=(2, 3))
        return pool_output

    def _flatten(self, X):
        return X.reshape(X.shape[0], -1)

    def _fc_forward(self, X, weights, bias):
        return np.dot(X, weights) + bias

    def fit(self, X, y):
        n_samples = X.shape[0]
        for epoch in range(self.epochs):
            shuffled_indices = np.random.permutation(n_samples)
            X_shuffled, y_shuffled = X[shuffled_indices], y[shuffled_indices]

            for i in range(0, n_samples, self.batch_size):
                X_batch = X_shuffled[i:i + self.batch_size]
                y_batch = y_shuffled[i:i + self.batch_size]

                conv_output = self._conv_forward(X_batch)
                pool_output = self._pool_forward(conv_output)
                flat_output = self._flatten(pool_output)
                fc_output = self._relu(self._fc_forward(flat_output, self.params["fc_weights"], self.params["fc_bias"]))
                logits = self._fc_forward(fc_output, self.params["out_weights"], self.params["out_bias"])
                probs = self._softmax(logits)

                y_one_hot = np.eye(self.num_classes)[y_batch]
                loss = -np.sum(y_one_hot * np.log(probs + 1e-8)) / self.batch_size

                grad_logits = (probs - y_one_hot) / self.batch_size
                grad_fc_weights_out = np.dot(fc_output.T, grad_logits)
                grad_fc_bias_out = np.sum(grad_logits, axis=0)

                grad_fc_output = np.dot(grad_logits, self.params["out_weights"].T) * self._relu_derivative(fc_output)
                grad_fc_weights = np.dot(flat_output.T, grad_fc_output)
                grad_fc_bias = np.sum(grad_fc_output, axis=0)

                self.params["fc_weights"] -= self.lr * grad_fc_weights
                self.params["fc_bias"] -= self.lr * grad_fc_bias
                self.params["out_weights"] -= self.lr * grad_fc_weights_out
                self.params["out_bias"] -= self.lr * grad_fc_bias_out

            print(f"Epoch {epoch + 1}/{self.epochs}, Loss: {loss:.4f}")

    def predict(self, X):
        conv_output = self._conv_forward(X)
        pool_output = self._pool_forward(conv_output)
        flat_output = self._flatten(pool_output)
        fc_output = self._relu(self._fc_forward(flat_output, self.params["fc_weights"], self.params["fc_bias"]))
        logits = self._fc_forward(fc_output, self.params["out_weights"], self.params["out_bias"])
        probs = self._softmax(logits)

        return np.argmax(probs, axis=1)

    def score(self, X, y):
        predictions = self.predict(X)
        accuracy = np.mean(predictions == y)
        return accuracy


In [6]:
def callCNN(X_train, y_train, X_test):
    # Initialize and train CNN
    print("Training model...")
    cnn = SimpleCNN(input_shape=(28, 28), num_classes=4, lr=0.01, epochs=5, batch_size=32)
    cnn.fit(X_train, y_train)

    # Make predictions
    print("Making predictions...")
    y_test_pred = cnn.predict(X_test)

    train_accuracy = cnn.score(X_train, y_train)
    print(f"Training Accuracy: {train_accuracy * 100:.2f}%")

    # Save predictions
    save('cnn_submission.csv', y_test_pred)

def analyze_epochs(X_train, y_train, X_val, y_val, epoch_range=range(1, 16)):
    train_accuracies = []
    val_accuracies = []

    for epochs in epoch_range:
        print(f"Training with {epochs} epochs...")

        # Create and train model
        cnn = SimpleCNN(input_shape=(28, 28), num_classes=4,
                        lr=0.01, epochs=epochs, batch_size=32)
        cnn.fit(X_train, y_train)

        # Compute accuracies
        train_acc = cnn.score(X_train, y_train)
        val_acc = cnn.score(X_val, y_val)

        train_accuracies.append(train_acc)
        val_accuracies.append(val_acc)

    return train_accuracies, val_accuracies


def plot_epoch_performance(train_accuracies, val_accuracies, epoch_range = range(10, 13)):
    # Plot Accuracy
    plt.figure(figsize=(10, 6))
    plt.plot(epoch_range, train_accuracies, marker='o', label='Training Accuracy')
    plt.plot(epoch_range, val_accuracies, marker='o', label='Validation Accuracy')
    plt.title('Accuracy vs. Number of Epochs')
    plt.xlabel('Number of Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [7]:
print("Loading data...")
with open('train_data.pkl', 'rb') as f:
    train_data = pickle.load(f)
with open('test_data.pkl', 'rb') as f:
    test_data = pickle.load(f)

    # Prepare data
    print("Preparing data...")
    X = np.array(train_data['images']).reshape(len(train_data['images']), 28, 28)
    y = np.array(train_data['labels'])

    # Split the data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    # Analyze epochs
    train_accuracies, val_accuracies = analyze_epochs(X_train, y_train, X_val, y_val)

    # Plot results
    plot_epoch_performance(train_accuracies, val_accuracies)

    # Print best epoch
    best_epoch = val_accuracies.index(max(val_accuracies)) + 1
    print(f"\nBest number of epochs: {best_epoch}")
    print(f"Best validation accuracy: {max(val_accuracies) * 100:.2f}%")

Loading data...
Preparing data...
Training with 1 epochs...


KeyboardInterrupt: 