In [15]:
from typing import List, Dict, Set, Any, Optional, Tuple, Literal, Callable
import numpy as np
import torch
from torch import Tensor
import os
import sys
import tslearn
import tslearn.metrics
import timeit
import iisignature

from abstract_base import TimeSeriesKernel, StaticKernel
from static_kernels import LinearKernel, RBFKernel, PolyKernel
from integral import StaticIntegralKernel
from sig_trunc import TruncSigKernel
from gak import GlobalAlignmentKernel, sigma_gak
from flattened_static import FlattenedStaticKernel
from reservoir import ReservoirKernel

In [4]:
#####################################
##### Volterra Reservoir Kernel #####
#####################################

def naive_gram(X, Y, tau, gamma):
    """Brute-force reference Gram matrix for the Volterra reservoir kernel.

    Both inputs are reversed along the time axis, then for every pair
    ``(i, j)`` the function evaluates

        gram[i, j] = 1 + sum_{t=0}^{T-1} prod_{s=0}^{t} prod[i, j, s]

    with ``prod = gamma**2 / (1 - tau**2 * K)`` and ``K = LinearKernel()(X, Y)``
    (shape ``(N, N2, T)`` according to the original inline comment).

    The explicit quadruple loop is intentional: it mirrors the formula term
    by term so it can serve as an independent check of faster code.

    Args:
        X: tensor of shape ``(N, T, d)``.
        Y: tensor of shape ``(N2, T, d)``.
        tau: scalar (number or 0-dim tensor) multiplying the linear kernel.
        gamma: scalar (number or 0-dim tensor); enters as ``gamma**2``.

    Returns:
        Tensor of shape ``(N, N2)`` with the dtype and device of ``X``.

    Raises:
        ValueError: if ``X`` and ``Y`` are not 3-dimensional or disagree on
            the time length ``T`` or the feature dimension ``d``.
    """
    N, T, d = X.shape
    N2, T_y, d_y = Y.shape
    # Previously T and d were silently overwritten by Y's values, so a
    # mismatch went unnoticed until (or unless) the kernel call failed.
    if (T, d) != (T_y, d_y):
        raise ValueError(
            "X and Y must share time length and feature dimension, got "
            f"X.shape={tuple(X.shape)} and Y.shape={tuple(Y.shape)}"
        )

    X = torch.flip(X, dims=[1])
    Y = torch.flip(Y, dims=[1])
    lin_ker = LinearKernel()
    state_space_gram = lin_ker(X, Y) #shape N1 N2 T
    prod = gamma**2 / (1 - tau**2 * state_space_gram)

    gram = torch.zeros(N, N2, dtype=X.dtype, device=X.device)
    for i in range(N):
        for j in range(N2):
            outer = 0
            for t in range(T):
                # Product of the first t+1 factors, recomputed from scratch
                # on purpose (naive reference implementation).
                inner = 1
                for s in range(t+1):
                    inner *= prod[i, j, s]
                outer += inner
            gram[i, j] = 1 + outer
    return gram


def naive_gram2(X, Y, tau, gamma):
    """Recursive reference Gram matrix for the Volterra reservoir kernel.

    Both inputs are reversed along the time axis and
    ``prod = gamma**2 / (1 - tau**2 * K)`` is formed from
    ``K = LinearKernel()(X, Y)`` (shape ``(N, N2, T)`` according to the
    original inline comment). The Gram matrix is then obtained by the
    Horner-style recursion

        gram <- 1 + prod[:, :, s] * gram      for s = T-1, T-2, ..., 0

    started from ``gram = 1 / (1 - gamma**2)``. Expanding the recursion gives

        1 + p_0 + p_0 p_1 + ... + p_0 ... p_{T-2}
          + p_0 ... p_{T-1} / (1 - gamma**2)

    i.e. the last term carries an extra factor ``1 / (1 - gamma**2)``
    compared with a plain truncated sum of cumulative products.

    Args:
        X: tensor of shape ``(N, T, d)``.
        Y: tensor of shape ``(N2, T, d)``.
        tau: scalar (number or 0-dim tensor) multiplying the linear kernel.
        gamma: scalar (number or 0-dim tensor); enters as ``gamma**2``.

    Returns:
        Tensor of shape ``(N, N2)``.

    Raises:
        ValueError: if ``X`` and ``Y`` are not 3-dimensional or disagree on
            the time length ``T`` or the feature dimension ``d``.
    """
    N, T, d = X.shape
    N2, T_y, d_y = Y.shape
    # Previously T and d were silently overwritten by Y's values, so a
    # mismatch went unnoticed until (or unless) the kernel call failed.
    if (T, d) != (T_y, d_y):
        raise ValueError(
            "X and Y must share time length and feature dimension, got "
            f"X.shape={tuple(X.shape)} and Y.shape={tuple(Y.shape)}"
        )

    X = torch.flip(X, dims=[1])
    Y = torch.flip(Y, dims=[1])
    lin_ker = LinearKernel()
    state_space_gram = lin_ker(X, Y) #shape N1 N2 T
    prod = gamma**2 / (1 - tau**2 * state_space_gram)

    # Initial value of the recursion, broadcast to every (i, j) pair.
    gram = torch.zeros(N, N2, dtype=X.dtype, device=X.device)
    gram = gram + 1/(1-gamma**2)
    for t in range(T):
        # Walk the (already flipped) time axis from its last index to its first.
        gram = 1 + prod[:,:,T-1-t] * gram
    return gram



