In [1]:
import numpy as np

from numbers import Number, Integral
from typing import Optional, Union, List, Tuple

ArrayOnCPU = np.ndarray

In [4]:
def check_positive_value(scalar : Number, name : str) -> Number:
    if scalar <= 0:
        raise ValueError(f'The parameter \'{name}\' should have a positive value.')
    return scalar

def matrix_mult(X : ArrayOnCPU, Y : Optional[ArrayOnCPU] = None, transpose_X : bool = False, transpose_Y : bool = False) -> ArrayOnCPU:
    subscript_X = '...ji' if transpose_X else '...ij'
    subscript_Y = '...kj' if transpose_Y else '...jk'
    return np.einsum(f'{subscript_X},{subscript_Y}->...ik', X, Y if Y is not None else X)

def squared_norm(X : ArrayOnCPU, axis : int = -1) -> ArrayOnCPU:
    return np.sum(np.square(X), axis=axis)

In [10]:
from abc import ABCMeta, abstractmethod
from sklearn.base import BaseEstimator

class Kernel(BaseEstimator, metaclass=ABCMeta):
    """Base class for Kernels.

    Warning: This class should not be used directly.
    Use derived classes instead.
    """

    def fit(self, X : ArrayOnCPU, y : Optional[ArrayOnCPU] = None):
        return self

    @abstractmethod
    def _K(self, X : ArrayOnCPU, Y : Optional[ArrayOnCPU] = None) -> ArrayOnCPU:
        pass

    @abstractmethod
    def _Kdiag(self, X : ArrayOnCPU) -> ArrayOnCPU:
        pass

    def __call__(self, X : ArrayOnCPU, Y : Optional[ArrayOnCPU] = None, diag : bool = False, return_on_gpu : bool = False) -> ArrayOnCPU:
        if diag:
            K = self._Kdiag(X)
        else:
            K =  self._K(X, Y)
        return K

# ----------------------------------------------------------------------------------------------------------------------------------------------------------------

class LinearKernel(Kernel):
    """Class for linear (static) kernel."""

    def __init__(self, sigma : float = 1.0) -> None:
        self.sigma = check_positive_value(sigma, 'sigma')

    def _K(self, X : ArrayOnCPU, Y : Optional[ArrayOnCPU] = None) -> ArrayOnCPU:
        return self.sigma**2 * matrix_mult(X, Y, transpose_Y=True)

    def _Kdiag(self, X : ArrayOnCPU) -> ArrayOnCPU:
        return self.sigma**2 * squared_norm(X, axis=-1)

In [6]:
def multi_cumsum(M : ArrayOnCPU, exclusive : bool = False, axis : int = -1) -> ArrayOnCPU:
    """Computes the exclusive cumulative sum along a given set of axes.

    Args:
        K (np.ndarray): A matrix over which to compute the cumulative sum
        axis (int or iterable, optional): An axis or a collection of them. Defaults to -1 (the last axis).
    """

    ndim = M.ndim
    axis = [axis] if np.isscalar(axis) else axis
    axis = [ndim+ax if ax < 0 else ax for ax in axis]

    # create slice for exclusive cumsum (slice off last element along given axis then pre-pad with zeros)
    if exclusive:
        slices = tuple(slice(-1) if ax in axis else slice(None) for ax in range(ndim))
        M = M[slices]

    # compute actual cumsums
    for ax in axis:
        M = np.cumsum(M, axis=ax)

    # pre-pad with zeros along the given axis if exclusive cumsum
    if exclusive:
        pads = tuple((1, 0) if ax in axis else (0, 0) for ax in range(ndim))
        M = np.pad(M, pads)

    return M

