# 0. Data

In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
# Prefer the GPU, but fall back to the CPU so the notebook still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from scipy.linalg import block_diag # for computation efficiency - Kronecker product with identity matrix and regular matrix

In [2]:
# Split data.
# The first 21 columns are the input variables and the last 7 columns are the output variables.
N_INPUTS, N_OUTPUTS = 21, 7
SPLIT_SEED = 0  # fixed seed so the train/validation split is identical on every run

# NOTE(review): the training rows are read from 'SARCOSTst.csv' and the test rows from
# 'SARCOSTrn.csv'. The file names suggest these may be swapped -- confirm this is intended.
train_data = pd.read_csv('./data/SARCOSTst.csv', header=None)[:1000]
test_data = pd.read_csv('./data/SARCOSTrn.csv', header=None)[:100]

X_train, X_valid, Y_train, Y_valid = train_test_split(
    train_data.iloc[:, :N_INPUTS],
    train_data.iloc[:, -N_OUTPUTS:],
    test_size=0.2,
    random_state=SPLIT_SEED,
)
X_test, Y_test = test_data.iloc[:, :N_INPUTS], test_data.iloc[:, -N_OUTPUTS:]

# Convert every split to a tensor on the compute device (torch.tensor keeps the NumPy dtype).
X_train, X_valid, Y_train, Y_valid, X_test, Y_test = (
    torch.tensor(frame.to_numpy()).to(device)
    for frame in (X_train, X_valid, Y_train, Y_valid, X_test, Y_test)
)

print('train set: ', X_train.shape, Y_train.shape)
print('valid set: ', X_valid.shape, Y_valid.shape)
print('test set: ', X_test.shape, Y_test.shape)

train set:  torch.Size([800, 21]) torch.Size([800, 7])
valid set:  torch.Size([200, 21]) torch.Size([200, 7])
test set:  torch.Size([100, 21]) torch.Size([100, 7])


# 1. Kernel

In [3]:
class LinearKernel:
    r"""
    standard dot product kernel k(a,b) = a^\top b
    :input: X1 (N*D), X2 (M*D)
    :output: covariance matrix (N*M)
    """
    # The docstring is a raw string so that "\top" is kept literally instead of being
    # read as a TAB escape followed by "op".

    def __call__(self, X1, X2):
        # Entry (i, j) is the dot product of row i of X1 with row j of X2.
        return torch.matmul(X1, X2.T)


In [4]:
class GaussianKernel:
    """
    isotropic Gaussian kernel k(a,b) = exp(-||a-b||^2 / sigma_k^2)
    :input: X1 (N*D), X2 (M*D)
    :output: covariance matrix (N*M)
    """
    def __init__(self, sigma_k=1):
        # Kernel width (Hyperparameter !). It enters the kernel squared, so it acts as a
        # length scale rather than as a variance.
        self.sigma_k = sigma_k

    def __call__(self, X1, X2):
        # ||a-b||^2 = ||a||^2 + ||b||^2 - 2 a.b, evaluated for all row pairs at once.
        sq_dist = (torch.sum(X1**2, dim=1).reshape(-1, 1)
                   + torch.sum(X2**2, dim=1).reshape(1, -1)
                   - 2 * torch.matmul(X1, X2.T))
        # Round-off in the expansion above can leave a slightly negative "distance", which
        # would produce kernel values above 1; clamp so the result never exceeds 1.
        sq_dist = sq_dist.clamp_min(0)
        return torch.exp(-sq_dist / self.sigma_k ** 2)


# 2. GP regression

