## Generating Data

In [1]:
# Imports
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [2]:
data = pd.read_pickle("final_data.pkl")

In [3]:
my_dpi = 128
def plot_index(data, idx):
    fig = plt.figure(frameon=False, figsize=(256/my_dpi, 256/my_dpi), dpi=my_dpi)
    plt.gray()
    plt.plot(data.loc[data.index[idx]].X, data.loc[data.index[idx]].Y)
    fig.subplots_adjust(bottom = 0)
    fig.subplots_adjust(top = 1)
    fig.subplots_adjust(right = 1)
    fig.subplots_adjust(left = 0)
    fig.savefig(f"images/plot{idx}-{get_user(data, idx)}.png", cmap=plt.get_cmap('gray'))
def get_label(data, idx):
    return 1 if data.loc[data.index[idx]].label == 1 else 0
def get_user(data, idx):
    return data.loc[data.index[idx]].user

In [12]:
df = pd.DataFrame(columns=['image', 'user'])
for idx in range(len(data)):
    if get_label(data, idx):
        user = get_user(data, idx)
        plot_index(data, idx)
        img = plt.imread(f"images/plot{idx}-{user}.png")
        df = df.append({
            "image": img[:,:,1],
            "user": user
        }, ignore_index=True)

In [13]:
df['user'] = df['user'].astype('int')

## Preparing data for training

In [14]:
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data.dataloader import DataLoader
import torch.nn as nn
import torch.nn.functional as F

In [15]:
df['image'][0].shape

(256, 256)

In [16]:
dataset = TensorDataset(torch.tensor(df['image']),torch.tensor(df['user']))

In [17]:
def split_indices(n, test_pct=0.2, seed=99):
    n_test = int(test_pct*n)
    np.random.seed(seed)
    idxs = np.random.permutation(n)
    return idxs[n_test:], idxs[:n_test]

In [18]:
test_pct = 0.2
rand_seed = 42
train_indices, test_indices = split_indices(len(dataset), test_pct, rand_seed)
print(len(train_indices), len(test_indices))

1997 499


In [19]:
batch_size = 100
train_sampler = SubsetRandomSampler(train_indices)
train_dl = DataLoader(dataset, batch_size, sampler=train_sampler)

test_sampler = SubsetRandomSampler(test_indices)
test_dl = DataLoader(dataset, batch_size, sampler=test_sampler)

## Model

In [26]:
model = nn.Sequential(
    nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(4,4), #op: 16x64x64
    
    nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(4,4), # op: 16x16x16
    
    nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2,2), # op: 16x8x8

    nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2,2), # op: 16x4x4
    
    nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2,2), # 16x2x2

    nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2,2), # 16x1x1
    nn.Flatten(), #16
    nn.Linear(32, 100) #op: 100 classes
)

In [27]:
for xb, yb, in train_dl:
    xb = xb.unsqueeze(1)
    print(xb.shape)
    preds = model(xb)
    print(preds)
    break

torch.Size([100, 1, 256, 256])
tensor([[ 0.0130, -0.1420,  0.1841,  ...,  0.0311, -0.0815,  0.0651],
        [ 0.0127, -0.1422,  0.1843,  ...,  0.0311, -0.0820,  0.0645],
        [ 0.0127, -0.1420,  0.1841,  ...,  0.0312, -0.0818,  0.0649],
        ...,
        [ 0.0129, -0.1421,  0.1837,  ...,  0.0308, -0.0814,  0.0647],
        [ 0.0127, -0.1419,  0.1838,  ...,  0.0310, -0.0815,  0.0647],
        [ 0.0128, -0.1420,  0.1843,  ...,  0.0314, -0.0816,  0.0649]],
       grad_fn=<AddmmBackward>)


In [41]:
def loss_batch(model, loss_fn, xb, yb, opt=None, metric=None):
    preds = model(xb)
    loss = loss_fn(preds, yb)
    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()
    metric_result = None
    if metric is not None:
        metric_result = metric(preds, yb)
    return loss.item(), len(yb), metric_result

In [42]:
def evaluate(model, loss_fn, test_dl, metric=None):
    with torch.no_grad():
        results = [loss_batch(model, loss_fn, xb.unsqueeze(1), yb, metric=metric) for xb, yb in test_dl]
        losses, nums, metrics = zip(*results)
        total = np.sum(nums)
        avg_loss = np.sum(np.multiply(losses, nums)) / total
        avg_metric = None
        if metric is not None:
            avg_metric = np.sum(np.multiply(metrics, nums)) / total
    return avg_loss, total, avg_metric

In [43]:
def fit(epochs, model, loss_fn, train_dl, test_dl, 
        opt_fn=None, lr=None, metric=None):
    train_losses, test_losses, test_metrics = [], [], []
    
    # Instantiate the optimizer
    if opt_fn is None: opt_fn = torch.optim.SGD
    opt = opt_fn(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        # Training
        model.train()
        for xb,yb in train_dl:
            xb = xb.unsqueeze(1)
            train_loss,_,_ = loss_batch(model, loss_fn, xb, yb, opt)

        # Evaluation
        model.eval()
        result = evaluate(model, loss_fn, test_dl, metric)
        test_loss, total, test_metric = result
        
        # Record the loss & metric
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        test_metrics.append(test_metric)
        
        # Print progress
        if metric is None:
            print('Epoch [{}/{}], train_loss: {:4f}, test_loss: {:.4f}'
                  .format(epoch+1, epochs, train_loss, test_loss))
        else:
            print('Epoch [{}/{}], train_loss: {:.4f}, test_loss: {:.4f}, test_{}: {:.4f}'
                  .format(epoch+1, epochs, train_loss, test_loss, 
                          metric.__name__, test_metric))
    return train_losses, test_losses, test_metrics

In [44]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.sum(preds == labels).item() / len(preds)

In [45]:
test_loss, _, test_acc = evaluate(model, F.cross_entropy, 
                                    test_dl, metric=accuracy)
print('Loss: {:.4f}, Accuracy: {:.4f}'.format(test_loss, test_acc))

Loss: 4.6144, Accuracy: 0.0100


In [46]:
num_epochs = 10
opt_fn = torch.optim.Adam
lr = 0.005

In [47]:
history = fit(num_epochs, model, F.cross_entropy, 
              train_dl, test_dl, opt_fn, lr, accuracy)
train_losses, test_losses, test_metrics = history

Epoch [1/10], train_loss: 4.6097, test_loss: 4.6176, test_accuracy: 0.0060
Epoch [2/10], train_loss: 4.6102, test_loss: 4.6222, test_accuracy: 0.0060
Epoch [3/10], train_loss: 4.5968, test_loss: 4.6276, test_accuracy: 0.0040
Epoch [4/10], train_loss: 4.6084, test_loss: 4.6347, test_accuracy: 0.0000
Epoch [5/10], train_loss: 4.6159, test_loss: 4.6378, test_accuracy: 0.0000
Epoch [6/10], train_loss: 4.6096, test_loss: 4.6374, test_accuracy: 0.0000


KeyboardInterrupt: 