# Distances and Localised Regression (old, not important)

In [None]:
from tosig_pytorch import EsigPyTorch
from esig import tosig
import numpy as np
import torch
import math
import time
import pickle
import pandas as pd

# to plot pretty figures
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import sklearn
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors.ball_tree import BallTree
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
import random

In [None]:
def projection(sig, k, width, depth):
    if not isinstance(k, int):
        raise TypeError('k must be an integer')
    if k<0:
        raise NameError('projection level must be non-negative')
    if k==0:
        return np.array([1.])
    if k > depth:
        return np.zeros(width**k)
    if isinstance(sig, torch.DoubleTensor):
        sig = sig.numpy()
    k1 = sum([width**i for i in range(k)])
    k2 = sum([width**i for i in range(k+1)])
    return sig[k1:k2]

In [None]:
def L2(tensor):
    return np.sqrt(sum([i**2 for i in tensor.flatten()]))

## Projection distance

In [None]:
def projection_distance(sig_1, sig_2, width, depth):
    sig_diff = sig_1 - sig_2
    projection_norms = [L2(projection(sig_diff, k, width, depth)) for k in range(depth+1)]
    projection_norms[0] = 0.
    return max(projection_norms)

## Homogeneous Distance from Exercises 7.37 - 7.38 p. 146 in Friz-Victoir

In [None]:
def norm(sigtensor, width, depth):
    projection_norms = [(math.factorial(k) * L2(
                                                projection(
                                                            pyt.tensor_log(sigtensor, depth)[1:], 
                                                            k, 
                                                            width, 
                                                            depth
                                                           )
                                               )
                         )**(1./k)
                        for k in range(1, depth+1)
                       ]
    return (math.factorial(depth)**(-1./depth))*max(projection_norms)

In [None]:
def hom_distance(stream_lhs, stream_rhs, width, depth):
    stream_lhs_inverse = torch.from_numpy(np.flip(stream_lhs.numpy(), axis=0).copy())
    sig_tensor_lhs_inverse = pyt.stream2sigtensor(stream_lhs_inverse, depth)
    sig_tensor_rhs = pyt.stream2sigtensor(stream_rhs, depth)
    prod = pyt.tensor_multiply(sig_tensor_lhs_inverse, sig_tensor_rhs, depth)
    ans = norm(prod, width, depth)
    if ans < 1e-07:
        return 0.
    return ans

## Check non-negativity, symmetry and tringular inequality

In [None]:
pyt = EsigPyTorch()

for i in range(20):
    
    width = np.random.randint(2, 8)
    depth = np.random.randint(2, 8)

    print('Example {}: width = {} --- depth = {}'.format(i, width, depth))
    
    stream1 = pyt.brownian(100, width)
    stream2 = pyt.brownian(100, width)
    stream3 = pyt.brownian(100, width)
    
    s = time.time()
    sigs1 = pyt.stream2sig(stream1, depth)
    sigs2 = pyt.stream2sig(stream2, depth)
    sigs3 = pyt.stream2sig(stream3, depth)
    t = time.time()
    
    print('sig_avg_time : {} secs'.format((t-s)/3.))
    
    s = time.time()
    a = projection_distance(sigs1, sigs2, width, depth)
    a_sym = projection_distance(sigs2, sigs1, width, depth)
    b = projection_distance(sigs1, sigs3, width, depth)
    c = projection_distance(sigs2, sigs3, width, depth)
    t = time.time()
    
    print('dist_avg_time : {} secs \n'.format((t-s)/4.))
    
    if not a >= 0.:
        print('Non-negativity not satisfied')
    if not a == a_sym:
        print('Symmetry not satisfied')
    if not a + b >= c:
        print('Triangular Inequality not satisfied')

In [None]:
pyt = EsigPyTorch()

