## Chebyshev-KAN for MNIST

In [None]:
# !pip install --upgrade scipy
# !pip install pymoo

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np
from ChebyKANLayer import ChebyKANLayer
import pandas as pd
from sklearn.metrics import (
            f1_score,
        )
import matplotlib.pyplot as plt
from gfo import GFOProblem, SOCallback, blocker, build_rand_blocks, get_model_params, set_model_state

In [None]:
# Construct a ChebyKAN for MNIST
class MNISTChebyKAN(nn.Module):
    def __init__(self):
        super(MNISTChebyKAN, self).__init__()
        self.chebykan1 = ChebyKANLayer(28*28, 32, 4)
        # self.ln1 = nn.LayerNorm(32) # To avoid gradient vanishing caused by tanh
        self.chebykan2 = ChebyKANLayer(32, 10, 4)

    def forward(self, x):
        x = x.view(-1, 28*28)  # Flatten the images
        x = self.chebykan1(x)
        # x = self.ln1(x)
        x = self.chebykan2(x)
        return x

In [None]:
# Load MNIST
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
trainset = datasets.MNIST(
    root="./data", train=True, download=True, transform=transform
)
testset = datasets.MNIST(
    root="./data", train=False, download=True, transform=transform
)
test_loader = DataLoader(testset, batch_size=10000, shuffle=False)

In [None]:
def validate(model, test_loader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    f1 = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            total_loss += loss.item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target.view_as(pred)).sum().item()
            f1 += f1_score(target.view_as(pred).cpu().numpy(), pred.cpu().numpy(), average='macro')


    return total_loss / len(test_loader), correct / len(test_loader.dataset), f1 / len(test_loader) # (Loss, Accuracy, F1-score)

# CMA-ES

In [None]:
from pymoo.algorithms.soo.nonconvex.cmaes import CMAES
from pymoo.optimize import minimize

# Parameter Setting
NP = 100
block_size = 100
# Define model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MNISTChebyKAN().to(device)
init_params = get_model_params(model)
D = len(init_params)
print(f"Original dims: {D} D")

import os
import pickle 

codebook = {}
if os.path.exists(f'out/codebook_D{D}_blocksize{block_size}.pkl'):
    with open(f'out/codebook_D{D}_blocksize{block_size}.pkl', 'rb') as f:
        codebook = pickle.load(f)
else:
    codebook = build_rand_blocks(D, block_size=block_size)
    
    with open(f'out/codebook_D{D}_blocksize{block_size}.pkl', 'wb') as f:
        pickle.dump(codebook, f)

bD = len(codebook)
print(f"Blocked dims: {bD} D")
# init_pop = np.random.uniform(low=-0.5, high=0.5, size=(NP, bD))
init_params = blocker(init_params, codebook)
# x0 = np.random.normal(loc=init_params, scale=1.0, size=(NP, bD))
x0 = np.random.uniform(low=-2, high=2, size=(NP, bD))


num_samples = 1024
data_loader = DataLoader(trainset, batch_size=num_samples, shuffle=True)
problem = GFOProblem(n_var=bD, model=model, dataset=trainset, test_loader=test_loader, train_loader=data_loader, 
                      set_model_state=set_model_state,
                      batch_size=num_samples, device=device, criterion="f1",
                      block=True, codebook=codebook, orig_dims=D)
out={"F": []}
problem._evaluate(np.array([x0, init_params]), out=out)
print(out)

algorithm = CMAES(
    x0=x0,
    sigma=0.1,
    tolconditioncov=0,
    tolfacupx=np.inf,
    tolupsigma=np.inf,
    tolfun=0,
    tolfunhist=0,
    tolstagnation=np.inf,
    tolx=0,
    # restarts=4, 
    # bipop=True,
)

scheme = f"4deg_block_bs{block_size}_gfo_f1_{num_samples}data_cmaes"
csv_path = f"out/{scheme}_hist.csv"
plt_path = f"out/{scheme}_plt.pdf"
df = pd.DataFrame({
            'n_step': [0],
            'n_eval': [1],
            'f_best': [out["F"][0]],
            'f_avg': [out["F"][0]],
            'f_std': [0],
            'test_f1_best': problem.test_func(x0),
        })
df.to_csv(csv_path, index=False)

res = minimize(problem,
               algorithm,
               ('n_eval', 100000),
               callback=SOCallback(k_steps=100, csv_path=csv_path, plt_path=plt_path),
               seed=None,
               verbose=True)

print("Best solution found: \nX = %s\nF = %s" % (res.X, res.F))

In [None]:
# Save the best solution model parameters state
best_X = res.X.copy()
if len(res.X) != D:
    best_X = problem.unblocker(best_X)
set_model_state(model, best_X)
torch.save(model.state_dict(), f"out/{scheme}_model.pth")

# Differential Evolution

In [None]:
from scipy.optimize import differential_evolution

# Parameter Setting
NP = 100 # Number of population
block_size = 100
# Define model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MNISTChebyKAN().to(device)
init_params = get_model_params(model)
D = len(init_params)
print(f"Original dims: {D} D")

import os
import pickle 

codebook = {}
if os.path.exists(f'out/codebook_D{D}_blocksize{block_size}.pkl'):
    with open(f'out/codebook_D{D}_blocksize{block_size}.pkl', 'rb') as f:
        codebook = pickle.load(f)
