In [None]:
import numpy as np
import pandas as pd
import os
import re
from IPython.display import display
import matplotlib.pyplot as plt
from curve_functions import FunctionProvider
import sklearn.preprocessing as prep
import typing

RESULTS_BACKUP_PATH = '/home/mnapravnik/Documents/PhD/Efficiency-of-dragging-gestures/Results_backup'

In [None]:
%%time
# first, gather all of the data related to drawn figures
def get_drawn_figures(test:int=0):
    
    path = f'{RESULTS_BACKUP_PATH}{test}'
    columns = [
        'Username',
        'TestIndex',
        'Device',
        'FigureID',
        'Projection',
        'ProjectionName',
        'Order',
        'Fullpath',
        'Npoints'
    ]
    df = pd.DataFrame(columns=columns)
    
    for user_direntry in os.scandir(path):
        # each directory is named after the user participant
        _username = user_direntry.name
        _username = re.sub(' ', '_', _username)
        
        for device_direntry in os.scandir(user_direntry.path):
            _device = device_direntry.name
            _device = re.sub(' ', '_', _device)
                
            for drawing_direntry in os.scandir(device_direntry.path):
                _only_numbers_and_underscore = re.sub('[^0-9,_]', '', drawing_direntry.name)
                _figureid, _projection, _order = _only_numbers_and_underscore.split('_')
                _figureid = int(_figureid)
                _projection = int(_projection)
                _order = int(_order)
                
                # check how many drawn points there are
                npoints = 0
                with open(drawing_direntry.path) as file:
                    npoints = len(file.readlines())
                
                # there seems to be a bug with polar projections
                # where figures in projection 2 are named as projection 3
                # and that happens only when
                # figureid is 0, so just fix it here, on-the-go
                if (_figureid == 0):
                    if(_projection == 2):
                        _projection = 3
                    elif(_projection == 3):
                        _projection = 2
                _projection_name = 'Cartesian' if _projection in [0, 1] else 'Polar'
                    
                
                # append the row to the end of the dataframe
                df.loc[len(df)] = [
                    _username,
                    test,
                    _device,
                    _figureid,
                    _projection,
                    _projection_name,
                    _order,
                    drawing_direntry.path,
                    npoints
                ]
#     df.set_index(['Username'], inplace=True)
    return df
    
df = get_drawn_figures(0)
df = pd.concat([df, get_drawn_figures(1)], ignore_index=True)

display(df)
display(df.Username.unique())

In [None]:
def get_drawn_coordinates(df_entry, scale:bool=False):
    _drawingx, _drawingy = [], []
    with open(df_entry.Fullpath) as file:
        for line in file:
            pointx, pointy = line.split()
            pointx = float(pointx)
            pointy = float(pointy)
            _drawingx.append(pointx)
            _drawingy.append(pointy)
    return _drawingx, _drawingy

def scale_coordinates(df_entry, x, y):
    from display_properties import CARTESIAN_PLOT_LIMITS, POLAR_PLOT_LIMITS
    x = np.array(x)
    y = np.array(y)
    if df_entry.Projection in [0,1]:
        # Cartesian plot
        y = (y - CARTESIAN_PLOT_LIMITS['y'][0]) / CARTESIAN_PLOT_LIMITS['y'][1]
        x = (x - CARTESIAN_PLOT_LIMITS['x'][0]) / CARTESIAN_PLOT_LIMITS['x'][1]
    else:
        # Polar plot
        # due to scaling, polar plots will look a bit idiotic
        y = (y - POLAR_PLOT_LIMITS['y'][0]) / POLAR_PLOT_LIMITS['y'][1]
        x = (x - POLAR_PLOT_LIMITS['x'][0]) / POLAR_PLOT_LIMITS['x'][1]
    return x, y

