In [1]:
%matplotlib inline

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt

from IPython.display import clear_output
from torch.utils.data import DataLoader, TensorDataset
from torchsummary import summary
from sklearn.model_selection import train_test_split

from simple_movement import DataGen, SimpleMovement

In [4]:
dgen = DataGen(100, 100, 5)
x, y = dgen.get_data('./data/data_simple_movement_1/', 10, True)

In [5]:
x.shape, y.shape

((120, 5, 100, 100), (120, 3))

In [2]:
class MMC(nn.Module):
    
    def __init__(self, beta=5, num_iter=1):
        super(MMC, self).__init__()
        self.df = 1. / beta
        self.num_iter = num_iter
        self.weights = nn.Parameter(
            data= torch.tensor([[1, self.df, 0], [-1, 0, 1], [0, 0, 1]]),
            requires_grad=False
        ) # .unsqueeze(0).expand(9, 3, 3)
            
    def forward(self, x):
        # = torch.bmm(torch.pow(self.weights, self.num_iter), x.unsqueeze(2)).squeeze(2)
        x = torch.matmul(self.weights, x.transpose(0, 1))
        return x

In [3]:
class Encoder(nn.Module):
    
    def __init__(self, num_channel):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=num_channel, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 8, 3, padding=1)
        self.conv3 = nn.Conv2d(8, 8, 3, padding=1)
        self.conv4 = nn.Conv2d(8, 8, 2)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(200, 128)
        self.fc2 = nn.Linear(128, 3)
        self.mmc = MMC()
    
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.pool(F.relu(self.conv4(x)))
        x = x.view(-1, 5*5*8)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        print('before mmc: {}'.format(x))
        x = self.mmc(x)
        print('after mmc: {}\n'.format(x))
        return x

In [4]:
model = Encoder(1)
for n, p in model.named_parameters():
    print(n, p.shape)

conv1.weight torch.Size([16, 1, 3, 3])
conv1.bias torch.Size([16])
conv2.weight torch.Size([8, 16, 3, 3])
conv2.bias torch.Size([8])
conv3.weight torch.Size([8, 8, 3, 3])
conv3.bias torch.Size([8])
conv4.weight torch.Size([8, 8, 2, 2])
conv4.bias torch.Size([8])
fc1.weight torch.Size([128, 200])
fc1.bias torch.Size([128])
fc2.weight torch.Size([3, 128])
fc2.bias torch.Size([3])
mmc.weights torch.Size([3, 3])


In [5]:
# summary(model, (1, 100, 100))

In [6]:
def get_model(num_channel):
    
    net = Encoder(num_channel=num_channel)
    optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
    
    return net, optimizer

In [7]:
def to_tensor(x):
    return torch.from_numpy(x).float()

def get_data(height, width, num_channel, path, size, bs):
    
    dgen = DataGen(height, width, num_channel)
    x, y = dgen.get_feature_target_pairs(path, size)
    print('Data loaded...\nx:{}\ty:{}\n'.format(x.shape, y.shape))
    
    x = x / 255.
    x = x[:90]
    y = y[:90]
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.1, shuffle=True, random_state=331)
    x_train, x_test, y_train, y_test = map(to_tensor, (x_train, x_test, y_train, y_test))
    
    train_ds = TensorDataset(x_train, y_train)
    train_dl = DataLoader(train_ds, batch_size=bs, shuffle=True)

    test_ds = TensorDataset(x_test, y_test)
    test_dl = DataLoader(test_ds, batch_size=bs)
    
    return train_dl, test_dl

In [8]:
def grad_flow(named_parameters):
    ave_grads = []
    max_grads= []
    layers = []
    for n, p in named_parameters:
        if(p.requires_grad) and ("bias" not in n):
            layers.append(n)
            ave_grads.append(p.grad.abs().mean())
            max_grads.append(p.grad.abs().max())
    print('layers: {}'.format(layers))
    print('max: {}'.format(max_grads))
    print('mean: {}\n\n'.format( ave_grads))

In [9]:
def fit(net, optimizer, train_dl, test_dl, epochs=5):
    
    loss_function = nn.MSELoss()
    for epoch in range(epochs):
        net.train()
        for x, y in train_dl:
            pred = net(x)
            loss = loss_function(pred, y)

            optimizer.zero_grad()
            loss.backward()
            grad_flow(net.named_parameters())
            optimizer.step()

        net.eval()
        with torch.no_grad():
            test_loss = sum(loss_function(net(x), y) for x, y in test_dl)
        print('epoch:{}, test_loss:{}'.format(epoch+1, test_loss/len(test_dl)))

In [10]:
net, optimizer = get_model(num_channel=2)

train_dl, test_dl = get_data(
    height=100, 
    width=100,
    num_channel=2,
    path='./data/data_simple_movement_1/',
    size=8,
    bs=1
)

fit(net, optimizer, train_dl, test_dl, epochs=10)

Data loaded...
x:(96, 2, 100, 100)	y:(96, 3)

before mmc: tensor([[-0.0213,  0.0899, -0.0765]], grad_fn=<AddmmBackward>)
after mmc: tensor([[-0.0033],
        [-0.0552],
        [-0.0765]], grad_fn=<MmBackward>)

layers: ['conv1.weight', 'conv2.weight', 'conv3.weight', 'conv4.weight', 'fc1.weight', 'fc2.weight']
max: [tensor(0.1430), tensor(0.2257), tensor(0.1869), tensor(0.3095), tensor(0.2646), tensor(3.3810)]
mean: [tensor(0.0233), tensor(0.0215), tensor(0.0147), tensor(0.0227), tensor(0.0238), tensor(0.2322)]


before mmc: tensor([[-0.0213,  0.0990, -0.0473]], grad_fn=<AddmmBackward>)
after mmc: tensor([[-0.0015],
        [-0.0260],
        [-0.0473]], grad_fn=<MmBackward>)

layers: ['conv1.weight', 'conv2.weight', 'conv3.weight', 'conv4.weight', 'fc1.weight', 'fc2.weight']
max: [tensor(0.1275), tensor(0.2933), tensor(0.1842), tensor(0.4965), tensor(0.2690), tensor(3.4809)]
mean: [tensor(0.0245), tensor(0.0241), tensor(0.0203), tensor(0.0528), tensor(0.0228), tensor(0.2333)]


befo

KeyboardInterrupt: 

In [11]:
a = np.array([[-0.0472], [-0.0123],  [0.0237]])
b = np.array([[1, 0.2, 0], [-1, 0, 1], [0, 0, 1]])
np.dot(b, a)

array([[-0.04966],
       [ 0.0709 ],
       [ 0.0237 ]])

In [11]:
#         iteration = 0
#         while iteration < self.num_iter:
#             for i in range(x.shape[0]):
#                 x[i] = torch.matmul(self.weights, x[i])
#             iteration += 1
        # x = torch.acos(x)

In [20]:
a = torch.tensor([[1, 2, 3]])
a

tensor([[1, 2, 3]])

In [21]:
a.transpose(0, 1)

tensor([[1],
        [2],
        [3]])