In [36]:
class GP_Regression(nn.Module):
    """
    Exact Gaussian-process regression for D outputs that share one kernel.

    The D output columns are stacked one after another into a single vector of length N*D,
    so every covariance matrix is block-diagonal with D identical kernel blocks and the
    observation-noise covariance is kron(sigma_n * I_D, I_N).

    :param X_train: training inputs, one row per sample (N rows)
    :param Y_train: training targets (N*D)
    :param K: kernel callable, K(A, B) -> covariance matrix (e.g. GaussianKernel(), LinearKernel())
    :param sigma_n: initial noise variance; stored as a learnable nn.Parameter
    :param device: torch device on which the helper identity matrices are created
    """
    def __init__(self, X_train, Y_train, K, sigma_n, device):
        super().__init__()
        self.X_train = X_train
        self.Y_train = Y_train
        self.K = K
        self.sigma_n = nn.Parameter(torch.tensor(sigma_n), requires_grad=True)  # noise variance (Hyperparameter)
        self.device = device
        self.D = self.Y_train.shape[1] # output dim

    def fit(self, X_test):
        """
        calculate sufficient statistics

        Builds the block-diagonal train/train, test/train and test/test covariances, the
        noise matrices and the stacked target vector. Sigma is computed from the current
        value of sigma_n, so fit() has to be called again after sigma_n changes.
        """
        D = self.D
        n_train = self.Y_train.shape[0]  # taken from the instance, not from a notebook global

        self.K_X_X = torch.block_diag(*[self.K(self.X_train, self.X_train)] * D)
        self.K_Xt_X = torch.block_diag(*[self.K(X_test, self.X_train)] * D)
        self.K_X_Xt = self.K_Xt_X.T
        self.K_Xt_Xt = torch.block_diag(*[self.K(X_test, X_test)] * D)

        # Create the identities in the kernel's dtype so float64 data is not mixed with
        # float32 helper matrices.
        dtype = self.K_X_X.dtype
        self.Sigma = self.sigma_n * torch.eye(D, dtype=dtype, device=self.device)
        self.I = torch.eye(len(self.X_train), dtype=dtype, device=self.device)

        # Column-major stacking: all samples of output 0, then output 1, ...
        self.y_concat = self.Y_train.T.reshape(D * n_train)

    def _noisy_train_cov(self):
        """Training covariance plus observation noise: K(X,X) + kron(Sigma, I)."""
        return self.K_X_X + torch.kron(self.Sigma, self.I)

    def __NLL_term_1__(self):
        """Data-fit term -0.5 * y^T (K + kron(Sigma, I))^{-1} y, returned as a tensor on the autograd graph."""
        # Solve the linear system instead of forming the inverse explicitly.
        alpha = torch.linalg.solve(self._noisy_train_cov(), self.y_concat)
        return -0.5 * torch.dot(self.y_concat, alpha)

    def __NLL_term_2__(self):
        """Complexity term -0.5 * log|K + kron(Sigma, I)|."""
        # logdet avoids the overflow/underflow of det() on large matrices; like
        # log(det(.)), it yields nan for a negative determinant.
        return -0.5 * torch.logdet(self._noisy_train_cov())

    def predict(self):
        """
        Compute the predictive distribution at the X_test given to fit().

        Stores mean, covariance and variance (all in stacked N_test*D layout) in
        self.predictive_distribution and returns the mean reshaped to (N_test, D).
        """
        noisy_cov = self._noisy_train_cov()

        # predictive mean, covariance, variance
        mean = self.K_Xt_X @ torch.linalg.solve(noisy_cov, self.y_concat)
        cov = self.K_Xt_Xt - self.K_Xt_X @ torch.linalg.solve(noisy_cov, self.K_X_Xt)
        var = torch.diag(cov)

        self.predictive_distribution = {'mean': mean, 'cov':cov, 'var':var}

        # Undo the column-major stacking: one row per test sample, one column per output.
        return mean.reshape(self.D, -1).T

    def calculate_NLL(self):
        """Return the sum of the data-fit and complexity terms (differentiable w.r.t. sigma_n)."""
        # NOTE(review): both terms carry a factor -0.5, so this sum is the log marginal
        # likelihood without the constant -(N*D/2)*log(2*pi), i.e. the negative of an NLL.
        # The sign is kept as written; confirm whether callers minimise or maximise it.
        return self.__NLL_term_1__() + self.__NLL_term_2__()

In [42]:
# Build a GP on the training split with a Gaussian kernel (default sigma_k) and an initial
# noise level of 0.05, then precompute the covariances needed to predict at X_test.
model = GP_Regression(X_train, Y_train, GaussianKernel(), sigma_n=0.05, device=device)
model.fit(X_test)
# predict() returns the predictive mean with one row per test sample and one column per output.
pred = model.predict()

# Mean squared error over all test samples and outputs. The result stays attached to the
# autograd graph because the prediction depends on the learnable sigma_n parameter.
loss_f = nn.MSELoss()
mse = loss_f(pred, Y_test)
print(mse)


tensor(377.1774, device='cuda:0', dtype=torch.float64,
       grad_fn=<MseLossBackward0>)


# 3. Main