def unscale_coordinates(x, y, projection):
    from display_properties import CARTESIAN_PLOT_LIMITS, POLAR_PLOT_LIMITS
    x = np.array(x)
    y = np.array(y)
    if projection in [0,1]:
        # Cartesian plot
        y = y * CARTESIAN_PLOT_LIMITS['y'][1] + CARTESIAN_PLOT_LIMITS['y'][0]
        x = y * CARTESIAN_PLOT_LIMITS['x'][1] + CARTESIAN_PLOT_LIMITS['x'][0]
    else:
        # Polar plot
        y = y * POLAR_PLOT_LIMITS['y'][1] + POLAR_PLOT_LIMITS['y'][0]
        x = y * POLAR_PLOT_LIMITS['x'][1] + POLAR_PLOT_LIMITS['x'][0]
    return x, y 
    

def get_real_coordinates(df_entry, x):
    fp = FunctionProvider()
    difficulty = int(df_entry.FigureID / 2)
    task = int(df_entry.FigureID % 2)
    func_y = fp.provide_function_y(difficulty, task, x, df_entry.Projection)
    return x, func_y

# Inspect drawn vs. original data
def plot_function_and_drawing(df_entry):
    _drawingx, _drawingy = get_drawn_coordinates(df_entry)
    
    _, func_y = get_real_coordinates(df_entry, _drawingx)
#     _drawingx, _drawingy = scale_coordinates(df_entry, _drawingx, _drawingy)
#     _, func_y = scale_coordinates(df_entry, _drawingx, func_y)
    
    if df_entry.ProjectionName == 'Cartesian':
        # cartesian projection
        fig, ax = plt.subplots()
    else:
        # polar projection
        fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
    ax = fig.gca()
    ax.plot(_drawingx, func_y, label='Original')
    ax.plot(_drawingx, _drawingy, label='Drawn', color='gray')
    ax.set_title(
        f'FigureID: {df_entry.FigureID}, Projection: {df_entry.Projection}'
    )
    ax.legend()
    plt.show()

entry = df.query('Projection == 3 and FigureID == 2').iloc[8]
print('Entry ID:', entry.name)
plot_function_and_drawing(entry)

In [None]:
# prepare data for training
columncnt = 5000
from display_properties import X_RANGE
# this is the X RANGE for ALLLLL (!!!) OF THE functions
# both the drawn and the generated (original) functions
X_FOR_ALL_FUNC = np.linspace(X_RANGE['start'], X_RANGE['end'], columncnt)

def prepare_points(df, columncnt:int):
    """
    This accepts the original dataframe as input
    and returns the points encoded and ready for training.
    All coordinates are scaled between [0, 1]
    and scaled to be the same length (columncnt is the length)
    
    Returns three values:
    - the value on the X axis
    - the value drawn by the user
    - the real value of the function
    
    """
    def convert_to_same_size(columncnt:int, x, y):
        resultx = np.zeros(columncnt)
        resulty = np.zeros(columncnt)
        if(len(x) < columncnt):
            # this has the effect of zero-padding
            # because were adding the drawings to an array of zeros
            resultx[:len(x)] = x
            resulty[:len(y)] = y
        else:
            # otherwise crop the drawing
            resultx = x[:columncnt]
            resulty = y[:columncnt]
        return resultx, resulty
    
    x = []
    drawn_y = []
    real_y = []
    for index in df.index:
        entry = df.loc[index]
        # first, get the drawn coordinates
        _drawingx, _drawingy = get_drawn_coordinates(entry)
        tmp1, tmp2 = _drawingx.copy(), _drawingy.copy()
        
        # now, get the real function coordinates
        func_x = X_FOR_ALL_FUNC
        _, func_y = get_real_coordinates(entry, func_x)
        
        # interpolate the drawn curves so they always have the same x values
        if entry.ProjectionName == 'Cartesian':
            _drawingy = np.interp(func_x, _drawingx, _drawingy)
        else:
            # polar function have a period of 2pi so interpolate them as such
            _drawingy = np.interp(func_x, _drawingx, _drawingy, period=360)
        _drawingx = func_x
        
        # test to see if the interpolated function still looks like the original
#         if entry.ProjectionName == 'Cartesian':
#             # cartesian projection
#             fig, ax = plt.subplots()
#         else:
#             # polar projection
#             fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
#         ax.plot(func_x, func_y, label='original func')
#         ax.plot(tmp1, tmp2, label='original drawn')
#         ax.plot(_drawingx, _drawingy, label='interp')
#         ax.legend()
#         plt.show()
                
        # scale coords between 0 and 1
        _, func_y = scale_coordinates(entry, _drawingx, func_y)
        _drawingx, _drawingy = scale_coordinates(entry, _drawingx, _drawingy)