def volterra_reservoir_kernel_test():
    """Compare ``ReservoirKernel`` against the two reference implementations.

    Draws two small random batches of time series, evaluates the Gram matrix
    with ``ReservoirKernel``, ``naive_gram`` and ``naive_gram2``, prints all
    three and asserts that they agree. Runs on CUDA when available and falls
    back to the CPU otherwise.

    Raises:
        AssertionError: if the kernel output disagrees with a reference.
    """
    N = 3
    N2 = 2
    T = 70
    d = 2
    dtype = torch.float32
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    torch.manual_seed(3)
    # Sample with the CPU generator and move afterwards, so the seeded values
    # are the same whichever device is used.
    X = torch.randn(N, T, d, dtype=dtype).to(device).detach()
    Y = torch.randn(N2,T, d, dtype=dtype).to(device).detach()
    # Scale tau by the largest absolute entry across both batches.
    tau = 0.9 / torch.maximum(X.abs().max(), Y.abs().max())
    print("tau", tau)
    gamma = 0.9

    ker = ReservoirKernel(tau=tau, gamma=gamma)
    gram = ker(X, Y)
    print("gram\n", gram)
    naive = naive_gram(X, Y, tau, gamma)
    print("naive\n", naive)
    naive2 = naive_gram2(X, Y, tau, gamma)
    print("naive2\n", naive2)

    # Loose float32 tolerances: the two references differ from each other by
    # a small tail term (naive_gram2 seeds its recursion with 1/(1-gamma**2)).
    assert torch.allclose(gram, naive, rtol=1e-3, atol=1e-4), "gram != naive_gram"
    assert torch.allclose(gram, naive2, rtol=1e-3, atol=1e-4), "gram != naive_gram2"
volterra_reservoir_kernel_test()

tau tensor(0.2829, device='cuda:0')
gram
 tensor([[5.1026, 5.3574],
        [6.7520, 6.2966],
        [6.1606, 5.6091]], device='cuda:0')
naive
 tensor([[5.1026, 5.3574],
        [6.7520, 6.2966],
        [6.1606, 5.6091]], device='cuda:0')
naive2
 tensor([[5.1026, 5.3574],
        [6.7520, 6.2966],
        [6.1606, 5.6091]], device='cuda:0')


In [11]:


#### Test GAK ####
# Compare GlobalAlignmentKernel against tslearn's cdist_gak on random data.
N= 8
N2= 20
T, d = 20, 2
# Use the GPU when present, otherwise fall back to the CPU so the cell still runs.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(0)
# Sample with the CPU generator and move afterwards, so the seeded values
# are the same whichever device is used.
X = torch.randn(N,  T, d, dtype=torch.float64).to(device) / d
Y = torch.randn(N2, T, d, dtype=torch.float64).to(device) / d

# Bandwidth chosen by tslearn's heuristic and shared by both implementations.
sigma = tslearn.metrics.sigma_gak(X)
gak = tslearn.metrics.cdist_gak
mine = GlobalAlignmentKernel(RBFKernel(sigma=sigma), normalize=True, max_batch=50000)

out = gak(X, X, sigma=sigma)
print(out)

out3 = mine(X, X)
print(out3)
# Mean absolute difference between the two Gram matrices.
print(torch.mean(torch.abs(out.cpu() - out3.cpu())))

