In [1]:
import pandas as pd
import numpy as np
from scipy.linalg import kron
from scipy.spatial.distance import cdist
from sklearn.model_selection import train_test_split

In [37]:
seed = 1
np.random.seed(seed)

# Data: SARCOS
로봇팔 데이터 SARCOS 데이터 세트의 Joint positions 7개, Velocities 7개, Accelerations 7개를 이용해, **Torque의 7개 칼럼을 예측**한다.

In [41]:
train_data = pd.read_csv('./data/SARCOSTst.csv', header=None)[:2000]
test_data = pd.read_csv('./data/SARCOSTrn.csv', header=None)[:200]

X_train, X_valid, y_train, y_valid = train_test_split(train_data.iloc[:,:-7], train_data.iloc[:,21:], test_size=0.1, random_state=seed)
X_test, y_test = test_data.iloc[:,:-7], test_data.iloc[:,21:]

print("SARCOS data size")
print("Train", X_train.shape, y_train.shape)
print("Valid", X_valid.shape, y_valid.shape)
print("Test", X_test.shape, y_test.shape)

SARCOS data size
Train (1800, 21) (1800, 7)
Valid (200, 21) (200, 7)
Test (200, 21) (200, 7)


# Kernel
- Linear inner product kernel
$$ k(x_1,x_2) = \sigma_0 + x_1 \cdot x_2 $$
- Non-linear Gaussian kernel
$$ k(x_1,x_2) = \exp(-{1 \over \sigma_k^2}\left|x_1-x_2\right|^2)$$

In [6]:
# Linear inner product kernel
class LinearKernel:
    """
    Input
    -----
    X1: (N1, D)
    X2: (N2, D)
    
    Output
    ------
    kernal matrix: (N1, N2)
    
    """
    def __init__(self, sigma_0=1.0):
        self.sigma_0 = sigma_0
    
    def __call__(self, X1, X2):
        return self.sigma_0 + X1 @ X2.T

In [7]:
from scipy.spatial.distance import cdist

# Non-linear Gaussian kernel
class GaussianKernel:
    """
    Input
    -----
    X1: (N1, D)
    X2: (N2, D)
    
    Output
    ------
    kernal matrix: (N1, N2)
    """
    def __init__(self, sigma_k=1.0):
        self.sigma_k = sigma_k
    
    def __call__(self, X1, X2):
        dist = cdist(X1, X2, 'sqeuclidean')
        return np.exp(-dist/self.sigma_k**2)

In [8]:
# check
kernel = LinearKernel()

a = np.array([[1, 2],
              [3, 4]])
b = np.array([[2, 2],
              [2, 2]])
print("linear")
print(kernel(a,b))

gkernel = GaussianKernel()
print("non-linear")
print(gkernel(a,b))

linear
[[ 7.  7.]
 [15. 15.]]
non-linear
[[0.36787944 0.36787944]
 [0.00673795 0.00673795]]


# Multi-Output Gaussian Process Regression
Predictive mean:
$$
\begin{equation}
\begin{align}
vec(M*)= (C_{DD}\otimes_{kron}K^*_{N_{test}N_{train}})(C_{DD}\otimes_{kron}K_{N_{train}N_{train}}+\Sigma_{DD}\otimesI_{N_{train}N_{train}})^{-1}vec(Y_{N_{train}D})
\end{align}
\end{equation}
$$

In [12]:
# utils
def vec(v):
    """
    [[a,a'],      [[a],
     [b,b'],  ->   [b],
     [c,c']]       [c],
                   [a'],
                   [b'],
                   [c']]
    """
    return np.reshape(v.T, (-1,1))

def unvec(v, r, c):
    return np.reshape(v, (c,r)).T

In [10]:
# a = np.array([[2,22,222],
#               [3,33,333]])
# r, c = a.shape
# print(a)
# aa = np.reshape(a.T,(-1,1))
# print(aa)

[[  2  22 222]
 [  3  33 333]]
[[  2]
 [  3]
 [ 22]
 [ 33]
 [222]
 [333]]