#         # add zero padding where necessary
#         _, func_y = convert_to_same_size(columncnt, x=_drawingx, y=func_y)
#         _drawingx, _drawingy = convert_to_same_size(columncnt, x=_drawingx, y=_drawingy)
        
        x.append(_drawingx)
        drawn_y.append(_drawingy)
        real_y.append(func_y)
        
    x = np.array(x)
    drawn_y = np.array(drawn_y)
    real_y = np.array(real_y)
    
    return x, drawn_y, real_y

def prepare_dataset_for_user_classification(df, columncnt:int):
    x, drawn_y, real_y = prepare_points(df, columncnt)
    y = np.abs(drawn_y - real_y)
    
    # now prepare the rest: i.e. the device used, the projection and the usernames
    proj_encoder = prep.LabelBinarizer()
    proj_encoded = proj_encoder.fit_transform(df.ProjectionName)
    user_encoder = prep.LabelBinarizer()
    user_encoded = user_encoder.fit_transform(df.Username)
    # the devices only have two values: Mouse & Graphic tablet
    # so use Label Encoder instead
    device_encoder = prep.LabelBinarizer()
    device_encoded = device_encoder.fit_transform(df.Device)
    
    device_encoded = np.array(device_encoded, dtype='int16')
    user_encoded = np.array(user_encoded, dtype='int16')
    proj_encoded = np.array(proj_encoded, dtype='int16')
    
    from collections import namedtuple
    Entry = namedtuple('Entry', ['encoded', 'encoder', 'name'])
    entries_to_add_to_df: typing.List[Entry] = [
        Entry(proj_encoded, proj_encoder, 'Projection'),
        Entry(user_encoded, user_encoder, 'Username'),
        Entry(device_encoded, device_encoder, 'Device'),
#         Entry(x, None, 'Xcoord'),
        Entry(drawn_y, None, 'drawnY'),
        Entry(real_y, None, 'realY'),
#         Entry(y, None, 'YDiff')
    ]
    
    # now, join everything into a single dataframe
    # first, get column names to use
    columns = []
    for encoded, encoder, name in entries_to_add_to_df:
        col_names = []
        for _idx in range(np.shape(encoded)[1]):
            _tmp = _idx
            if(encoder is not None):
                _tmp = encoder.classes_[_idx]
            col_names.append(f'{name}_{_tmp}')
        
        columns = columns + col_names
    # horizontally stack all of the encoded values
    data = np.hstack([entry.encoded for entry in entries_to_add_to_df])
    df = pd.DataFrame(
        columns=columns,
        data=data,
        index=df.index
    )
    return df

df_to_use = df.copy()
# df_to_use = df.copy()
display(df_to_use)
encoded_df = prepare_dataset_for_user_classification(
    df_to_use,
    columncnt=5000
)
display(encoded_df)

In [None]:
train_df = encoded_df[df_to_use.TestIndex == 0]
test_df = encoded_df[df_to_use.TestIndex == 1]

train_dfY = train_df.filter(regex='Username_*')
train_dfX = train_df.drop(train_dfY.columns, axis=1)

test_dfY = test_df.filter(regex='Username_*')
test_dfX = test_df.drop(test_dfY.columns, axis=1)

display(train_dfX)
display(test_dfX)

# Classification model in PyTorch
https://stackabuse.com/introduction-to-pytorch-for-classification/

In [None]:
import torch
import torch.nn as nn
# limit CPU usage
torch.set_num_threads(50)

In [None]:
class ClassificationModel(nn.Module):
    def __init__(self, hidden_layer_sizes:typing.List[int], input_size:int, output_size=int, dropout=0.3):
        super().__init__()
        _all_layers = []
        for _size in hidden_layer_sizes:
            _all_layers.append(nn.Linear(input_size, _size))
            _all_layers.append(nn.ReLU(inplace=True))
            _all_layers.append(nn.BatchNorm1d(_size))
            # apply dropout if specified so
            if dropout > 0:
                _all_layers.append(nn.Dropout(dropout))
            # update the input size of the following layer
            input_size = _size
        # this is the output layer
        _all_layers.append(nn.Linear(in_features=hidden_layer_sizes[-1], out_features=output_size))
        # apply another activation function to the output layer, so that the output is scaled
