In [54]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
# from mpl_toolkits.mplot3d import Axes3D

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils import data
from torchvision import datasets, transforms as T

import random, os, pathlib, time
from tqdm import tqdm
# from sklearn import datasets

In [55]:
# device = torch.device("cuda:0")
device = torch.device("cpu")

In [56]:
from tqdm import tqdm
import os, time, sys
import json

In [57]:
import dtnnlib as dtnn

In [58]:
mnist_transform = T.Compose([
    T.ToTensor(),
    T.Normalize(
        mean=[0.5,],
        std=[0.5,],
    ),
])

train_dataset = datasets.FashionMNIST(root="../../../../_Datasets/", train=True, download=True, transform=mnist_transform)
test_dataset = datasets.FashionMNIST(root="../../../../_Datasets/", train=False, download=True, transform=mnist_transform)

# train_dataset = datasets.MNIST(root="../../../../_Datasets/", train=True, download=True, transform=mnist_transform)
# test_dataset = datasets.MNIST(root="../../../../_Datasets/", train=False, download=True, transform=mnist_transform)

In [59]:
batch_size = 50
train_loader = data.DataLoader(dataset=train_dataset, num_workers=4, batch_size=batch_size, shuffle=True)
test_loader = data.DataLoader(dataset=test_dataset, num_workers=4, batch_size=batch_size, shuffle=False)

In [60]:
for xx, yy in train_loader:
    xx, yy = xx.to(device), yy.to(device)
    print(xx.shape, yy.shape)
    break

torch.Size([50, 1, 28, 28]) torch.Size([50])


### Single Layer overfitting

In [8]:
h1 = 100
model = nn.Sequential(
            dtnn.DistanceTransform_MinExp(784, h1),
#             dtnn.DistanceTransform_Exp(784, h1),
#             nn.BatchNorm1d(h1),
#             nn.LeakyReLU(),
            nn.Linear(h1, 10))

In [9]:
model.to(device)

Sequential(
  (0): DistanceTransform_MinExp()
  (1): Linear(in_features=100, out_features=10, bias=True)
)

In [10]:
# model[0].set_centroid_to_data_maxdist(train_loader)
# model[0].set_centroid_to_data(train_loader)
# model[0].set_centroid_to_data_randomly(train_loader)

## Randomly

In [11]:
N = model[0].centers.shape[0]
new_center = []
new_labels = []
count = 0
for i, (xx, yy) in enumerate(train_loader):
    xx = xx.reshape(-1, model[0].input_dim).to(model[0].centers.device)
    if count+xx.shape[0] < N:
        new_center.append(xx)
        new_labels.append(yy)
        count += xx.shape[0]
    elif count >= N:
        break
    else:
        new_center.append(xx[:N-count])
        new_labels.append(yy[:N-count])
        count = N
        break
        
new_center = torch.cat(new_center, dim=0)
new_labels = torch.cat(new_labels, dim=0)

In [12]:
new_center.shape, new_labels.shape

(torch.Size([100, 784]), torch.Size([100]))

## Set data as parameters


In [13]:
## Convert weights to one hot.
weights = torch.zeros(len(new_labels), 10)
for i in range(len(new_labels)):
    weights[i, new_labels[i]] = 1.
# weights

In [14]:
weights.shape

torch.Size([100, 10])

In [15]:
model[0].centers.data = new_center.to(model[0].centers.device)
model[-1].weight.data = weights.t().to(model[-1].weight.data)

In [16]:
best_acc = -1
def test(epoch, model):
    global best_acc
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
#         for batch_idx, (inputs, targets) in enumerate(tqdm(test_loader)):
        for batch_idx, (inputs, targets) in enumerate(test_loader):
            inputs, targets = inputs.to(device).view(-1, 28*28), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
            
    print(f"[Test] {epoch} Loss: {test_loss/(batch_idx+1):.3f} | Acc: {100.*correct/total:.3f} {correct}/{total}")
    
    acc = 100.*correct/total
    return acc

In [17]:
model.eval()

Sequential(
  (0): DistanceTransform_MinExp()
  (1): Linear(in_features=100, out_features=10, bias=True)
)

In [18]:
criterion = nn.CrossEntropyLoss()

In [19]:
test_acc = test(0, model)
test_acc

[Test] 0 Loss: 1.300 | Acc: 66.350 6635/10000


66.35

