In [132]:
# MNIST MLP to Match Equilibrium Propagation

import numpy as np
from abc import ABC, abstractmethod

class Layer(ABC):
    
    def __init__(self):
        self.__prevIn__ = []
        self.__prevOut__ = []
        
    def setPrevIn(self, dataIn):
        self.__prevIn = dataIn
        
    def setPrevOut(self, out):
        self.__prevOut = out
        
    def getPrevIn(self):
        return self.__prevIn
    
    def getPrevOut(self):
        return self.__prevOut
    
    def backward(self, gradIn):
        return (gradIn @ self.gradient())
    
    @abstractmethod
    def forward(self, dataIn):
        pass
    
    @abstractmethod
    def gradient(self):
        pass

class HardSigLayer(Layer):
    def __init__(self):
        super().__init__()
    
    def forward(self, dataIn):
        self.setPrevIn(dataIn)      
        z = torch.clip(self.getPrevIn(), -1, 1)
        self.setPrevOut(z)
        return self.getPrevOut()
        
    def gradient(self): 
        z = (self.getPrevOut() > -1) & (self.getPrevOut() < 1)
        return z

In [133]:
from keras.datasets import mnist

(train_x, train_y), (test_x, test_y) = mnist.load_data()

In [134]:
print(train_x.shape)
print(train_x.flatten())
print(test_x.shape)

data = []
train_data = train_x.tolist()
test_data = test_x.tolist()
for value in train_data:
    data.append(value)
for item in test_data:
    data.append(item)
data = np.array(data)

#Standardize data
inputs = train_x - np.mean(data)
inputs = inputs/(np.std(data))

#standardize test data
test_inputs = test_x - np.mean(data)
test_inputs = test_inputs/(np.std(data))

targets = train_y
test_targets = test_y

train_x = inputs
valid_x = test_inputs
train_y = targets
valid_y = test_targets

(60000, 28, 28)
[0 0 0 ... 0 0 0]
(10000, 28, 28)


In [135]:
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
from PIL import Image
import torch
#from torchvision import datasets, models, transforms
from torchvision import models, transforms
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
import time
import copy

In [136]:
from torch.utils.data import Dataset

class MyDataset(Dataset):
    def __init__(self, x, y):
        super(MyDataset, self).__init__()
        assert x.shape[0] == y.shape[0]
        self.x = x
        self.y = y
    
    def __len__(self):
        return self.y.shape[0]
    
    def __getitem__(self, index):
        return self.x[index], self.y[index]

In [137]:
traindata = MyDataset(train_x, train_y)
validation = MyDataset(valid_x, valid_y)

In [138]:
image_datasets = {
    'train': 
    traindata,
    'validation': 
    validation
}

dataloaders = {
    'train':
    torch.utils.data.DataLoader(traindata,
                                batch_size=1,
                                shuffle=True, num_workers=0),
    'validation':
    torch.utils.data.DataLoader(validation,
                                batch_size=1,
                                shuffle=False, num_workers=0)
}

In [139]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
#device = torch.device("cpu")
print(device)

cuda:0


In [140]:
class LinReg(nn.Module):
    def __init__(self):
        super().__init__()
        self.d = 28
        self.flatten = nn.Flatten()
        self.fc0 = nn.Linear(self.d**2,500)
        self.hs0 = HardSigLayer()
        #self.relu0 = nn.ReLU()
        self.do0 = nn.Dropout(0.25)
        self.fc1 = nn.Linear(500,10)
        self.hs1 = HardSigLayer()
        #self.relu1 = nn.ReLU()
        #self.do1 = nn.Dropout(0.1)
        #self.fc2 = nn.Linear(64,2)
        #self.do2 = nn.Dropout(0.2)
        
    def forward(self, x):
        x = x.float()
        x = self.flatten(x)
        x = self.fc0(x)
        x = self.hs0.forward(x)
        #x = self.relu0(x)
        x = self.do0(x)
        x = self.fc1(x)
        x = self.hs1.forward(x)
        #x = self.relu1(x)
        #x = self.do1(x)
        #x = self.fc2(x)
        #x = self.do2(x)
        return x
    
model = LinReg().to(device)

In [141]:
criterion = nn.CrossEntropyLoss()
#criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters())
#optimizer = optim.SGD(model.fc.parameters(),lr=0.1,momentum=0.9)

In [142]:
def train_model(model, criterion, optimizer, num_epochs=50):
    best_acc = 0
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.type(torch.LongTensor)
                labels = labels.to(device)

                if phase == 'validation':
                    with torch.no_grad():
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)
                else:
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                if phase == 'train':
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                _, preds = torch.max(outputs, 1)
                running_loss += loss.detach() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(image_datasets[phase])
            epoch_acc = running_corrects.float() / len(image_datasets[phase])
            #if phase == 'validation' and epoch_acc > best_acc:
            if phase == 'validation':
                print('saving best model...')
                torch.save(model.state_dict(), 'models/pytorch/weights.h5')
                best_acc = epoch_acc

            print('{} loss: {:.4f}, acc: {:.4f}'.format(phase,
                                                        epoch_loss.item(),
                                                        epoch_acc.item()))
    return model, best_acc

In [143]:
model_trained, accuracy = train_model(model, criterion, optimizer, num_epochs=5)
print('\nBest test accuracy: %f'%accuracy)

Epoch 1/5
----------
train loss: 2.1066, acc: 0.2441
saving best model...
validation loss: 2.0967, acc: 0.2413
Epoch 2/5
----------
train loss: 2.0881, acc: 0.2503
saving best model...
validation loss: 2.0866, acc: 0.2462
Epoch 3/5
----------
train loss: 2.0816, acc: 0.2553
saving best model...
validation loss: 2.0632, acc: 0.2608
Epoch 4/5
----------
train loss: 2.0783, acc: 0.2555
saving best model...
validation loss: 2.0828, acc: 0.2497
Epoch 5/5
----------
train loss: 2.0771, acc: 0.2552
saving best model...
validation loss: 2.0655, acc: 0.2581

Best test accuracy: 0.258100


In [28]:
model.load_state_dict(torch.load('models/pytorch/reg_9613_acc.h5'))

<All keys matched successfully>

In [29]:
for name, param in model.named_parameters():
    print('name: ', name)
    print(type(param))
    print('param.shape: ', param.shape)
    print('param.requires_grad: ', param.requires_grad)
    print('=====')

name:  fc0.weight
<class 'torch.nn.parameter.Parameter'>
param.shape:  torch.Size([500, 784])
param.requires_grad:  True
=====
name:  fc0.bias
<class 'torch.nn.parameter.Parameter'>
param.shape:  torch.Size([500])
param.requires_grad:  True
=====
name:  fc1.weight
<class 'torch.nn.parameter.Parameter'>
param.shape:  torch.Size([10, 500])
param.requires_grad:  True
=====
name:  fc1.bias
<class 'torch.nn.parameter.Parameter'>
param.shape:  torch.Size([10])
param.requires_grad:  True
=====


In [30]:
W1 = model.fc0.weight.cpu().detach().numpy().T
print(W1.shape)
np.save('reg_w1.npy',W1)

(784, 500)


In [31]:
W2 = model.fc1.weight.cpu().detach().numpy().T
print(W2.shape)
np.save('reg_w2.npy',W2)

(500, 10)


In [32]:
bh = model.fc0.bias.cpu().detach().numpy()
print(bh.shape)
np.save('reg_bh.npy',bh)

(500,)


In [33]:
by = model.fc1.bias.cpu().detach().numpy()
print(by.shape)
np.save('reg_by.npy',by)

(10,)