#         _all_layers.append(nn.Softmax(dim=1))
        
        # finally, create a model that has all these layers
        # aligned in a sequence
        # this means the layers will be called/processed one after the other
        self.layers = nn.Sequential(*_all_layers)
    
    def forward(self, x):
#         print(x[0:10])
        return self.layers(x)

# convert the pandas df into torch tensor
torch_train_dfX = torch.tensor(np.array(train_dfX, dtype='float32'))
torch_train_dfY = torch.tensor(np.array(train_dfY, dtype='float32'))
torch_test_dfX = torch.tensor(np.array(test_dfX, dtype='float32'))
torch_test_dfY = torch.tensor(np.array(test_dfY, dtype='float32'))

In [None]:
input_size = torch_train_dfX.shape[1]
output_size = torch_train_dfY.shape[1]
display(input_size, output_size)
model = ClassificationModel([2**10, 2**8, 2**6], input_size=input_size, output_size=output_size, dropout=0.5)
display(model)

In [None]:
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)
epochs = 15
aggregated_losses = []

for _epoch_idx in range(epochs):
    y_pred = model(torch_train_dfX)
    loss_in_this_epoch = loss_function(y_pred, torch_train_dfY)
    aggregated_losses.append(loss_in_this_epoch)
    
    _end = '\r'
    if (_epoch_idx + 1) % 10 == 0:
        _end = '\n'
    print(f'Epoch {_epoch_idx+1:4} ===> loss {loss_in_this_epoch.item():10.8f}', end=_end)

    optimizer.zero_grad()
    # propagate the calculated loss backward through the network
    loss_in_this_epoch.backward()
    # update the weights in the network
    optimizer.step()

In [None]:
plt.figure()
plt.plot(range(epochs), [loss.detach().numpy() for loss in aggregated_losses])
plt.xlabel('Epoch'); plt.ylabel('Loss'); plt.show()

with torch.no_grad():
    from sklearn.metrics import classification_report, confusion_matrix
    model.eval()
    y_val = model(torch_test_dfX)
    loss = loss_function(y_val, torch_test_dfY)
    
    _cnt = 10
    argmax_y_val = np.argmax(y_val, axis=1)
    argmax_y_true = np.argmax(torch_test_dfY, axis=1)
    print(f'Loss on test data: {loss}')
    print(argmax_y_val[:_cnt])
    print(argmax_y_true[:_cnt])
    print(y_val[:_cnt])
    display(test_dfY)
    
    print(classification_report(argmax_y_true, argmax_y_val))
    print(confusion_matrix(argmax_y_true, argmax_y_val))
    
    for i in range(100):
        real_user_index = argmax_y_true[i]
        predicted_user_index = argmax_y_val[i]
        if(real_user_index != predicted_user_index):
            # plot how the predicted and the real user
            # drew this figure
            _df_entry = df_to_use.query('TestIndex == 1').iloc[i]
            x_real, y_real = get_drawn_coordinates(_df_entry)
            real_user_name = _df_entry.Username
            
            predicted_user_name = list(test_dfY.columns)[predicted_user_index]
            predicted_user_name = re.sub('Username_', '', predicted_user_name)
            _predicted_user_df_entry = df.query(
                f'Username == "{predicted_user_name}" and Device == "{_df_entry.Device}"' +
                f' and FigureID == {_df_entry.FigureID} and Projection == {_df_entry.Projection}'
            ).sample(1).iloc[0]
            x_pred, y_pred = get_drawn_coordinates(_predicted_user_df_entry)
            
            if _df_entry.ProjectionName == 'Cartesian':
                # cartesian projection
                fig, ax = plt.subplots()
            else:
                # polar projection
                fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
            ax = fig.gca()
            ax.plot(x_real, y_real, label=f'Real user {real_user_name}')
            ax.plot(x_pred, y_pred, label=f'Pred user {predicted_user_name}')
            ax.legend()
            plt.show()
    

