In [1]:
from typing import Tuple, List, Union, Any, Optional, Dict, Literal, Callable
import time
import collections
import os
import sys
sys.path.append(os.path.dirname(os.getcwd()))
sys.path.append(os.path.dirname(os.path.dirname(os.getcwd())))

from tqdm import tqdm
import numpy as np
import torch
import torch.nn as nn
from torch import Tensor, tensor
import pandas as pd
import openml

#from aeon.regression.sklearn import RotationForestRegressor
from sklearn.metrics import root_mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split


np.set_printoptions(precision=3, threshold=5) # Print options
device = "cuda" # torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# MNIST

In [2]:
from torchvision import datasets, transforms


def normalize_mean_std_traindata(X_train: Tensor, X_test: Tensor) -> Tuple[Tensor, Tensor]:
    mean = X_train.mean(dim=0)
    std = X_train.std(dim=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std

    X_train = torch.clip(X_train, -5, 5)
    X_test = torch.clip(X_test, -5, 5)
    return X_train, X_test


# Define a transform to normalize the data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Download and load the training data
mnist_path = "/home/nikita/hdd/MNIST"
trainset = datasets.MNIST(mnist_path, download=True, train=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=len(trainset), shuffle=False)

# Download and load the test data
testset = datasets.MNIST(mnist_path, download=True, train=False, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=len(testset), shuffle=False)

# Flatten the data
X_train, y_train_cat = next(iter(trainloader))
X_train = X_train.view(len(trainset), -1).to(device)
X_test, y_test_cat = next(iter(testloader))
X_test = X_test.view(len(testset), -1).to(device)

# Convert train and test labels to one-hot encoding
y_train = nn.functional.one_hot(y_train_cat, num_classes=10).float().to(device)
y_test = nn.functional.one_hot(y_test_cat, num_classes=10).float().to(device)
y_train_cat = y_train_cat.to(device)
y_test_cat = y_test_cat.to(device)

# Normalize by mean and std
X_train, X_test = normalize_mean_std_traindata(X_train, X_test)
print(f"Train data shape: {X_train.shape}")
print(f"Train labels shape: {y_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Test labels shape: {y_test.shape}")

Train data shape: torch.Size([60000, 784])
Train labels shape: torch.Size([60000, 10])
Test data shape: torch.Size([10000, 784])
Test labels shape: torch.Size([10000, 10])


# Logistic Regression

In [3]:
from models.models import LogisticRegression, FittableModule


class LogisticRegression(FittableModule):
    def __init__(self, 
                 in_dim: int,
                 out_dim: int = 10,
                 l2_reg: float = 0.001,
                 lr: float = 1.0,
                 max_iter: int = 100,
                 ):
        super(LogisticRegression, self).__init__()
        self.linear = nn.Linear(in_dim, out_dim)
        self.l2_reg = l2_reg
        self.lr = lr
        self.max_iter = max_iter

        if out_dim > 1:
            self.loss = nn.functional.cross_entropy #this is with logits
        else:
            self.loss = nn.functional.binary_cross_entropy_with_logits


    def fit(self, 
            X: Tensor, 
            y: Tensor,
            init_W_b: Optional[Tuple[Tensor, Tensor]] = None,
            ):
        
        # No onehot encoding
        if y.dim() > 1:
            y_labels = torch.argmax(y, dim=1)
        else:
            y_labels = y

        # Put model on device
        device = X.device
        self.to(device)

        # Initialize weights and bias
        if init_W_b is not None:
            W, b = init_W_b
            self.linear.weight.data = W
            self.linear.bias.data = b
        else:
            nn.init.kaiming_normal_(self.linear.weight)
            nn.init.zeros_(self.linear.bias)
            
        with torch.enable_grad():
            # Optimize
            optimizer = torch.optim.LBFGS(self.linear.parameters(), lr=self.lr, max_iter=self.max_iter)
            def closure():
                optimizer.zero_grad()
                logits = self.linear(X)
                loss = self.loss(logits, y_labels)
                loss += self.l2_reg * torch.linalg.norm(self.linear.weight)**2
                loss.backward()
                return loss
            optimizer.step(closure)
        return self


    def forward(self, X: Tensor) -> Tensor:
        return self.linear(X)


model = LogisticRegression(
        in_dim = 784,
        out_dim = 10,
        l2_reg = 0.001,
        max_iter = 300,
    )
X_train_pred = model.fit_transform(X_train, y_train_cat)
X_test_pred = model(X_test)

print("X_test_pred", X_test_pred)

train_accuracy = (torch.argmax(X_train_pred, dim=1) == y_train_cat).float().mean().item()
test_accuracy = (torch.argmax(X_test_pred, dim=1) == y_test_cat).float().mean().item()

print(f"Train accuracy: {train_accuracy}")
print(f"Test accuracy: {test_accuracy}")

X_test_pred tensor([[ -0.2510, -10.2403,   0.3849,  ...,  11.1271,   0.0377,   3.3296],
        [  5.8885,   1.3842,  13.1973,  ..., -18.7710,   4.5807, -11.9262],
        [ -5.7043,   6.3781,   1.9346,  ...,   0.8382,   0.3691,  -1.5643],
        ...,
        [ -7.5464,  -7.3031,  -2.6623,  ...,   2.3346,   4.0713,   4.8315],
        [ -2.7700,  -1.8704,  -3.1229,  ...,  -4.0766,   6.4907,  -3.2410],
        [  2.6960, -10.5125,   4.8481,  ...,  -7.0335,  -0.4837,  -4.2359]],
       device='cuda:0', grad_fn=<AddmmBackward0>)
Train accuracy: 0.9335166811943054
Test accuracy: 0.9264000058174133


# GradientRFBoost

In [7]:
from models.models import FittableModule, create_layer, fit_ridge_ALOOCV, Identity

def line_search_cross_entropy(cls, X, y, G_hat):
    """Solves the line search risk minimizatin problem
    R(W, X + a * g) for mutliclass cross entropy loss"""
    # No onehot encoding
    if y.dim() > 1:
        y_labels = torch.argmax(y, dim=1)
    else:
        y_labels = y

    # Optimize
    with torch.enable_grad():
        alpha = torch.tensor([0.0], requires_grad=True, device=X.device, dtype=X.dtype)
        optimizer = torch.optim.LBFGS([alpha])
        def closure():
            optimizer.zero_grad()
            logits = cls(X + alpha * G_hat)
            loss = nn.functional.cross_entropy(logits, y_labels)
            loss.backward()
            return loss
        optimizer.step(closure)

    return alpha.detach().item()


class GradientRFBoostClassifier(FittableModule):
    def __init__(self, 
                 hidden_dim: int = 128, # TODO
                 randfeat_xt_dim: int = 128,
                 randfeat_x0_dim: int = 128,
                 out_dim: int = 10,
                 n_layers: int = 5,
                 activation: nn.Module = nn.Tanh(),
                 l2_reg: float = 1,
                 feature_type = "SWIM", # "dense", identity
                 boost_lr: float = 1.0,
                 upscale: Optional[str] = "dense",
                 max_iter: int = 100,
                 ridge_l2: float = 0.001,
                 ):
        super(GradientRFBoostClassifier, self).__init__()
        self.hidden_dim = hidden_dim
        self.randfeat_xt_dim = randfeat_xt_dim
        self.randfeat_x0_dim = randfeat_x0_dim
        self.out_dim = out_dim
        self.n_layers = n_layers
        self.activation = activation
        self.l2_reg = l2_reg
        self.feature_type = feature_type
        self.boost_lr = boost_lr
        self.upscale = upscale
        self.max_iter = max_iter
        self.ridge_l2 = ridge_l2
    

    def fit_transform(self, X: Tensor, y: Tensor):
        with torch.no_grad():
            X0 = X

            #optional upscale/downscale
            if self.upscale == "dense":
                self.upscale_fun = create_layer(self.upscale, X.shape[1], self.hidden_dim, None)
                X = self.upscale_fun.fit_transform(X, y)
            elif self.upscale == "SWIM":
                self.upscale_fun = create_layer(self.upscale, X.shape[1], self.hidden_dim, self.activation)
                X = self.upscale_fun.fit_transform(X, y)
            elif self.upscale == "identity":
                self.upscale_fun = Identity()
                self.hidden_dim = X.size(1)
            else:
                raise ValueError(f"Parameter not recoginized. Given: {self.upscale}")


            # Create classifier W_0
            cls = LogisticRegression(
                in_dim = self.hidden_dim,
                out_dim = self.out_dim,
                l2_reg = self.l2_reg,
                max_iter = self.max_iter,
            ).to(X.device)
            cls.fit(X, y)
            # save for now. for more memory efficient implementation, we can remove a lot of this
            self.classifiers = [cls]
            self.layers_fxt = []
            self.layers_fx0 = []
            self.deltas = []

            # Layerwise boosting
            N = X.size(0)
            prev_cls = None if self.upscale != "identity" else cls
            for t in range(self.n_layers):
                # Step 2: Obtain activation gradient
                # X shape (N, D) --- ResNet neurons
                # F shape (N, p) --- random features
                # y shape (N, d) --- one-hot target
                # r shape (N, D) --- residual at currect boosting iteration
                # W shape (D, d) --- top level classifier
                # probs shape (N, d) --- predicted probabilities

                # Step 1: Create random feature layer   
                fxt_fun = create_layer(self.feature_type, self.hidden_dim, self.randfeat_xt_dim, self.activation)
                fx0_fun = create_layer(self.feature_type, X0.size(1), self.randfeat_x0_dim, self.activation)
                Fxt = fxt_fun.fit_transform(X, y)
                Fx0 = fx0_fun.fit_transform(X0, y)
                F = torch.cat([Fxt, Fx0], dim=1)


                # Step 2: Obtain activation gradient and learn Delta
                probs = nn.functional.softmax(cls(X), dim=1)
                G = (y - probs) @ cls.linear.weight #negative gradient TODO divide by N?
                G = G / torch.norm(G) * N**0.5 #normalize to unit L2(mu) norm?
                # fit Least Squares to negative gradient (finding functional direction)
                Delta, Delta_b, _ = fit_ridge_ALOOCV(F, G, alphas=[self.ridge_l2])
                # Line search for risk minimization of R(W_t, Phi_t + linesearch * G_hat)
                G_hat = F @ Delta + Delta_b
                linesearch = line_search_cross_entropy(cls, X, y, G_hat)


                # Step 3: Learn top level classifier
                X = X + self.boost_lr * linesearch * G_hat
                cls = LogisticRegression(
                    in_dim = self.hidden_dim,
                    out_dim = self.out_dim,
                    l2_reg = self.l2_reg,
                    max_iter = self.max_iter,
                ).to(X.device)
                cls.fit(
                    X, 
                    y, 
                    init_W_b = (
                        (prev_cls.linear.weight.detach().clone(), 
                        prev_cls.linear.bias.detach().clone())
                        if prev_cls is not None else None
                        ) 
                )
                prev_cls = cls

                #update Delta scale
                Delta = Delta * linesearch
                Delta_b = Delta_b * linesearch

                # store
                self.layers_fxt.append(fxt_fun)
                self.layers_fx0.append(fx0_fun)
                self.deltas.append((Delta, Delta_b))
                self.classifiers.append(cls)

        return cls(X)


    def forward(self, X: Tensor) -> Tensor:
        with torch.no_grad():
            X0 = X
            if self.upscale is not None:
                X = self.upscale_fun(X)
            for fxt_fun, fx0_fun, (Delta, Delta_b) in zip(self.layers_fxt, self.layers_fx0, self.deltas):
                features = torch.cat([fxt_fun(X), fx0_fun(X0)], dim=1)
                X = X + self.boost_lr * (features @ Delta + Delta_b)
            return self.classifiers[-1](X)
        

model = GradientRFBoostClassifier(
        hidden_dim = 128,
        randfeat_xt_dim = 256,
        randfeat_x0_dim = 256,
        out_dim = 10,
        n_layers = 20,
        l2_reg = 0.000001,
        ridge_l2 = 0.000000001,
        feature_type="SWIM",
        upscale = "SWIM",
        max_iter = 300,
        boost_lr = 1.0,
    )
X_train_pred = model.fit_transform(X_train, y_train)
X_test_pred = model(X_test)

print("X_test_pred", X_test_pred)

train_accuracy = (torch.argmax(X_train_pred, dim=1) == y_train_cat).float().mean().item()
test_accuracy = (torch.argmax(X_test_pred, dim=1) == y_test_cat).float().mean().item()

print(f"Train accuracy: {train_accuracy}")
print(f"Test accuracy: {test_accuracy}")

#TODO NEXT: add xtx0 to the classification case

X_test_pred tensor([[ -0.4578,  -7.3349,  -3.2638,  ...,  19.2302,  -3.6720,   8.1709],
        [  1.8385,   4.3136,  15.2803,  ..., -14.3408,   8.0894, -12.2165],
        [ -6.2760,  12.3083,  -0.7033,  ...,   0.5448,  -0.3209,  -2.4590],
        ...,
        [ -6.0342,  -7.2200, -11.2291,  ...,   3.2755,   6.3079,   8.0130],
        [ -4.2226,  -1.5429,  -9.6328,  ...,  -0.4453,  10.5957,  -5.8550],
        [ 11.1178, -13.2252,   8.9488,  ..., -10.6908,  -1.5249,  -5.1460]],
       device='cuda:0')
Train accuracy: 0.9985499978065491
Test accuracy: 0.9706999659538269


In [5]:
def see_results_for_every_layer(X_train, X_test):
    with torch.no_grad():
        X0_train = X_train
        X0_test = X_test

        if model.upscale is not None:
            X_train = model.upscale_fun(X0_train)
            X_test = model.upscale_fun(X0_test)

        y_pred_train = model.classifiers[0](X_train)
        y_pred_test = model.classifiers[0](X_test)
        print(f"Train acc at layer 0: {torch.argmax(y_pred_train, dim=1).eq(y_train_cat).float().mean()}")
        print(f"Test acc at layer 0: {torch.argmax(y_pred_test, dim=1).eq(y_test_cat).float().mean()}")
        print()
        
        for t, (fxt_fun, fx0_fun, (Delta, Delta_b)) in enumerate(zip(model.layers_fxt, model.layers_fx0, model.deltas)):
            features_train = torch.cat([fxt_fun(X_train), fx0_fun(X0_train)], dim=1)
            features_test = torch.cat([fxt_fun(X_test), fx0_fun(X0_test)], dim=1)
            X_train = X_train + model.boost_lr * (features_train @ Delta + Delta_b)
            X_test = X_test + model.boost_lr * (features_test @ Delta + Delta_b)
            
            y_pred_train = model.classifiers[t+1](X_train)
            y_pred_test = model.classifiers[t+1](X_test)

            print(f"Train acc at layer {t+1}: {torch.argmax(y_pred_train, dim=1).eq(y_train_cat).float().mean()}")
            print(f"Test acc at layer {t+1}: {torch.argmax(y_pred_test, dim=1).eq(y_test_cat).float().mean()}")
            print()


see_results_for_every_layer(X_train, X_test)

Train acc at layer 0: 0.9192166924476624
Test acc at layer 0: 0.9194999933242798

Train acc at layer 1: 0.9477666616439819
Test acc at layer 1: 0.9449999928474426

Train acc at layer 2: 0.9585166573524475
Test acc at layer 2: 0.9540999531745911

Train acc at layer 3: 0.9649333357810974
Test acc at layer 3: 0.9585999846458435

Train acc at layer 4: 0.9692833423614502
Test acc at layer 4: 0.960099995136261

Train acc at layer 5: 0.9713667035102844
Test acc at layer 5: 0.9629999995231628

Train acc at layer 6: 0.9738500118255615
Test acc at layer 6: 0.964199960231781

Train acc at layer 7: 0.9765666723251343
Test acc at layer 7: 0.965399980545044

Train acc at layer 8: 0.9784500002861023
Test acc at layer 8: 0.9656999707221985

Train acc at layer 9: 0.9797999858856201
Test acc at layer 9: 0.9646999835968018

Train acc at layer 10: 0.9814500212669373
Test acc at layer 10: 0.9651999473571777

Train acc at layer 11: 0.982699990272522
Test acc at layer 11: 0.9660999774932861

Train acc at lay

# End2End

In [6]:
from models.models import End2EndMLPResNet

model = End2EndMLPResNet(
    in_dim = X_train.shape[1],
    hidden_dim = 128,
    bottleneck_dim = 32,
    out_dim = 10,
    n_blocks = 4,
    lr = 0.01,
    end_lr_factor = 0.01,
    n_epochs = 20,
    weight_decay = 0.001,
    batch_size = 512
    )
X_train_pred = model.fit_transform(X_train, y_train)
X_test_pred = model(X_test)

print("X_test_pred", X_test_pred)

train_accuracy = (torch.argmax(X_train_pred, dim=1) == y_train_cat).float().mean().item()
test_accuracy = (torch.argmax(X_test_pred, dim=1) == y_test_cat).float().mean().item()

print(f"Train accuracy: {train_accuracy}")
print(f"Test accuracy: {test_accuracy}")

 40%|████      | 8/20 [00:10<00:15,  1.32s/it]


KeyboardInterrupt: 