else:
    codebook = build_rand_blocks(D, block_size=block_size)
    
    with open(f'out/codebook_D{D}_blocksize{block_size}.pkl', 'wb') as f:
        pickle.dump(codebook, f)

bD = len(codebook)
print(f"Blocked dims: {bD} D")
# Population initialization
# Uniform distribution
init_pop = np.random.uniform(low=-2, high=2, size=(NP, bD))
# Normal distribution
# init_pop = np.random.normal(loc=init_params.mean(), scale=0.1, size=(NP, bD))
# Block the initial params from model
init_params = blocker(init_params, codebook)
init_pop[0] = init_params.copy()
# One center-based solution to accelerate population
init_pop[-1] = init_pop[:-1].mean(0)

# Define gradient-free optimization problem object
num_samples = 1024
data_loader = DataLoader(trainset, batch_size=num_samples, shuffle=True)
problem = GFOProblem(n_var=bD, model=model, dataset=trainset, test_loader=test_loader, train_loader=data_loader, 
                      set_model_state=set_model_state,
                      batch_size=num_samples, device=device, criterion="f1",
                      block=True, codebook=codebook, orig_dims=D)

# Save history and plot in the out paths
scheme = f"4deg_block_bs{block_size}_gfo_f1_{num_samples}data_de"
csv_path = f"out/{scheme}_hist.csv"
plt_path = f"out/{scheme}_plt.pdf"
df = pd.DataFrame({
            'n_step': [0],
            'n_eval': [1],
            'f_best': [out["F"][0]],
            'f_avg': [out["F"][0]],
            'f_std': [0],
            'test_f1_best': problem.test_func(x0),
        })
df.to_csv(csv_path, index=False)

de_callback = SOCallback(problem=problem, csv_path=csv_path, plt_path=plt_path)
result = differential_evolution(problem.scipy_fitness_func, bounds=[(-2, 2)]*bD, strategy='rand1bin', maxiter=int(1e+6)//NP, popsize=NP, init=init_pop,
                                tol=1e-3, mutation=(0.5, 1.5), recombination=0.9, seed=None,
                                callback=de_callback.scipy_func,
                                disp=True, polish=False, atol=0, updating='deferred', workers=1, vectorized=False)


print("Best solution found: \nX = %s\nF = %s" % (result.x, result.fun))

In [None]:
# Save the best solution model parameters state
best_X = result.x.copy()
if len(result.x) != D:
    best_X = problem.unblocker(best_X)
set_model_state(model, best_X)
torch.save(model.state_dict(), f"out/{scheme}_model.pth")

## Take a peek at what the Chebyshev Polynomials look like

In [None]:
def chebyshev_polynomials(x, degree):
    # T_0(x) and T_1(x)
    if degree == 0:
        return np.ones_like(x)
    elif degree == 1:
        return x
    else:
        Tn_2 = np.ones_like(x)
        Tn_1 = x
        Tn = None
        for n in range(2, degree + 1):
            Tn = 2 * x * Tn_1 - Tn_2
            Tn_2, Tn_1 = Tn_1, Tn
        return Tn

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 4))
# choose the layer u want to inspect
all_coeffs = model.chebykan1.cheby_coeffs.detach().cpu().numpy()
# choose the input dim
all_coeffs = all_coeffs[28 * 28 // 2]
# Plot over output dim
for some_coeffs in all_coeffs:
    x_values = np.linspace(-1, 1, 400)
    y_values = np.zeros_like(x_values)
    for i, coeff in enumerate(some_coeffs):
        y_values += coeff * chebyshev_polynomials(x_values, i)
    
    plt.plot(x_values, y_values)
    
plt.title('Function Represented by Chebyshev Coefficients')
plt.xlabel('x')
plt.ylabel('f(x)')
plt.grid(True)
plt.show()

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 4))
# choose the layer u want to inspect
all_coeffs = model.chebykan2.cheby_coeffs.detach().cpu().numpy()
# choose the input dim
all_coeffs = all_coeffs[16]
# Plot over output dim
for some_coeffs in all_coeffs:
    x_values = np.linspace(-1, 1, 400)
    y_values = np.zeros_like(x_values)
    for i, coeff in enumerate(some_coeffs):
        y_values += coeff * chebyshev_polynomials(x_values, i)
    
    plt.plot(x_values, y_values)
    
plt.title('Function Represented by Chebyshev Coefficients')
plt.xlabel('x')
plt.ylabel('f(x)')
plt.grid(True)
plt.show()

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 4))
# choose the layer u want to inspect
all_coeffs = model.chebykan3.cheby_coeffs.detach().cpu().numpy()
# choose the input dim
all_coeffs = all_coeffs[8]
# Plot over output dim
for some_coeffs in all_coeffs:
    x_values = np.linspace(-1, 1, 400)
    y_values = np.zeros_like(x_values)
    for i, coeff in enumerate(some_coeffs):
        y_values += coeff * chebyshev_polynomials(x_values, i)
    
    plt.plot(x_values, y_values)
    
plt.title('Function Represented by Chebyshev Coefficients')
plt.xlabel('x')
plt.ylabel('f(x)')
plt.grid(True)
plt.show()