for i in range(20):
    
    width = np.random.randint(2, 8)
    depth = np.random.randint(2, 8)

    print('Example {}: width = {} --- depth = {}'.format(i, width, depth))
    
    stream1 = pyt.brownian(100, width)
    stream2 = pyt.brownian(100, width)
    stream3 = pyt.brownian(100, width)
    
    s = time.time()
    a = hom_distance(stream1, stream2, width, depth)
    a_sym = hom_distance(stream2, stream1, width, depth)
    b = hom_distance(stream1, stream3, width, depth)
    c = hom_distance(stream2, stream3, width, depth)
    t = time.time()
    
    print('dist_avg_time : {} secs \n'.format((t-s)/4.))
    
    if not a >= 0.:
        print('Non-negativity not satisfied')
    if not np.abs(a-a_sym)<1e-7:
        print('Symmetry not satisfied: dist(a,b) = {} --- dist(b,a) = {}'.format(a, a_sym))
    if not a + b >= c:
        print('Triangular Inequality not satisfied')

## Digit pathwise-prediction and subsequent classification

In [None]:
# load streams
with open('input.pickle', 'rb') as handle:
    X = pickle.load(handle, encoding='latin1')

# load corresponding digit (optional)
with open('output.pickle', 'rb') as handle:
    y = pickle.load(handle, encoding='latin1')

In [None]:
# plot a few examples
for j in range(1, 5):
    plt.plot(np.array([i[0] for i in X[j]]), np.array([i[1] for i in X[j]]))
    plt.title('this is supposed to be a {}'.format(y[j]))
    plt.show()

In [None]:
print ('We have {} images in total \n'.format(len(X)))
print ('The first few digits have shapes: {}'.format([X[0].shape, 
                                                      X[1].shape,
                                                      X[2].shape]))

## Localised Regression

In [None]:
stop_index = 1000
data = list(zip(X[:stop_index], y[:stop_index]))
random.shuffle(data)
X, y = zip(*data)

In [None]:
# only retaining the first 80% of the information in the test set
percentage_to_retain = 0.8
X_input = [i[:int(percentage_to_retain*float(len(i))),:] for i in X]
X_missing_part = [i[int(percentage_to_retain*float(len(i))):,:] for i in X]
X_output = X

In [None]:
split = int(.95*len(X))
X_input_train = X_input[:split]
X_input_test = X_input[split:]
X_output_train = X_output[:split]
X_output_test = X_output[split:]

# y_available = np.array(y[:split], np.newaxis).astype(float)
# y_test = np.array(y[split:], np.newaxis).astype(float)

In [None]:
class LocalisedRegression:
    
    def __init__(self, X_input_train, X_output_train, k_nn=5, width=2, depth=2):  
        self.X_input_train = X_input_train
        self.X_output_train = X_output_train
        self.k_nn = k_nn
        self.width = width
        self.depth = depth
        # compute signatures of training data streams
        self.X_sigs_input, self.X_sigs_output = self.compute_signatures()
        self.pyt = EsigPyTorch()
        
    def compute_signatures(self):
        X_sigs_input = np.array([tosig.stream2sig(item, self.depth) for item in X_input_train])
        X_sigs_output = np.array([tosig.stream2sig(item, self.depth) for item in X_output_train])
        return X_sigs_input, X_sigs_output
    
    def mydist(self, x, y):
        return projection_distance(x, y, width=self.width, depth=self.depth)

    def train(self, x=None, y=None):
        pass
    
    def getKey(self, item):
        return item[2]
    
    def predict(self, x):
        
        # calculate S(x)^-1
        x_inverse = torch.from_numpy(np.flip(x, axis=0).copy())
        sig_tensor_x_inverse = pyt.stream2sigtensor(x_inverse, self.depth)
        
        # pre-multiply training data by S(x)^-1
        prod_in = np.array([pyt.tensor_multiply(sig_tensor_x_inverse, torch.tensor(np.insert(t, 0, 2.)), self.depth)[1:].numpy() for t in self.X_sigs_input])
        
        # calculate path signature
        x_sig = tosig.stream2sig(x, self.depth)
        
        # compute distances from x_sig to premultiplied training data signatures
        distances = [(x_in, x_out, self.mydist(x_sig, x_in)) for x_in, x_out in zip(prod_in, self.X_sigs_output)]
        
        # pick k-NN
        l = sorted(distances, key=self.getKey)
        X_in_local = [i[0] for i in l[:self.k_nn]]
        X_out_local = [i[1] for i in l[:self.k_nn]]
        
        # run linear regression locally on the selected k-NN
        lin_reg = LinearRegression()
        lin_reg.fit(X_in_local, X_out_local)
        
        return lin_reg.predict(np.array([x_sig]))

