In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler
!pip install matplotlib
import matplotlib.pyplot as plt

print(torch.__version__)
!pip install torch==1.8.0
!pip install opencv-python
import cv2
print(torch.__version__)

import torchvision.datasets as dset
import torchvision.transforms as T
from keras.datasets import cifar10
import numpy as np, numpy.linalg

USE_GPU = True
dtype = torch.float64 # We will be using float throughout this tutorial.

if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Constant to control how frequently we print train loss.
print_every = 100
print('using device:', device)

1.8.0
1.8.0
using device: cpu


In [2]:

#class for forward pass and loss

class custom_multiclass_torch(torch.nn.Module):
    def __init__(self, N, P, C, K, f,a,b,c,beta, x_patches_train, x_patches_test):
        super(custom_multiclass_torch, self).__init__()

        
        #initialize several variables 
        # parameters, N = number of images, K = number of patches, P = pooling size, C = output dimension, f = filter size

        self.N = N
        self.P = P
        self.C = C 
        self.K = K
        self.f = f 
        self.a = a
        self.b = b
        self.c = c
        self.beta = beta
        self.x_patches_train = x_patches_train
        self.x_patches_test = x_patches_test
        
        
        #initialize tensor array variables as dictionaries
        
        self.Z_1_arr_train = nn.ParameterDict({  })
        self.Z_1_arr_prime_train = nn.ParameterDict({})
        self.Z_2_arr_train = nn.ParameterDict({})
        self.Z_2_arr_prime_train = nn.ParameterDict({})
        self.Z_4_arr_train = nn.ParameterDict({})
        self.Z_4_arr_prime_train = nn.ParameterDict({})
        self.Z_arr_train = {}
        self.Z_arr_prime_train = {}
        
        # fill in dictionaries with Z1 Z2 Z4 tensor parameters
        for i in range(1,K//P+1):
            for j in range(1,C+1):
                fan_in = f+1

                self.Z_1_arr_train[str(i)+','+str(j)] =  torch.nn.Parameter(data = torch.randn((3*f**2,3*f**2), device = device, dtype = dtype), requires_grad = True)
                self.Z_1_arr_prime_train[str(i)+','+str(j)] =  torch.nn.Parameter(data = torch.randn((3*f**2,3*f**2), device = device, dtype = dtype), requires_grad = True)
                self.Z_2_arr_train[str(i)+','+str(j)] =  torch.nn.Parameter(data = torch.randn((3*f**2,1), device = device, dtype = dtype), requires_grad = True)
                self.Z_2_arr_prime_train[str(i)+','+str(j)] =  torch.nn.Parameter(data = torch.randn((3*f**2,1), device = device, dtype = dtype), requires_grad = True)
                self.Z_4_arr_train[str(i)+','+str(j)] =  torch.nn.Parameter(data = torch.randn((1,1), device = device, dtype = dtype), requires_grad = True)
                self.Z_4_arr_prime_train[str(i)+','+str(j)] =  torch.nn.Parameter(data = torch.randn((1,1), device = device, dtype = dtype), requires_grad = True)
                
                self.Z_arr_train[str(i)+','+str(j)] = torch.vstack((torch.hstack((self.Z_1_arr_train[str(i)+','+str(j)], self.Z_2_arr_train[str(i)+','+str(j)])), torch.hstack((torch.transpose(self.Z_2_arr_train[str(i)+','+str(j)], 0, 1),self.Z_4_arr_train[str(i)+','+str(j)]))))
                self.Z_arr_prime_train[str(i)+','+str(j)] = torch.vstack((torch.hstack((self.Z_1_arr_prime_train[str(i)+','+str(j)], self.Z_2_arr_prime_train[str(i)+','+str(j)])), torch.hstack((torch.transpose(self.Z_2_arr_prime_train[str(i)+','+str(j)], 0, 1),self.Z_4_arr_prime_train[str(i)+','+str(j)]))))
                

    def forward(self, i):
        
        Z_1_new = {}
        Z_2_new = {}
        Z_4_new = {}
        Z_1_prime_new = {}
        Z_2_prime_new = {}
        Z_4_prime_new = {}
        Z_new = {} 
        Z_new_prime = {}
        
        
     
        #transform Z matrices into positve-semidefinite matrices
        
        #convertTo = "positive semidefinite"
        convertTo = "other"
        for key in self.Z_arr_train:
            if convertTo == "positive semidefinite":
                Z_new[key] = torch.matmul(self.Z_arr_train[key], self.Z_arr_train[key].T)+torch.eye(3*f**2+1)
            elif convertTo == "symmetric:":
                Z_new[key] = 0.5 * (self.Z_arr_train[key]+self.Z_arr_train[key].T)
            else:
                Z_new[key] = self.Z_arr_train[key]
            Z_1_new[key] = Z_new[key][:3*f**2,:3*f**2]
            Z_2_new[key] = Z_new[key][3*f**2,:3*f**2]
            Z_4_new[key] = Z_new[key][3*f**2,3*f**2]
            
        for key in self.Z_arr_prime_train:
            if convertTo == "positive semidefinite":
                Z_new_prime[key] = torch.matmul(self.Z_arr_prime_train[key], self.Z_arr_prime_train[key].T)+torch.eye(3*f**2+1)
            elif convertTo == "symmetric":
                Z_new_prime[key] = 0.5 * (self.Z_arr_prime_train[key] + self.Z_arr_prime_train[key].T)
            else:
                Z_new_prime[key] = self.Z_arr_prime_train[key]
            Z_1_prime_new[key] = Z_new_prime[key][:3*f**2,:3*f**2]
            Z_2_prime_new[key] = Z_new_prime[key][3*f**2,:3*f**2]
            Z_4_prime_new[key] = Z_new_prime[key][3*f**2,3*f**2]
        
    
        print(self.Z_1_arr_train["1,2"])
        print(self.Z_4_arr_train["1,2"])

        
        ypred = torch.zeros((C))

        
        # performing calculations for ypred scores 
        for t in range(1,C+1):
            
            constant_part = 0
            for k in range(1,K//P+1):
                constant_part += Z_4_new[str(k)+","+str(t)] - Z_4_prime_new[str(k)+","+str(t)]
            constant_part *= c

            linear_part = 0
            for k in range(1,K//P+1):
                for l in range(1,P+1):
                    linear_part += torch.matmul(torch.transpose(self.x_patches_train[i][(k-1)*P+ l-1].view(self.x_patches_train.size()[2], 1), 0,1),(Z_2_new[str(k)+","+str(t)] - Z_2_prime_new[str(k)+","+str(t)]))
            linear_part *= b/P

            quadratic_part = 0
            for k in range(1,K//P+1):
                for l in range(1,P+1):
                    newpart = torch.matmul(torch.transpose(self.x_patches_train[i][(k-1)*P+ l-1].view(self.x_patches_train.size()[2], 1), 0, 1),(Z_1_new[str(k)+","+str(t)] - Z_1_prime_new[str(k)+","+str(t)]))
                    newpart = torch.matmul(newpart, self.x_patches_train[i][(k-1)*P+ l-1].view(self.x_patches_train.size()[2],1))
                    quadratic_part += newpart
            quadratic_part *= a/P
            
            #print(quadratic_part, linear_part, constant_part)
            
            
         
            ypred[t-1] = quadratic_part + linear_part + constant_part


        return ypred

    
    
    def customloss(self, Yhat, y):
        #convex L2 loss function
        objective1 = 0.5 * torch.norm(Yhat - y)**2 *N/y.shape[0]
        

        # sum of Z4 scalars added to loss
        objective2 = 0 

        for i in range(1,K//P+1):
            for j in range(1,C+1):
                objective2 += self.Z_4_arr_train[str(i)+","+str(j)] + self.Z_4_arr_prime_train[str(i)+","+str(j)]
            
        objective = objective1+objective2

        return objective
            
                  

                    
 





In [3]:


#load cifar data
(X_train, y_train), (X_test, y_test) = cifar10.load_data()


#subset data
X_train = X_train[:1000,:,:,:]
y_train = y_train[:1000]
X_test = X_test[:200,:,:,:]
y_test = y_test[:200]

#set up val/test split
X_val = X_test[:X_test.shape[0]//2,:,:,:]
X_test = X_test[X_test.shape[0]//2:,:,:,:]

y_val = y_test[:y_test.shape[0]//2,:]
y_test = y_test[:y_test.shape[0]//2,:]


#parameters: f is filter size, P is pooling size, C is class count, a,b,c are the polynomial activation constants

f = 4



train_images = X_train.astype(np.float64)
train_labels = y_train.astype(np.float64)
test_images = X_test.astype(np.float64)
test_labels = y_test.astype(np.float64)
val_images = X_val.astype(np.float64)
val_labels = y_val.astype(np.float64)




# #meaning out the images
# mean_image = np.mean(train_images, axis = 0)
# train_images -= mean_image
# test_images -= mean_image
# val_images -= mean_image

# # RGB 0 to 255
# #-127.5 to +127.5
# #-1 to 1

# # scale to [-1, 1]

# train_images /= 127.5
# test_images /= 127.5
# val_images /= 127.5


train_images /= 255
#train_images *= 2
#train_images -= 1

test_images /= 255
#test_images *= 2
#test_images -= 1

val_images /= 255
#val_images *= 2
#val_images -= 1

train_images_v2 = np.swapaxes(train_images.reshape(train_images.shape[0], 3, 32, 32), 2, 3)
test_images_v2 = np.swapaxes(test_images.reshape(test_images.shape[0], 3, 32, 32), 2, 3)
val_images_v2 = np.swapaxes(val_images.reshape(val_images.shape[0],3, 32, 32), 2, 3)

#setting up patches
patches_train = torch.nn.functional.unfold(torch.tensor(train_images_v2), kernel_size=(f,f), stride=f, padding=0)
patches_test = torch.nn.functional.unfold(torch.tensor(test_images_v2), kernel_size=(f, f), stride=f, padding=0)
patches_val = torch.nn.functional.unfold(torch.tensor(val_images_v2), kernel_size=(f,f), stride=f, padding=0)


patches_train = patches_train.permute(0,2,1)
patches_test = patches_test.permute(0,2,1)
patches_val = patches_val.permute(0,2,1)

N = X_train.shape[0]
K = patches_train.shape[1]

Yhat_train =  None
Yhat_test = None

P = K
C = 10
a=0.09
b=0.5
c=0.47
beta = 1e-6

print("K: ", K)
print("N: ", N)
print("patches_train size: ", patches_train.size())
print("patches_val size: ", patches_val.size())
print("patches_test size: ", patches_test.size())

K:  64
N:  1000
patches_train size:  torch.Size([1000, 64, 48])
patches_val size:  torch.Size([100, 64, 48])
patches_test size:  torch.Size([100, 64, 48])


In [4]:
def train(model, loss_fn, optimizer, epochs=1):
    model = model.to(device=device) # move the model parameters to CPU/GPU
    
    loss_train = []
    accuracies_train = []
    accuracies_val = []

    for e in range(epochs):
        for t, (x, y) in enumerate(zip(train_images, train_labels)):
            model.train() # put model to training mode

            scores = model(t)
            
      

            y_hot = torch.zeros(scores.size(), dtype = dtype)
            
            y_hot[y] = 1
            


            loss = loss_fn(scores, y_hot)
            loss_train.append(loss)

            # Zero out all of the gradients for the variables which the optimizer
            # will update.
            optimizer.zero_grad()

            # This is the backwards pass: compute the gradient of the loss with
            # respect to each  parameter of the model.
            loss.backward()

            # Actually update the parameters of the model using the gradients
            # computed by the backwards pass.
            optimizer.step()

            if t % 100 == 0:
                print('Iteration %d, loss = %.4f' % (t, loss.item()))
                accuracies_train.append(check_accuracy(X_train, y_train, model, segment = "train"))
                
                accuracies_val.append(check_accuracy(X_val, y_val, model, segment = "val"))
                

    return loss_train, accuracies_train, accuracies_val

                
           

In [5]:
def check_accuracy(X, Y, model, segment = "train"):
    if segment=='train':
        print('Checking accuracy on train set')
    elif segment == "val":
        print('Checking accuracy on val set')
    else:
        print('Checking accuracy on test set')
    num_correct = 0
    num_samples = 0
    model.eval()  # set model to evaluation mode
    with torch.no_grad():
        for t, (x, y) in enumerate(zip(X, Y)):
            scores = model(t)
            max_score_idx = torch.argmax(scores)
            y = torch.tensor(y)
            print(scores, y)
            addvalue = 1 if (max_score_idx == y) else 0
            num_correct += addvalue
            num_samples += 1
            
        acc = float(num_correct) / num_samples
        print('Got %d / %d correct (%.2f)'% (num_correct, num_samples, 100 * acc))
        return acc

In [6]:
model = custom_multiclass_torch(N, P, C, K, f,a,b,c,beta, patches_train, patches_test)


In [7]:
count = 0
for param in model.parameters():
    #print(param.shape)
    count += 1
print(count)

60


In [8]:
loss_fn = model.customloss

optimizer = optim.Adam(model.parameters(), lr=1e-4)



loss_train, accuracies_train, accuracies_val = train(model, loss_fn, optimizer, epochs=1)



Parameter containing:
tensor([[-0.8907, -0.6682,  0.8294,  ..., -0.3283, -0.1726,  1.2581],
        [-2.0239,  0.1107,  0.0067,  ..., -1.0406, -1.8674, -0.1110],
        [ 1.5277,  2.7141,  0.3903,  ..., -0.7939,  2.4227,  0.2194],
        ...,
        [-0.3398,  0.6941,  0.0763,  ..., -0.3218,  0.6683,  0.9873],
        [ 1.9783,  1.8039, -0.2071,  ...,  0.5478, -0.3675,  0.1147],
        [ 0.9563, -0.6527, -0.1232,  ..., -0.4529, -1.6135, -1.1644]],
       dtype=torch.float64, requires_grad=True)
Parameter containing:
tensor([[0.7556]], dtype=torch.float64, requires_grad=True)


AttributeError: 'custom_multiclass_torch' object has no attribute 'Z_1_prime_arr_train'

In [None]:


plt.figure(figsize=(10,5))
plt.title('Training Accuracy')
#print(accuracies_train)
plt.plot(accuracies_train)
plt.xlabel('Epochs')
plt.ylabel('Val Accuracy')
#plt.legend()
plt.show()


plt.figure(figsize=(10,5))
plt.title('Validation Accuracy')
#print(accuracies_val)
plt.plot(accuracies_val)
plt.xlabel('Epochs')
plt.ylabel('Val Accuracy')
#plt.legend()
plt.show()



plt.figure(figsize=(10,5))
plt.title('Training Loss')
#print(loss_train)
plt.plot(loss_train)
plt.xlabel('Epochs')
plt.ylabel('Train Loss')
#plt.legend()
plt.show()

In [None]:
check_accuracy(X_test, y_test, model, segment = "test")
