In [2]:
from pathlib import Path
import os
path = Path().absolute()

if path.name =="GP":
    os.chdir(path.parent)

BASEDIR = Path().absolute()
BASEDIR

PosixPath('/home/taerim/sOftrobot/PRIMNET-V2/code')

In [113]:
from utils.args import read_ARGS
configs = "PRIMNET/FINGER.py"
args = read_ARGS((BASEDIR/'configs'/configs).absolute())
args

ARGS(MODEL='PRIMNET', EVEN_JOINTS=True, WANDB=True, pname='PRIMNET_v2.2', runname='FINGER', DATASET='FINGER', TPOSE=((0, 0, 0.12),), LOAD_WEIGHTPATH=None, SAVE_PERIOD=1, TEST_PERIOD=1, EVEN_JOINT=True, p_offset_std=0.1, rpy_offset_std=0.01, axis_std=0.1, OUTPUT_NORMALIZE=False, seed=0, hdim=(16, 16), motor_embed_dim=4, lr=0.0015, lrd=0.95, wd=0.0, w_vec=0.1, epochs=2000, focus_ratio=0.2, data_ratio=1.0, n_workers=2, batch_size=64, joint_seqs=('T', 'R', 'R', 'R', 'R', 'T', 'P'), marker_num=1, motor_dim=2)

In [114]:

from utils.dataloader import get_dataset, Sampler
train_dataset,val_dataset,test_dataset,ext_dataset  = get_dataset(f"../dataset/{args.DATASET}.json", 1.0)


train_sampler = Sampler(args, train_dataset)
val_sampler   = Sampler(args, val_dataset)
test_sampler  = Sampler(args, test_dataset)
ext_sampler   = Sampler(args, ext_dataset)


In [128]:
# %%
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from scipy.spatial.distance import pdist, cdist, squareform
from scipy.linalg import cholesky, cho_solve, solve_triangular
import torch

GPR_CHOLESKY_LOWER = True

class GPR(object):
    def __init__(self):
        self.alpha = 1e-3

    def se_kernel(self, Xa, Xb):
        # dists_ = pdist(Xa, metric='sqeuclidean')
        # K_ = np.exp(-.5 * gamma * dists_)
        # K_ = squareform(K_)
        # np.fill_diagonal(K, 1)

        dists = cdist(Xa, Xb, metric='sqeuclidean')
        K = np.exp(- dists / (self.median))

        # print(np.equal(K, K_))
        return K

    def fit(self, X, y):
        # Normalize fitting data
        self.X_mean = np.mean(X, axis=0)
        self.X_std = np.std(X, axis=0)
        self.X = (X-self.X_mean)/self.X_std

        self.y_mean = np.mean(y, axis=0)
        self.y_std = np.std(y, axis=0)
        self.y_std = np.maximum(self.y_std, 1e-8)
        self.y = (y-self.y_mean)/self.y_std

        # print(pdist(X, 'sqeuclidean'))
        self.median = np.median(pdist(X, 'sqeuclidean'))

        K = self.se_kernel(self.X, self.X)
        # noise™
        K[np.diag_indices_from(K)] += self.alpha
        try:
            self.L_ = cholesky(K, lower=GPR_CHOLESKY_LOWER, check_finite=False)
        except np.linalg.LinAlgError as exc:
            exc.args = (
                f"The kernel, {self.kernel_}, is not returning a positive "
                "definite matrix. Try gradually increasing the 'alpha' "
                "parameter of your GaussianProcessRegressor estimator.",
            ) + exc.args
            raise
        # Alg 2.1, page 19, line 3 -> alpha = L^T \ (L \ y)
        self.alpha_ = cho_solve(
            (self.L_, GPR_CHOLESKY_LOWER),
            self.y,
            check_finite=False,
        )
        return self
    
    def predict(self, X):
        X = (X-self.X_mean)/self.X_std
        K_trans = self.se_kernel(X, self.X)
        y_mean = K_trans @ self.alpha_
        y_mean = self.y_std * y_mean + self.y_mean
        # V = solve_triangular(
        #     self.L_, K_trans.T, lower=GPR_CHOLESKY_LOWER, check_finite=False
        # )
        
        return y_mean




from torch.nn import functional as F
def p_loss_fn(x,y):
    loss =  F.l1_loss(x,y, reduction='none')
    loss = torch.mean(loss, dim=list(range(1,loss.ndim)))
    return loss



# %%
test_pred_list = []
test_target_list = []
ext_pred_list = []
ext_target_list = []

train_batch = train_sampler.sample_all()
test_batch = test_sampler.sample_all()
ext_batch = ext_sampler.sample_all()

for i in range(args.marker_num):
    # Train Dataset #
    train_X, train_y = train_batch['motor_control'].numpy(), train_batch['position'].numpy()[:,i,:,0]


    # Test Dataset
    test_X, test_y = test_batch['motor_control'].numpy(), test_batch['position'].numpy()[:,i,:,0]

    # Ext Dataset
    ext_X, ext_y = ext_batch['motor_control'].numpy(), ext_batch['position'].numpy()[:,i,:,0]


    gpr = GPR()
    gpr.fit(train_X, train_y)
    
    pred_y = gpr.predict(test_X)

    test_pred_list.append(torch.FloatTensor(pred_y))
    test_target_list.append(torch.FloatTensor(test_y))
    
    pred_y = gpr.predict(ext_X)
    ext_pred_list.append(torch.FloatTensor(pred_y))
    ext_target_list.append(torch.FloatTensor(ext_y))
    

In [129]:
# Interpolation
test_pred = torch.stack(test_pred_list, dim = 1)
test_target = torch.stack(test_target_list, dim=1)

test_pred.shape, test_target.shape

position_loss = p_loss_fn(test_pred, test_target)

float(torch.mean(position_loss))

0.0063729495741426945

In [130]:
# Extrapolation
ext_pred = torch.stack(ext_pred_list, dim = 1)
ext_target = torch.stack(ext_target_list, dim=1)

ext_pred.shape, ext_target.shape

position_loss = p_loss_fn(ext_pred, ext_target)
float(torch.mean(position_loss))

0.012367046438157558