# Autoencoder to generate new drawings
https://debuggercafe.com/getting-started-with-variational-autoencoder-using-pytorch/

In [None]:
class VAEModel(nn.Module):
    def __init__(self, hidden_layer_sizes:typing.List[int], in_size:int, out_size:int, dropout:float=0.3):
        super().__init__()
        
        def get_layers(_layer_sizes:typing.List[int], input_size:int):
            _layers = []
            for i in _layer_sizes:
                _layers.append(nn.Linear(in_features=input_size, out_features=i))
                _layers.append(nn.LeakyReLU(inplace=True))
                _layers.append(nn.BatchNorm1d(i))
                if dropout > 0:
                    _layers.append(nn.Dropout(dropout))
                # update the input size of the following layer
                input_size = i
            return _layers
            
        # encoder part of the network
        _encoder_layers = get_layers(hidden_layer_sizes, input_size=in_size)
        
        # we want the bottleneck to be divisible by two
        # so this right here finds the power of 2 which is the closest
        # to the last layer
        _bottleneck_pow = int(np.log2(hidden_layer_sizes[-1]))
        self.bottleneck_size = 2 ** (_bottleneck_pow - 1)
        
        # decoder part of network
        _reversed_lyrs = list(reversed(hidden_layer_sizes))
        _decoder_layers = get_layers(_reversed_lyrs, input_size=self.bottleneck_size)
        _decoder_layers.append(nn.Linear(in_features=_reversed_lyrs[-1], out_features=out_size))
        
        self.encoder = nn.Sequential(*_encoder_layers)
        self.decoder = nn.Sequential(*_decoder_layers)
    
    def reparametrize(self, mu: typing.Iterable[float], log_var: typing.Iterable[float]):
        # mu = mean from the encoder latent space
        # log_var = log variance of the encoder latent space
        # these two are tensor arrays
        
        # standard deviation --> get form log variance
        std = torch.exp(0.5 * log_var)
        
        # this will return an array of values from the normal distributions
        # where mean is 0 and var is 1
        # i.e. this will be an array of a normalized dist with the same
        # shape as the array 'std'
        eps = torch.randn_like(std)
        
        # shift the distribution to 'mu' mean and 'log_var' variance
        sample = mu + (eps * std)
        return sample
        
    def forward(self, x):
        x = self.encoder(x)
        x = x.view(-1, 2, self.bottleneck_size)
        # x as this point is NOT the latent representation
        
        mu = x[:, 0, :]
        log_var = x[:, 1, :]
        def numnan(arr):
            arr = arr.detach().numpy()
            return np.sum(np.isnan(arr))
        
        
        z = self.reparametrize(mu, log_var)
        # now we have the latent representation
        
        x = self.decoder(z)
        reconstruction = torch.sigmoid(x)
        return reconstruction, mu, log_var

In [None]:
def prepare_dataset_for_vae(df, columncnt):
    x, drawn_y, real_y = prepare_points(df, columncnt=columncnt)
    x = np.array(x, dtype='float32')
    drawn_y = np.array(drawn_y, dtype='float32')
    real_y = np.array(real_y, dtype='float32')
    
    # input should contain the value on the x axis and the
    # original (real) value
#     X = np.hstack([x, real_y])
    X = drawn_y
    
    # the output should contain the drawn value
#     Y = np.hstack([x, drawn_y])
    Y = real_y
    print(X.shape, Y.shape)

    return X, Y