# Test MultiLayer data init

In [20]:
h2 = 200
model = nn.Sequential(
            dtnn.DistanceTransform_MinExp(784, h2),
            nn.Linear(h2, 784),
            dtnn.DistanceTransform_MinExp(784, h1),
            nn.Linear(h1, 10))

In [21]:
model.eval()

Sequential(
  (0): DistanceTransform_MinExp()
  (1): Linear(in_features=200, out_features=784, bias=True)
  (2): DistanceTransform_MinExp()
  (3): Linear(in_features=100, out_features=10, bias=True)
)

### Randomly select second sets of samples

In [22]:
N = h2
new_center2 = []
new_labels2 = []
count = 0
for i, (xx, yy) in enumerate(train_loader):
    xx = xx.reshape(-1, model[0].input_dim).to(model[0].centers.device)
    if count+xx.shape[0] < N:
        new_center2.append(xx)
        new_labels2.append(yy)
        count += xx.shape[0]
    elif count >= N:
        break
    else:
        new_center2.append(xx[:N-count])
        new_labels2.append(yy[:N-count])
        count = N
        break
        
new_center2 = torch.cat(new_center2, dim=0)
new_labels2 = torch.cat(new_labels2, dim=0)

In [23]:
new_center.shape, new_center2.shape

(torch.Size([100, 784]), torch.Size([200, 784]))

In [24]:
# if len(new_center) < h2:
#     raise Exception("The function below does not support increasing hidden units ..")

#### Old Layers

In [25]:
model[-2].centers.data = new_center.to(device)

In [26]:
weights = torch.zeros(h1, 10)
for i in range(h1): ## not all activations(of centers) fit into neurons in layer 2
    weights[i, new_labels[i]] = 1.

In [27]:
model[-1].weight.shape, weights.shape

(torch.Size([10, 100]), torch.Size([100, 10]))

In [28]:
model[-1].weight.data = weights.t().to(model[-1].weight.data)

#### New Layer 1

In [29]:
torch.unique(new_labels, return_counts=True)

(tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
 tensor([ 8,  5, 11, 11, 16, 16, 10,  9,  8,  6]))

In [30]:
### Finding samples for given target values
target_samples = {}
for l in torch.unique(new_labels):
    mask = new_labels == l
#     print(l, torch.count_nonzero(mask))
    target_samples[int(l)] = new_center[mask]
target_samples

