In [1]:
import torch as torch
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from DataRead import OurData
import traceback
import struct

from kan import KAN, create_dataset

# Notebook-wide configuration: float32 tensors by default, and run on the GPU
# whenever CUDA is available (otherwise fall back to the CPU).
torch.set_default_dtype(torch.float32)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
input_size = 28 * 28  # number of pixels in one flattened 28x28 image

In [2]:
def read_idx(filename):
    """Load an IDX-format file (the container format used by MNIST) as a float32 tensor.

    The header is two zero bytes, a one-byte element type code and a one-byte
    dimension count, followed by one big-endian uint32 per dimension; the rest
    of the file is the element data.

    Args:
        filename: path of the IDX file to read.

    Returns:
        torch.float32 tensor whose shape is the one declared in the header.

    Raises:
        ValueError: if the magic bytes are not zero, the type code is unknown,
            or the payload size does not match the declared shape.
    """
    # IDX type code -> numpy dtype (multi-byte types are stored big-endian).
    idx_dtypes = {
        0x08: np.dtype('uint8'),
        0x09: np.dtype('int8'),
        0x0B: np.dtype('>i2'),
        0x0C: np.dtype('>i4'),
        0x0D: np.dtype('>f4'),
        0x0E: np.dtype('>f8'),
    }
    with open(filename, 'rb') as f:
        zero, dtype, dims = struct.unpack('>HBB', f.read(4))
        if zero != 0:
            raise ValueError(f"{filename}: not an IDX file (magic prefix {zero:#06x})")
        if dtype not in idx_dtypes:
            raise ValueError(f"{filename}: unsupported IDX type code {dtype:#04x}")
        shape = tuple(struct.unpack('>I', f.read(4))[0] for _ in range(dims))
        payload = f.read()
    values = np.frombuffer(payload, dtype=idx_dtypes[dtype]).reshape(shape)
    # astype() yields a writable, native-endian float32 copy that torch can wrap.
    return torch.tensor(values.astype(np.float32))
    
def compress(dataset, slice = 1):
    """Reduce each row of a 2-D tensor to ``slice`` features by summing column groups.

    The columns are cut into ``slice`` contiguous groups of
    ``dataset.shape[1] // slice`` columns each, and every group is replaced by
    the sum of its values.

    Args:
        dataset: 2-D tensor of shape (rows, columns); in this notebook each row
            is a flattened image with 784 columns.
        slice: number of output features per row (default 1). When it does not
            divide the column count evenly, the leftover trailing columns are
            ignored.

    Returns:
        Tensor of shape (rows, slice), on the same device as ``dataset``.

    Raises:
        ValueError: if ``slice`` is smaller than 1.
    """
    if slice < 1:
        raise ValueError(f"slice must be a positive integer, got {slice}")
    rows, columns = dataset.shape
    fraction = columns // slice
    # Drop the remainder columns, then sum every group in a single reduction
    # instead of growing the result with repeated torch.cat calls.
    grouped = dataset[:, :slice * fraction].reshape(rows, slice, fraction)
    return grouped.sum(dim=2)