def perform_train(trainx, trainy, testx, testy):    
    model = VAEModel([2**13, 2**12, 2**11], in_size=trainx.shape[1], out_size=trainy.shape[1],  dropout=0)
    display(model)
    
    lr = 0.001
    batch_size = 64
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    bce = nn.BCELoss(reduction='mean')
    
    # dataloaders with batch sizes
    trainx_loader = torch.utils.data.DataLoader(torch.tensor(trainx), batch_size=batch_size, shuffle=False)
    trainy_loader = torch.utils.data.DataLoader(torch.tensor(trainy), batch_size=batch_size, shuffle=False)
    testx_loader = torch.utils.data.DataLoader(torch.tensor(testx), batch_size=batch_size, shuffle=False)
    testy_loader = torch.utils.data.DataLoader(torch.tensor(testy), batch_size=batch_size, shuffle=False)
    
    def loss_function(bce_loss, mu, log_var):
        BCE = bce_loss
        KLD = -0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp())
        # the final loss is binary cross entropy + kullback leibler divergence
        return BCE + KLD
    
    def fit(model, xdataloader, ydataloader, validation:bool=False):
        i = 0
        total_epoch_loss = 0.0
        for x, y in zip(xdataloader, ydataloader):
            if validation is False:
                optimizer.zero_grad()

            reconstruction, mu, log_var = model(x)
            bce_loss = bce(reconstruction, y)
            loss = loss_function(bce_loss, mu, log_var)
            total_epoch_loss += loss
                            
#             if i == 0:
#                 fig = plt.figure()
#                 ax = fig.gca()
#                 import random
#                 _ind = random.randint(0, len(x)-1)
#                 tmp = x[_ind]
#                 tmp2 = y[_ind]
#                 tmp3 = reconstruction[_ind]
#                 tmp3 = [t.detach().numpy() for t in tmp3]
#                 ax.plot(X_FOR_ALL_FUNC, tmp)
#                 ax.plot(X_FOR_ALL_FUNC, tmp2)
#                 ax.plot(X_FOR_ALL_FUNC, tmp3)
#                 ax.set_title(f'index {_ind}')
#                 plt.show()
#                 i += 1

            if validation is False:
                loss.backward()
                optimizer.step()
        return total_epoch_loss

    epochs = 50
    train_losses, val_losses = [], []
    for _epoch_idx in range(epochs):
        # switch model to training mode
        model.train()
        train_epoch_loss = fit(model, trainx_loader, trainy_loader, False)
        train_losses.append(train_epoch_loss)
        
        # and now val mode
        model.eval()
        with torch.no_grad():
            val_epoch_loss = fit(model, testx_loader, testy_loader, True)
            val_losses.append(val_epoch_loss)
        
        print(f'Epoch: {_epoch_idx+1:5} / {epochs};',
              f' train loss: {train_epoch_loss:.5f}, val loss: {val_epoch_loss:.5f}')
    
    def plot_training_stats():
        fig = plt.figure()
        ax = fig.gca()
        ax.plot(range(epochs), [loss.detach().numpy() for loss in train_losses], label='Train')
        ax.plot(range(epochs), [loss.detach().numpy() for loss in val_losses], label='Val')
        ax.set_title('Losses')
        ax.legend()
        plt.show()
    
    plot_training_stats()
    return model

def do_everything(columncnt):
    trainx, trainy = prepare_dataset_for_vae(df.query('TestIndex == 0'), columncnt)
    testx, testy   = prepare_dataset_for_vae(df.query('TestIndex == 1'), columncnt)
    model = perform_train(trainx, trainy, testx, testy)

    return model, (trainx, trainy, testx, testy)

columncnt = len(X_FOR_ALL_FUNC)
vae_model, datasets = do_everything(columncnt)

In [None]:
def draw_prediction(testx, testy, model, columncnt):
    fig = plt.figure()
    ax = fig.gca()
    import random
    _ind = random.randint(0, len(testx))
    real_func_x = X_FOR_ALL_FUNC
    real_func_y = testx[_ind]
    ax.plot(real_func_x, real_func_y, label='Original')
    
    drawn_func_x = X_FOR_ALL_FUNC
    drawn_func_y = testy[_ind]
    ax.plot(drawn_func_x, drawn_func_y, label='Drawn')
    
    with torch.no_grad():
        predicted, _, _ = model(torch.tensor(testx))
        
#         pred_func_x = predicted[_ind, :columncnt]
        pred_func_x = X_FOR_ALL_FUNC
        pred_func_y = predicted[_ind]
        ax.plot(pred_func_x, pred_func_y, label='Predicted')
    ax.legend()
    plt.show()

draw_prediction(datasets[2], datasets[3], vae_model, columncnt)