In [1]:
from typing import List, Dict, Set, Any, Optional, Tuple, Literal, Callable
import numpy as np
import torch
from torch import Tensor
import sigkernel
import os
import sys

from kernels.abstract_base import TimeSeriesKernel, StaticKernel
from kernels.static_kernels import LinearKernel, RBFKernel, PolyKernel
from kernels.integral_type import StaticIntegralKernel
from kernels.sig_pde import SigPDEKernel
from kernels.sig_trunc import TruncSigKernel

In [6]:
N=3
N2=4
T, d = 40, 5
X = torch.randn(N, T, d, dtype=torch.float64) / d
Y = torch.randn(N2, T, d, dtype=torch.float64) / d

inputs = [
    (X, X),
    (X, Y),
    (X[0], X[0]),
    (X[0], Y[0]),
    (X[0], Y),
    (X, Y[0]),
]
diag_inputs = [
    (X, X),
    (X, Y[:N]),
    (X[0], X[0]),
    (X[0], Y[0]),
]
def test_kernel(ker: TimeSeriesKernel, inputs, diag=False, normalize=False):
    print(ker)
    for X, Y in inputs:
        out = ker(X, Y, diag, normalize=normalize)
        out_normalize = ker(X, Y, diag, normalize=True)
        print(out)
        print(out_normalize)
        print(out.shape)
    print()


sigker = TruncSigKernel(static_kernel=RBFKernel(), 
                        trunc_level=4, 
                        geo_order=1,
                        only_last=True,)
test_kernel(sigker, inputs)
test_kernel(sigker, diag_inputs, True)

sigpde = SigPDEKernel(static_kernel=RBFKernel(),
                     dyadic_order=3,)
test_kernel(sigpde, inputs)
test_kernel(sigpde, diag_inputs, True)

intker = StaticIntegralKernel(static_kernel=RBFKernel())
test_kernel(intker, inputs)
test_kernel(intker, diag_inputs, True)

<kernels.sig_trunc.TruncSigKernel object at 0x7fee8979b290>
tensor([[ 2.4527e+02,  9.6429e+01,  8.9373e+01],
        [5.5335e-322,  1.6558e+02,  7.8619e+01],
        [4.9407e-324, 6.2952e-316,  2.2980e+02]], dtype=torch.float64)
tensor([[1.0000, 0.4785, 0.3765],
        [0.0000, 1.0000, 0.4030],
        [0.0000, 0.0000, 1.0000]], dtype=torch.float64)
torch.Size([3, 3])
tensor([[ 81.8982, 107.0054,  89.1904,  79.9658],
        [ 67.3619,  82.0555,  65.9581,  58.1056],
        [ 77.9504, 115.0598, 103.1768, 105.6861]], dtype=torch.float64)
tensor([[0.4036, 0.4118, 0.3985, 0.3229],
        [0.4040, 0.3843, 0.3587, 0.2856],
        [0.3968, 0.4574, 0.4762, 0.4409]], dtype=torch.float64)
torch.Size([3, 4])
tensor([[245.2682]], dtype=torch.float64)
tensor([[1.]], dtype=torch.float64)
torch.Size([1, 1])
tensor([[81.8982]], dtype=torch.float64)
tensor([[0.4036]], dtype=torch.float64)
torch.Size([1, 1])
tensor([[ 81.8982, 107.0054,  89.1904,  79.9658]], dtype=torch.float64)
tensor([[0.4036, 0.4

In [None]:
def test_kernel(kernel: TimeSeriesKernel):
    print("Testing", kernel)

    gram = kernel.gram(X, Y)
    diag = kernel.gram(X, Z, diag=True)
    batch_call = kernel(X, Z)
    call = kernel(X[0], Z[0])
    print("gram", gram.shape)
    print("diag", diag.shape)
    print("batch_call", batch_call.shape)
    print("call", call.shape, call)
    print("\n")


N1 = 5
N2 = 6
T = 7
d = 3
torch.manual_seed(0)
X = torch.rand((N1, T, d), dtype=torch.float64)
Y = torch.rand((N2, T, d), dtype=torch.float64)
Z = torch.rand((N1, T, d), dtype=torch.float64)
print("X", X.shape)
print("Y", Y.shape)
print("Z", Z.shape)

rbf = RBFKernel()
integral = IntegralKernel(rbf)
sigpde = SigPDEKernel(rbf)
test_kernel(integral)
test_kernel(sigpde)

In [None]:
import numpy as np
import ksig

# Number of signature levels to use.
n_levels = 5 

# Use the RBF kernel for vector-valued data as static (base) kernel.
static_kernel = ksig.static.kernels.RBFKernel() 

# Instantiate the signature kernel, which takes as input the static kernel.
n_levels = 5
order = 1
sig_kernel = ksig.kernels.SignatureKernel(n_levels=n_levels, order=order, static_kernel=static_kernel)

# Generate 10 sequences of length 50 with 5 channels.
n_seq, l_seq, n_feat = 10, 50, 5 
X = np.random.randn(n_seq, l_seq, n_feat)

# Sequence kernels take as input an array of sequences of ndim == 3,
# and work as a callable for computing the kernel matrix. 
K_XX = sig_kernel(X)  # K_XX has shape (10, 10).

# The diagonal kernel entries can also be computed.
K_X = sig_kernel(X, diag=True)  # K_X has shape (10,).

# Generate another array of 8 sequences of length 20 and 5 features.
n_seq2, l_seq2 = 8, 20
Y = np.random.randn(n_seq2, l_seq2, n_feat)

# Compute the kernel matrix between arrays X and Y.
K_XY = sig_kernel(X, Y)  # K_XY has shape (10, 8)
K_XY