In [176]:
# np.reshape(aa, (c,r)).T

array([[  2,  22, 222],
       [  3,  33, 333]])

In [178]:
# a = np.array([[2,22],
#               [3,33],
#               [4,44]])
# print(a)
# r, c = a.shape
# aa = vec(a)
# print(aa)
# print(unvec(aa, r, c))


[[ 2 22]
 [ 3 33]
 [ 4 44]]
[[ 2]
 [ 3]
 [ 4]
 [22]
 [33]
 [44]]
[[ 2 22]
 [ 3 33]
 [ 4 44]]


In [11]:
class MOGPR:
    """
    Input
    -----
    Xtrain: (N_train, D)
    ytrain: (N_train, D)
    kernel: Hyperparameter
    coregionalisation_matrix: Identity matrix
    sigma: Hyperparameter
    
    Output
    ------
    mean: (N_test, D)
    """
    def __init__(self, Xtrain, ytrain, kernel, coregionalisation_matrix="I",  sigma=1.0):
        self.X_train = np.array(Xtrain)
        self.y_train = np.array(ytrain)
        self.kernel = kernel
        self.C = coregionalisation_matrix
        self.sigma = sigma
        self._predictive_distribution = None
    
    def predict(self, Xtest):
        Xtest = np.array(Xtest)
        N, D = self.y_train.shape
        if self.C == "I":
            self.C = np.identity(D)
        
        Sigma = np.diag(np.full(D,self.sigma))
        
        # Kernel matrices
        K = self.kernel(self.X_train, self.X_train) # (N_train, N_train)
        K_star = self.kernel(Xtest, self.X_train) # (N_test, N_train)
        K_star_2 = self.kernel(Xtest, Xtest) # (N_test, N_test)
        
        # Block matrices
        CK = kron(self.C, K)
        CK_star = kron(self.C, K_star)
        CK_starT = kron(self.C, K_star.T)
        CK_star_2 = kron(self.C, K_star_2)
        Sigma_block = kron(Sigma, np.identity(N))
        
        # Predictive distribution
        Common = CK_star @ np.linalg.inv(CK + Sigma_block)
        mean = Common @ vec(self.y_train)
        cov = CK_star_2 - Common @ CK_starT
        var = np.diag(cov)
#         mean = np.linalg.inv(CK + Sigma_block) @ vec(self.y_train)
#         mean = CK_star @ mean
        
        mean = unvec(mean, D, Xtest.shape[0]).T
        self._predictive_distribution = {"mean": mean, "cov": cov, "var": var}
        return mean

In [158]:
# N, D = y_train.shape
# C = np.identity(D)

# Sigma = np.diag(np.full(D,1.0))

# print(N, D)
# print(C.shape)
# print(Sigma.shape)
# Sigma_block = kron(Sigma, np.identity(N))
# print(Sigma_block.shape)

800 7
(7, 7)
(7, 7)
(5600, 5600)


# Full MOGP

In [160]:
import time

model = MOGPR(X_train, y_train, GaussianKernel())

start = time.time()

# My Code
model.predict(X_test)

end = time.time()
print("Run time [s]: ",end-start)

Run time [s]:  3.2480571269989014


In [8]:
import time

model = MOGPR(X_train, y_train, LinearKernel())

start = time.time()

# My Code
model.predict(X_test)

end = time.time()
print("Run time [s]: ",end-start)

Run time [s]:  80.82301592826843


In [11]:
import time

model = MOGPR(X_train, y_train, GaussianKernel())

start = time.time()

# My Code
model.predict(X_test)

end = time.time()
print("Run time [s]: ",end-start)

Run time [s]:  105.01352000236511


In [None]:
model._predictive_distribution["mean"].shape

In [None]:
print(model._predictive_distribution["cov"].shape)
print(model._predictive_distribution["var"].shape)

In [128]:
# a = np.array([[2,22,222],
#               [3,33,333]])
# print(a)
# np.diag(a)

[[  2  22 222]
 [  3  33 333]]


array([ 2, 33])

In [94]:
# gkernel = GaussianKernel()
# gkernel(X_train, X_train).shape