class OurData:
    """MNIST images and one-hot labels, stored as tensors in ``self.ourdataset``.

    ``self.ourdataset`` has four keys: 'train_input' and 'test_input' (one
    flattened 28*28 image per row) and 'train_label' and 'test_label' (one
    10-element row per image, with 1 at the label's index and 0 elsewhere).
    """

    def __init__(self, data_dir='/workspaces/KAN-Network/Dataset'):
        """Read the four IDX files found under ``data_dir`` and build the dataset dict.

        Args:
            data_dir: directory containing the extracted MNIST folders. The
                default is the location this notebook was written against.
        """
        self.ourdataset = {}
        # Image files: every image becomes one row of 28*28 pixel values.
        train_images = read_idx(f'{data_dir}/train-images-idx3-ubyte/train-images-idx3-ubyte')
        self.ourdataset['train_input'] = train_images.view(-1, 28*28)
        test_images = read_idx(f'{data_dir}/t10k-images-idx3-ubyte/t10k-images-idx3-ubyte')
        self.ourdataset['test_input'] = test_images.view(-1, 28*28)

        # Label files: convert each label into a 10-element one-hot row for classification.
        train_label = read_idx(f'{data_dir}/train-labels-idx1-ubyte/train-labels-idx1-ubyte')
        test_label = read_idx(f'{data_dir}/t10k-labels-idx1-ubyte/t10k-labels-idx1-ubyte')
        self.ourdataset['train_label'] = self._one_hot(train_label)
        self.ourdataset['test_label'] = self._one_hot(test_label)

    @staticmethod
    def _one_hot(labels, n_classes=10):
        """Return a (len(labels), n_classes) tensor with a 1 at each label's index."""
        encoded = torch.zeros(len(labels), n_classes)
        # A single scatter replaces the per-row Python loop over every label.
        encoded.scatter_(1, labels.long().view(-1, 1), 1.0)
        return encoded

    def __getitem__(self, key=None):
        """Return ``self.ourdataset[key]``, or the whole dict when no key is given.

        Accepting a key makes ``data['train_input']`` work; calling the method
        without arguments still returns the full dict.
        """
        if key is None:
            return self.ourdataset
        return self.ourdataset[key]

    def getitems(self, index, endindex = 10000, test = True):
        """Return ``[inputs, labels]`` for rows ``index:endindex``.

        Args:
            index: first row to include.
            endindex: row to stop before (default 10000).
            test: read from the test split when True, from the training split otherwise.
        """
        key = 'test_input' if test else 'train_input'
        key2 = 'test_label' if test else 'train_label'
        return [self.ourdataset[key][index:endindex], self.ourdataset[key2][index:endindex]]

    def filldata(self, m_ins, m_slices):
        """Build a dataset dict limited to the first ``m_ins`` rows of every entry.

        Args:
            m_ins: number of leading rows to keep from each input/label tensor.
            m_slices: number of features per image. 784 keeps the raw pixels;
                any other value sums pixel groups via ``compress``.

        Returns:
            Dict with the same keys as ``self.ourdataset``.
        """
        ourdatas = {}
        for key, thisdata in self.ourdataset.items():
            # Always apply the row limit; it used to be skipped when m_slices == 784.
            subset = thisdata[:m_ins]
            isin = (key == 'train_input' or key == 'test_input')
            if isin and m_slices != 784:
                # compress works row by row, so trimming first gives the same rows for less work.
                subset = compress(subset, m_slices)
            ourdatas[key] = subset
        return ourdatas


In [3]:
#total train datapoints=60000, test datapoints=10000. approx time:1min/100 points model1, 30s/1000 points model2, 1min/2500 points model3
data = OurData() #Our dataset
# Each dataset is filldata(number of rows, features per image). The tensors are
# moved to `device` because the KAN models are created on that device; feeding
# CPU tensors to a CUDA model raises a device-mismatch error.
ourdata = {key: value.to(device) for key, value in data.filldata(20, 784).items()}
ourdata2 = {key: value.to(device) for key, value in data.filldata(200, 112).items()}
ourdata3 = {key: value.to(device) for key, value in data.filldata(500, 28).items()}
#print(ourdata['train_input'].size(), ourdata2['train_input'].size(), ourdata3['train_input'].size())

In [4]:
# Initialize the three KAN models (refer to MultKAN.py for more information).
#   width: number of neurons in each layer, in order from input to output
#   k:     order of the spline
#   seed:  random seed
#   grid:  grid intervals/grid points (affects the accuracy of the splines/learnable functions)
# The first entry of each width equals the feature count of the dataset that
# model is trained on (784, 112 and 28).
model = KAN(
    width=[784, 10, 10],
    grid=5,
    k=3,
    seed=0,
    device=device,
)
model2 = KAN(
    width=[112, 10, 10],
    grid=5,
    k=3,
    seed=0,
    device=device,
    ckpt_path='./model2',
)
model3 = KAN(
    width=[28, 10, 10],
    grid=5,
    k=3,
    seed=0,
    device=device,
    ckpt_path='./model3',
)

checkpoint directory created: ./model
saving model version 0.0
checkpoint directory created: ./model2
saving model version 0.0
checkpoint directory created: ./model3
saving model version 0.0


In [None]:
# Each model is paired with the dataset whose feature count matches its input layer.
training_pairs = ((model, ourdata), (model2, ourdata2), (model3, ourdata3))

# One forward pass per model on its training inputs before fitting.
# model.plot() is deliberately not called: it would draw a function for every
# neuron, and image-sized inputs give a very large number of them.
for net, dataset in training_pairs:
    net(dataset['train_input'])

# Fit the models in order and collect what fit() returns for each one.
# NOTE(review): `steps` is presumably the number of optimizer steps and `lamb`
# the regularization weight - confirm against MultKAN.fit.
# If any fit raises, the error and traceback are printed and the remaining
# models are skipped, so modelresults can hold fewer than three entries.
modelresults = []
try:
    for net, dataset in training_pairs:
        modelresults.append(net.fit(dataset, opt="LBFGS", steps=25, lamb=0.001))
except Exception as e:
    print(e)
    traceback.print_exc()

: 