def _sync_device():
    """Block until queued CUDA work has finished (no-op for CPU tensors).

    CUDA kernels are launched asynchronously, so without this a wall-clock
    timer can stop before the computation has actually completed.
    """
    if X.is_cuda:
        torch.cuda.synchronize()


def function1():
    """Evaluate the tslearn reference Gram matrix once (timing target)."""
    gak(X, Y, sigma=sigma)
    _sync_device()


def function3():
    """Evaluate GlobalAlignmentKernel once without autograd (timing target)."""
    with torch.no_grad():
        mine(X, Y)
    _sync_device()


# Drain any work still pending from earlier cells so it is not billed to
# the first measurement.
_sync_device()
# Measure the execution time of function 1
execution_time1 = timeit.timeit(function1, number=1)
print("Execution time of function 1:", execution_time1)
# Measure the execution time of function 3
execution_time3 = timeit.timeit(function3, number=1)
print("Execution time of function 3:", execution_time3)

tensor([[1.0000, 0.5397, 0.6101, 0.4831, 0.5128, 0.6126, 0.6808, 0.5659],
        [0.5397, 1.0000, 0.6964, 0.3003, 0.5562, 0.4147, 0.5552, 0.6372],
        [0.6101, 0.6964, 1.0000, 0.4397, 0.4925, 0.4835, 0.7042, 0.7311],
        [0.4831, 0.3003, 0.4397, 1.0000, 0.4666, 0.5981, 0.6449, 0.6898],
        [0.5128, 0.5562, 0.4925, 0.4666, 1.0000, 0.5421, 0.6075, 0.6294],
        [0.6126, 0.4147, 0.4835, 0.5981, 0.5421, 1.0000, 0.7800, 0.5428],
        [0.6808, 0.5552, 0.7042, 0.6449, 0.6075, 0.7800, 1.0000, 0.7322],
        [0.5659, 0.6372, 0.7311, 0.6898, 0.6294, 0.5428, 0.7322, 1.0000]])