In [7]:
def signature_kern_higher_order(M : ArrayOnCPU, n_levels : int, order : int, difference : bool = True, return_levels : bool = False) -> ArrayOnCPU:
    """
    Computes the signature kernel matrix with higher-order embedding into the tensor algebra.
    """

    if difference:
        M = np.diff(np.diff(M, axis=1), axis=-1)

    if M.ndim == 4:
        n_X, n_Y = M.shape[0], M.shape[2]
        K = np.ones((n_X, n_Y), dtype=M.dtype)
    else:
        n_X = M.shape[0]
        K = np.ones((n_X,), dtype=M.dtype)

    if return_levels:
        K = [K, np.sum(M, axis=(1, -1))]
    else:
        K += np.sum(M, axis=(1, -1))

    R = np.copy(M)[None, None, ...]
    for i in range(1, n_levels):
        d = min(i+1, order)
        R_next = np.empty((d, d) + M.shape, dtype=M.dtype)
        R_next[0, 0] = M * multi_cumsum(np.sum(R, axis=(0, 1)), exclusive=True, axis=(1, -1))
        for r in range(1, d):
            R_next[0, r] = 1./(r+1) * M * multi_cumsum(np.sum(R[:, r-1], axis=0), exclusive=True, axis=1)
            R_next[r, 0] = 1./(r+1) * M * multi_cumsum(np.sum(R[r-1, :], axis=0), exclusive=True, axis=-1)
            for s in range(1, d):
                R_next[r, s] = 1./((r+1)*(s+1)) * M * R[r-1, s-1]
        R = R_next
        if return_levels:
            K.append(np.sum(R, axis=(0, 1, 3, -1)))
        else:
            K += np.sum(R, axis=(0, 1, 3, -1))

    return np.stack(K, axis=0) if return_levels else K

In [8]:
# simulate geometric Brownian motion paths
n_paths = 4
n_steps = 50 - 1

mu_x = 0.1
sigma_x = 0.2
mu_y = 0.2
sigma_y = 0.2
dt = 0.01
X = np.exp((mu_x - 0.5 * sigma_x**2) * dt + sigma_x * np.sqrt(dt) * np.random.randn(n_paths, n_steps))
Y = np.exp((mu_y - 0.5 * sigma_y**2) * dt + sigma_y * np.sqrt(dt) * np.random.randn(n_paths, n_steps))
X = np.cumprod(X, axis=1)
Y = np.cumprod(Y, axis=1)
X = np.concatenate([np.ones((n_paths, 1)), X], axis=1)
Y = np.concatenate([np.ones((n_paths, 1)), Y], axis=1)
X = X[..., np.newaxis]
Y = Y[..., np.newaxis]
X.shape, Y.shape

((4, 50, 1), (4, 50, 1))

In [11]:
static_kernel = LinearKernel()

# M = static_kernel(X, diag=True, return_on_gpu=True)

M_X = static_kernel(X.reshape((-1, X.shape[-1])), return_on_gpu=True).reshape((X.shape[0], X.shape[1], X.shape[0], X.shape[1]))
M_Y = static_kernel(Y.reshape((-1, Y.shape[-1])), return_on_gpu=True).reshape((Y.shape[0], Y.shape[1], Y.shape[0], Y.shape[1]))
M_XY = static_kernel(X.reshape((-1, X.shape[-1])), Y.reshape((-1, Y.shape[-1])), return_on_gpu=True).reshape((X.shape[0], X.shape[1], Y.shape[0], Y.shape[1]))
M_X.shape, M_Y.shape, M_XY.shape

((4, 50, 4, 50), (4, 50, 4, 50), (4, 50, 4, 50))

In [12]:
n_levels = 5
order = 5
sig_levels = signature_kern_higher_order(M_XY, n_levels, order, return_levels=True)
sig = signature_kern_higher_order(M_XY, n_levels, order, return_levels=False)
sig_levels.shape, sig.shape

((6, 4, 4), (4, 4))

In [14]:
def is_symmetric(X):
    if X.ndim == 2:
        return np.allclose(X, X.T)
    else:
        raise ValueError('X must be a 2D matrix')

is_symmetric(sig)

False

In [15]:
sig

array([[0.94603477, 0.98041265, 0.91455012, 1.00187657],
       [1.01778198, 1.00637977, 1.02846249, 0.99939313],
       [1.06701221, 1.02385792, 1.10803374, 0.99774129],
       [0.98546588, 0.99475847, 0.97684788, 1.00050018]])

In [16]:
norms = signature_kern_higher_order(M_Y, n_levels, order, return_levels=True)
norms.shape

(6, 4, 4)

In [18]:
for i in range(n_levels + 1):
    print(np.sqrt(norms[i].diagonal()))


[1. 1. 1. 1.]
[0.25855752 0.09302766 0.41276635 0.00886461]
[3.34259965e-02 4.32707289e-03 8.51880279e-02 3.92906649e-05]
[2.88084762e-03 1.34179157e-04 1.17209170e-02 1.16098816e-07]
[1.86216207e-04 3.12059329e-06 1.20950002e-03 2.57299626e-10]
[9.62952024e-06 5.80602992e-08 9.98481804e-05 4.57947170e-13]


In [19]:
np.allclose(sig_levels.sum(axis=0), sig)

True