In [None]:
class LocalisedRegressionHomogeneous:
    
    def __init__(self, X_input_train, X_output_train, k_nn=5, width=2, depth=2):  
        self.X_input_train = X_input_train
        self.X_output_train = X_output_train
        self.k_nn = k_nn
        self.width = width
        self.depth = depth
        self.pyt = EsigPyTorch()
        # input signatures are calculated on the fly
        self.signatures_output = self.compute_signatures_output()
    
    def norm(self, sigtensor, width, depth):
        projection_norms = [(math.factorial(k) * L2(
                                                    projection(
                                                                self.pyt.tensor_log(sigtensor, depth)[1:], 
                                                                k, 
                                                                width, 
                                                                depth
                                                               )
                                                   )
                             )**(1./k)
                            for k in range(1, depth+1)
                           ]
        return (math.factorial(depth)**(-1./depth))*max(projection_norms)

    def hom_distance(self, stream_lhs, stream_rhs, width, depth):
        stream_lhs_inverse = torch.from_numpy(np.flip(stream_lhs, axis=0).copy())
        sig_tensor_lhs_inverse = self.pyt.stream2sigtensor(stream_lhs_inverse, depth)
        sig_tensor_rhs = self.pyt.stream2sigtensor(stream_rhs, depth)    
        
        self.sig_in = sig_tensor_rhs[1:]
        
        prod = self.pyt.tensor_multiply(sig_tensor_lhs_inverse, sig_tensor_rhs, depth)
        ans = self.norm(prod, self.width, self.depth)
        if ans < 1e-07:
            return 0., sig_tensor_rhs
        return ans

    def compute_signatures_output(self):
        return [self.pyt.stream2sig(torch.tensor(item), self.depth) for item in self.X_output_train]
        
    def train(self, x=None, y=None):
        pass
    
    def getKey(self, item):
        return item[0]
    
    def predict(self, x):

        # compute distances from x to training data streams
        distances = [(self.hom_distance(x, torch.tensor(x_in), self.width, self.depth), self.sig_in,
                      x_out) for x_in, x_out in zip(self.X_input_train, self.signatures_output)]
        
        # pick k-NN
        l = sorted(distances, key=self.getKey)
        X_in_local = [i[1].numpy() for i in l[:self.k_nn]]
        X_out_local = [i[2].numpy() for i in l[:self.k_nn]]
        
        # run linear regression locally on the selected k-NN
        lin_reg = LinearRegression()
        lin_reg.fit(X_in_local, X_out_local)
        
        return lin_reg.predict(np.array([tosig.stream2sig(x, self.depth)]))

In [None]:
n_predictions = 5

for k in [7, 10, 15, 20, 50]:
    for d in [2, 3, 4, 5]:
        print('\n \n Predictions for k_nn = {}, depth = {}'.format(k, d))
        model = LocalisedRegressionHomogeneous(X_input_train, X_output_train, k_nn=k, depth=d)
        predicted_sigs = [model.predict(ex) for ex in X_input_test[:n_predictions]]
        real_paths = [p for p in X_output_test[:n_predictions]]
        increments = [p[0][1:3] for p in predicted_sigs]
        starting_points = [sp[0] for sp in real_paths]
        predicted_paths = [np.append(ex, np.array([sp+inc]), axis=0) for ex, sp, inc in zip(X_input_test[:n_predictions],
                                                                                            starting_points,
                                                                                            increments)]
        for item in range(n_predictions):
            plt.plot(np.array([i[0] for i in predicted_paths[item]]), np.array([i[1] for i in predicted_paths[item]]), marker='o')
            plt.plot(np.array([i[0] for i in real_paths[item]]), np.array([i[1] for i in real_paths[item]]))
            plt.title('prediction #{} with k_nn={}, depth={}'.format(item, k, d))
            plt.show()