In [None]:
# Loss curves recorded while fitting the first model: one figure for the
# training loss, one for the test loss, with the x axis counting from 1.
model1_history = modelresults[0]
for history_key, figure_title in (('train_loss', 'Model1 Train Loss'), ('test_loss', 'Model1 Test Loss')):
    losses = model1_history[history_key]
    plt.plot(range(1, len(losses) + 1), losses)
    plt.title(figure_title)
    plt.show()

In [None]:
# Loss curves recorded while fitting the second model: one figure for the
# training loss, one for the test loss, with the x axis counting from 1.
model2_history = modelresults[1]
for history_key, figure_title in (('train_loss', 'Model2 Train Loss'), ('test_loss', 'Model2 Test Loss')):
    losses = model2_history[history_key]
    plt.plot(range(1, len(losses) + 1), losses)
    plt.title(figure_title)
    plt.show()

In [None]:
# Loss curves recorded while fitting the third model: one figure for the
# training loss, one for the test loss, with the x axis counting from 1.
model3_history = modelresults[2]
for history_key, figure_title in (('train_loss', 'Model3 Train Loss'), ('test_loss', 'Model3 Test Loss')):
    losses = model3_history[history_key]
    plt.plot(range(1, len(losses) + 1), losses)
    plt.title(figure_title)
    plt.show()

In [None]:
# Evaluate every trained model on its own dataset and compare the test losses.
eval1 = model.evaluate(ourdata)
eval2 = model2.evaluate(ourdata2)
eval3 = model3.evaluate(ourdata3)
evaluation_results = [eval1, eval2, eval3]
# One loss value per model, kept as a flat list that lines up with the model numbers.
test_losses = [result['test_loss'] for result in evaluation_results]

plt.scatter(range(1, len(test_losses) + 1), test_losses, label='Test Loss')
plt.xlabel('Model Number')
plt.ylabel('Loss')
plt.title('Scatter Plot of Models')
plt.legend()  # the label above is only drawn once a legend is requested
plt.show()

In [None]:
# Run the last 50 test images through every model.
testingdata = data.getitems(9950, 10000)
# Each model must be fed the same number of features it was built with:
# 784 raw pixels for model, 112 summed groups for model2, 28 for model3.
# compress runs on the CPU tensor first; the result is then moved to the
# models' device, and predictions are detached and brought back to the CPU
# so the cells below can convert them to numpy.
predictions = model.forward(testingdata[0].to(device)).detach().cpu()
predictions2 = model2.forward(compress(testingdata[0], 112).to(device)).detach().cpu()
predictions3 = model3.forward(compress(testingdata[0], 28).to(device)).detach().cpu()
allpredictions = [predictions, predictions2, predictions3]

In [None]:
# Every model output and every label is a 10-element row, so reduce each row
# to a single digit (index of the largest entry) before plotting. Plotting the
# raw rows would draw ten curves per series with duplicated legend entries.
for i in range(3):
    predicted_digits = allpredictions[i].detach().cpu().argmax(dim=1).numpy()
    plt.plot(range(len(predicted_digits)), predicted_digits, label=f'Model{i+1} Predictions')
actual_digits = testingdata[1].detach().cpu().argmax(dim=1).numpy()
plt.plot(range(len(actual_digits)), actual_digits, label='Actual')
plt.xlabel('Input Number')
plt.ylabel('Output')
plt.title('Model Predictions vs Actual')
plt.legend()
plt.show()

In [None]:
# Labels and model outputs are 10-element rows, so compare digits: the index
# of the largest entry in each row.
actual_digits = testingdata[1].detach().cpu().argmax(dim=1)
predicted_digits = [output.detach().cpu().argmax(dim=1) for output in allpredictions]

error = [0, 0, 0]    # average absolute difference between predicted and actual digit
correct = [0, 0, 0]  # percentage of samples whose predicted digit is exactly right
for i in range(len(actual_digits)):
    print(i, actual_digits[i].item(), end="\t")
    for j in range(3):
        print(j+1, predicted_digits[j][i].item(), end="\t")
    print()
for j in range(3):
    # Measure against the true digit of each sample, not against its position in the batch.
    error[j] = (predicted_digits[j] - actual_digits).abs().float().mean().item()
    correct[j] = 100 * (predicted_digits[j] == actual_digits).float().mean().item()

plt.scatter([i for i in range(1, 4)], error, label='Average Error') 
plt.scatter([i for i in range(1, 4)], correct, label='Accuracy') 
plt.xlabel('Model Number')
plt.title('Scatter Plot of Models')
plt.legend()
print(error)
print(correct)


In [None]:
# TODO: use a different method of calculating error by treating this as a classification problem
# TODO: figure out how to save and load models
# TODO: experiment with parameters for better accuracy - grid, k, width/length, lamb, steps, lossfn