tensor([[1.0000, 0.5397, 0.6101, 0.4831, 0.5128, 0.6126, 0.6808, 0.5659],
        [0.5397, 1.0000, 0.6964, 0.3003, 0.5562, 0.4147, 0.5552, 0.6372],
        [0.6101, 0.6964, 1.0000, 0.4397, 0.4925, 0.4835, 0.7042, 0.7311],
        [0.4831, 0.3003, 0.4397, 1.0000, 0.4666, 0.5981, 0.6449, 0.6898],
        [0.5128, 0.5562, 0.4925, 0.4666, 1.0000, 0.5421, 0.6075, 0.6294],
        [0.6126, 0.4147, 0.4835, 0.59

In [13]:
#### test dimensions of TimeSeriesKernels ####

# Batch sizes of the two collections, then series length and feature dimension.
N, N2 = 3, 4
T, d = 7, 5
scale = d**0.5
X = torch.randn(N, T, d) / scale
Y = torch.randn(N2, T, d) / scale

# Argument pairs for the full Gram call: batch/batch, single/single and
# the two mixed single/batch combinations.
inputs = [(X, X), (X, Y)]
inputs.append((X[0], X[0]))
inputs.append((X[0], Y[0]))
inputs.append((X[0], Y))
inputs.append((X, Y[0]))

# Argument pairs for the diag=True call: both sides hold the same number
# of series (the third pair truncates both batches to the smaller size).
n_common = min(N, N2)
diag_inputs = [(X, X), (Y, Y)]
diag_inputs.append((X[:n_common], Y[:n_common]))
diag_inputs.append((X[0], X[0]))
diag_inputs.append((X[0], Y[0]))
def test_kernel(ker: TimeSeriesKernel, inputs, diag=False):
    """Print ``ker`` and the output shape it produces for each input pair.

    Every pair is evaluated twice, with ``normalize=False`` and with
    ``normalize=True``; only the shape of the un-normalized result is
    printed. A blank line is printed at the end.

    Args:
        ker: kernel object, called as ``ker(X, Y, diag, normalize=...)``.
        inputs: iterable of ``(X, Y)`` argument pairs.
        diag: forwarded positionally as the third argument of ``ker``.
    """
    print(ker)
    for pair in inputs:
        first, second = pair
        raw = ker(first, second, diag, normalize=False)
        # Evaluated only to exercise the normalized path; the value is unused.
        normalized = ker(first, second, diag, normalize=True)
        print(raw.shape)
    print()


def _check_kernel_shapes(ker):
    """Run the full-Gram and the diag=True shape checks for one kernel."""
    test_kernel(ker, inputs)
    test_kernel(ker, diag_inputs, True)


sigker = TruncSigKernel(static_kernel=RBFKernel(),
                        trunc_level=6,
                        geo_order=1,
                        only_last=False,)
_check_kernel_shapes(sigker)

intker = StaticIntegralKernel(static_kernel=RBFKernel())
_check_kernel_shapes(intker)

# Deliberately not called `gak`: the GAK comparison cell binds `gak` to
# tslearn.metrics.cdist_gak and `function1` looks that name up at call time,
# so rebinding it here would break re-running the timing code.
gak_ker = GlobalAlignmentKernel(static_kernel=RBFKernel(sigma=sigma_gak(X)))
_check_kernel_shapes(gak_ker)

flat = FlattenedStaticKernel(static_kernel=LinearKernel())
_check_kernel_shapes(flat)

res = ReservoirKernel(tau= 0.1, gamma=0.9)
_check_kernel_shapes(res)

<sig_trunc.TruncSigKernel object at 0x7f2a14227e50>
torch.Size([3, 3, 6])
torch.Size([3, 4, 6])
torch.Size([1, 1, 6])
torch.Size([1, 1, 6])
torch.Size([1, 4, 6])
torch.Size([3, 1, 6])

<sig_trunc.TruncSigKernel object at 0x7f2a14227e50>
torch.Size([3, 6])
torch.Size([4, 6])
torch.Size([3, 6])
torch.Size([1, 6])
torch.Size([1, 6])

<integral.StaticIntegralKernel object at 0x7f2a14260990>
torch.Size([3, 3])
torch.Size([3, 4])
torch.Size([1, 1])
torch.Size([1, 1])
torch.Size([1, 4])
torch.Size([3, 1])

<integral.StaticIntegralKernel object at 0x7f2a14260990>
torch.Size([3])
torch.Size([4])
torch.Size([3])
torch.Size([1])
torch.Size([1])

<gak.GlobalAlignmentKernel object at 0x7f2a1423bed0>
torch.Size([3, 3])
torch.Size([3, 4])
torch.Size([1, 1])
torch.Size([1, 1])
torch.Size([1, 4])
torch.Size([3, 1])

<gak.GlobalAlignmentKernel object at 0x7f2a1423bed0>
torch.Size([3])
torch.Size([4])
torch.Size([3])
torch.Size([1])
torch.Size([1])

<flattened_static.FlattenedStaticKernel object at 0x7f2

In [14]:
# Test that iisignature gives the same result as TruncSigKernel with a
# linear static kernel.

normalize=False
# Number of signature levels to use.
trunc_level = 5
geo_order = 5
N=2
N2= 2
T, d = 20, 2
# Use the GPU when present, otherwise fall back to the CPU so the cell still runs.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(0)
# Sample with the CPU generator and move afterwards, so the seeded values
# are the same whichever device is used.
X = torch.randn(N, T, d, dtype=torch.float64).to(device) / d
Y = torch.randn(N2, T, d, dtype=torch.float64).to(device) / d
# iisignature works on NumPy arrays, so keep host copies of both batches.
X_np = X.cpu().numpy()
Y_np = Y.cpu().numpy()

mine = TruncSigKernel(LinearKernel(scale=1), 
                      normalize=normalize, 
                      trunc_level=trunc_level, 
                      geo_order=geo_order, 
                      max_batch=50000)

#test
out2 = mine(X, Y)
featuresX = iisignature.sig(X_np, trunc_level)
featuresY = iisignature.sig(Y_np, trunc_level)
# The +1 presumably accounts for the constant level-0 signature term, which
# iisignature.sig is understood to omit -- confirm against its documentation.
out3 = 1+np.dot(featuresX, featuresY.T)
print("\nmine", out2)
print("\niisig", out3)
print(np.mean(np.abs(out2.cpu().numpy() - out3)))
# Turn the visual comparison into a real check (float64, so tight tolerances).
np.testing.assert_allclose(out2.cpu().numpy(), out3, rtol=1e-8, atol=1e-10)


mine tensor([[ 0.3805, -0.1749],
        [ 1.0562,  1.1173]], device='cuda:0', dtype=torch.float64)

iisig [[ 0.38050659 -0.17487614]
 [ 1.05622317  1.1172864 ]]
4.0245584642661925e-15