(800, 800)

In [100]:
# from scipy.linalg import block_diag, kron
# I_D = np.identity(3)
# A = np.array([[1,2,3],
#               [4,5,6]])
# print(I_D)
# print(A)
# print(block_diag(I_D, A))
# print(kron(I_D,A))

[[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
[[1 2 3]
 [4 5 6]]
[[1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 1. 2. 3.]
 [0. 0. 0. 4. 5. 6.]]
[[1. 2. 3. 0. 0. 0. 0. 0. 0.]
 [4. 5. 6. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 2. 3. 0. 0. 0.]
 [0. 0. 0. 4. 5. 6. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 1. 2. 3.]
 [0. 0. 0. 0. 0. 0. 4. 5. 6.]]


# Subset of Regressors approximation

In [40]:
class SR_MOGPR:
    """
    From training set, it produces inducing inputs and get an approximate predictive dist.
    
    Input
    -----
    Xtrain: (N_train, D)
    ytrain: (N_train, D)
    kernel: Hyperparameter
    B: (float) Ratio of inducing inputs to Xtrain, Hyperparameter
    coregionalisation_matrix: Identity matrix
    sigma: Hyperparameter
    
    Output
    ------
    mean: (N_test, D)
    """
    def __init__(self, Xtrain, ytrain, kernel, B=0.1, coregionalisation_matrix="I",  sigma=1.0):
        self.X_train = np.array(Xtrain)
        self.y_train = np.array(ytrain)
        self.kernel = kernel
        self.B = B
        self.C = coregionalisation_matrix
        self.sigma = sigma
        self._predictive_distribution = None
        
        self.Zx = None
        self.Zy = None
        
    def predict(self, Xtest):
        Xtest = np.array(Xtest)
        N, D = self.y_train.shape
        if self.C == "I":
            self.C = np.identity(D)
        
        Sigma = np.diag(np.full(D,self.sigma))
        
        # Select inducing inputs - M = N_train*B
        idx = np.array(np.random.choice(self.X_train.shape[0], int(self.X_train.shape[0]*self.B), replace=False))
        self.Zx = np.array(self.X_train[idx])
        self.Zy = np.array(self.y_train[idx])
        
        # Approximate kernel matrices
        K_bb = self.kernel(self.Zx, self.Zx) # (M, M)
        K_xb = self.kernel(self.X_train, self.Zx) # (N_train, M)
        K_starb = self.kernel(Xtest, self.Zx) # (N_test, M)
        K_star_2 = self.kernel(Xtest, Xtest) # (N_test, N_test)
        
        KtK = K_xb.T @ K_xb # (M, M)
        
        # Block matrices
        CK_starb = kron(self.C, K_starb) # (D*N_test, DM)
        CK_bstar = kron(self.C, K_starb.T) # (DM, D*N_test)
        CKtK = kron(self.C, KtK) # (DM, DM)
        CK_star_2 = kron(self.C, K_star_2) # (D*N_test, D*N_test)
        Sigma_block = kron(Sigma, K_bb)
        inverse_block = np.linalg.inv(CKtK + Sigma_block)
        
        # Predictive distribution
        mean = inverse_block @ vec(K_xb.T @ self.y_train) # (DM, 1)
        mean = CK_starb @ mean # (D*N_test,1)
        cov = CK_star_2 - CK_starb @ inverse_block @ CK_bstar
        var = np.diag(cov)
        
        mean = unvec(mean, D, Xtest.shape[0]).T
        self._predictive_distribution = {"mean": mean, "cov": cov, "var": var}
        return mean

In [42]:
import time

model = SR_MOGPR(X_train, y_train, GaussianKernel())

start = time.time()

# My Code
model.predict(X_test)

end = time.time()
print("Run time [s]: ",end-start)

Run time [s]:  0.17807674407958984


In [43]:
import time

model = MOGPR(X_train, y_train, GaussianKernel())

start = time.time()

# My Code
model.predict(X_test)

end = time.time()
print("Run time [s]: ",end-start)

Run time [s]:  30.328718900680542