{0: tensor([[-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         ...,
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.]]),
 1: tensor([[-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.]]),
 2: tensor([[-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         ...,
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.]]),
 3: tensor([[-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         [-1., -1., -1.,  ..., -1., -1., -1.],
         ...,


In [31]:
model[0].centers.shape

torch.Size([200, 784])

In [32]:
for i in range(h2):
    model[0].centers.data[i] = new_center2[i].to(device)
    ts = target_samples[int(new_labels2[i])]
#     print(i, len(ts))
#     print(ts[np.random.randint(0, len(ts))].shape)
#     print(model[1].weight.data.shape)

    ### SELECT RANDOMLY
#     model[1].weight.data[:, i] = ts[np.random.randint(0, len(ts))]
    
    ### SELECT CLOSEST SAMPLES
#     sample = new_center2[i:i+1]
#     nearest = torch.cdist(ts, sample).squeeze().argmin()
#     model[1].weight.data[:, i] = ts[nearest]
    
    ### SELECT SAMPLE WITH MINIMUM LOSS
    out = model[-1](model[-2](ts)).data
    minimal = F.cross_entropy(out, 
                              torch.ones(len(ts), dtype=torch.long)*int(new_labels2[i]), 
                              reduction="none").argmin()
    model[1].weight.data[:, i] = ts[minimal]

#     break

In [33]:
torch.ones(len(ts), dtype=torch.long)*int(new_labels2[i])

tensor([0, 0, 0, 0, 0, 0, 0, 0])

In [34]:
model[0].centers.shape, model[1].weight.shape

(torch.Size([200, 784]), torch.Size([784, 200]))

In [35]:
model(xx).shape

torch.Size([50, 10])

#### Test performance

In [36]:
test_acc2 = test(0, model)
test_acc2

[Test] 0 Loss: 1.943 | Acc: 42.510 4251/10000


42.51

### Using Same Set of centers in layer 1 and mapping to itself.

In [37]:
assert h1 == h2, "mapping to itself in its simplest form requires both value same"

AssertionError: mapping to itself in its simplest form requires both value same

In [None]:
h2 = 200
model = nn.Sequential(
            dtnn.DistanceTransform_MinExp(784, h2),
            nn.Linear(h2, 784),
            dtnn.DistanceTransform_MinExp(784, h1),
            nn.Linear(h1, 10))

In [None]:
model.eval()

#### Old Layers

In [None]:
model[-2].centers.data = new_center.to(device)

In [None]:
weights = torch.zeros(h1, 10)
for i in range(h1): ## not all activations(of centers) fit into neurons in layer 2
    weights[i, new_labels[i]] = 1.

In [None]:
model[-1].weight.shape, weights.shape

In [None]:
model[-1].weight.data = weights.t().to(model[-1].weight.data)

#### New Layer 1

In [None]:
model[0].centers.shape, new_center.shape

In [None]:
model[0].centers.data = new_center.to(device)
model[1].weight.data = new_center.t().to(device)

In [None]:
model[0].centers.shape, model[1].weight.shape

In [None]:
model(xx).shape

#### Test performance

In [None]:
test_acc3 = test(0, model)
test_acc3

In [None]:
test_acc, test_acc2, test_acc3

# Initial First block with residual layer to input samples

In [38]:
h2 = 200
model = nn.Sequential(
            dtnn.DistanceTransform_MinExp(784, h2),
            nn.Linear(h2, 784),
            dtnn.DistanceTransform_MinExp(784, h1),
            nn.Linear(h1, 10))

In [39]:
model.eval()

Sequential(
  (0): DistanceTransform_MinExp()
  (1): Linear(in_features=200, out_features=784, bias=True)
  (2): DistanceTransform_MinExp()
  (3): Linear(in_features=100, out_features=10, bias=True)
)

#### Old Layers

In [40]:
model[-2].centers.data = new_center.to(device)

In [41]:
weights = torch.zeros(h1, 10)
for i in range(h1): ## not all activations(of centers) fit into neurons in layer 2
    weights[i, new_labels[i]] = 1.

In [42]:
model[-1].weight.shape, weights.shape

(torch.Size([10, 100]), torch.Size([100, 10]))

In [43]:
model[-1].weight.data = weights.t().to(model[-1].weight.data)

#### New Layer 1

In [44]:
torch.unique(new_labels, return_counts=True)

(tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
 tensor([ 8,  5, 11, 11, 16, 16, 10,  9,  8,  6]))

In [45]:
### Finding samples for given target values
target_samples = {}
for l in torch.unique(new_labels):
    mask = new_labels == l
    target_samples[int(l)] = new_center[mask]
# target_samples

In [46]:
model[0].centers.shape

torch.Size([200, 784])

In [47]:
for i in range(h2):
    model[0].centers.data[i] = new_center2[i].to(device)
    ts = target_samples[int(new_labels2[i])]
    ### SELECT CLOSEST SAMPLES
    sample = new_center2[i:i+1]
    nearest = torch.cdist(ts, sample).squeeze().argmin()
    model[1].weight.data[:, i] = ts[nearest] - new_center2[i]
    ### SELECT SAMPLE WITH MINIMUM LOSS
#     out = model[-1](model[-2](ts)).data
#     minimal = F.cross_entropy(out, 
#                               torch.ones(len(ts), dtype=torch.long)*int(new_labels2[i]), 
#                               reduction="none").argmin()
#     model[1].weight.data[:, i] = ts[minimal] - new_center2[i]
    break

In [48]:
torch.ones(len(ts), dtype=torch.long)*int(new_labels2[i])

tensor([3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3])

In [49]:
model[0].centers.shape, model[1].weight.shape

(torch.Size([200, 784]), torch.Size([784, 200]))

In [50]:
model(xx).shape

torch.Size([50, 10])

In [51]:
class model_residual(nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model
        
    def forward(self, x):
        r = self.model[1](self.model[0](x))
        x = r+x
        return self.model[3](self.model[2](x))
    
model_res = model_residual(model)

#### Test performance

In [52]:
test_acc4 = test(0, model_res)
test_acc4

[Test] 0 Loss: 1.337 | Acc: 59.620 5962/10000


59.62

In [53]:
# test_acc, test_acc2, test_acc3, test_acc4
test_acc, test_acc2, test_acc4

(66.35, 42.51, 59.62)