In [1]:
import sigpde.torch as sig
import torch
import math
device = torch.device('cuda:0')

In [2]:
def sample(n, rho=1, scale=8, T=0.5, start=0, stop=1, dt=0.005, randgrid = 30, thin=0, device=torch.device('cuda:0'), dtype=torch.float64):
        l = math.ceil((stop - start) / dt) + 1
        
        ts = torch.linspace(0, 1, l, device=device, dtype=dtype)
        
        b1 = math.sqrt(2 / T) * torch.sin(4 * torch.pi * ts / T)
        b2 = math.sqrt(2 / T) * torch.cos(6 * torch.pi * ts / T)
        c = torch.randn((n, 3), dtype=dtype, device=device)
       
        b1 = (b1 * c[:, 1].view(n, 1))
        b2 = (b2 * c[:, 2].view(n, 1))
        
        x = (b1 + b2 + c[:, 0].view(n, 1))
       
        c1 = (0.75 - 0.25) * torch.rand((n, 2), device=device, dtype=dtype) + 0.25
        
        i1 = (ts - c1[:,0].view(n, 1, 1)).view(n, -1)**2
        i2 = (ts - c1[:,1].view(n, 1, 1)).view(n, -1)**2
        
        zero_column = torch.zeros(n, 1, device=device, dtype=dtype)
        x1 = torch.cat((zero_column, (x[:,range(l-1)] * dt).cumsum(dim=1)), dim=1) * i2
        x2 = torch.cat((zero_column, (x[:,range(l-1)] * i1[:,range(l-1)] * dt).cumsum(dim=1)), dim=1)
        
        y = rho * (x2 - x1) * scale
        y = y
        x = x
        
        return x.view(n, l, 1)[:, range(0, l, 2**thin),:], y.view(n, l, 1)[:, range(0, l, 2**thin),:]

In [3]:
kernel = sig.SigPDE(sig.kernels.LinearKernel(), 3)
kernel2 = sig.RobustSigPDE(sig.kernels.LinearKernel(), 3)

In [4]:
def generate(batch_size, length, dimension, device = torch.device('cpu')):
  random_walks = torch.randn(batch_size, length, dimension, dtype = torch.double, device = device) / math.sqrt(length)
  start = torch.zeros([batch_size, 1, dimension], device=device, dtype=torch.double)
  random_walks = torch.cat((start, random_walks), dim=1)
  random_walks = torch.cumsum(random_walks, dim=1)
  return random_walks

def add_time(x, start=0, stop=1):
    device = x.device
    dtype = x.dtype

    l = x.shape[1]

    t = torch.linspace(start, stop, l, device=device, dtype=dtype)
    t = t.unsqueeze(0).unsqueeze(-1)
    return torch.cat((x, t.expand(x.shape[0], x.shape[1], 1)), dim=-1)

def std_norm(x):
    return 2 - 1 / (1 + x.log())

def time_norm(x, time=True):
    if time:
        x = add_time(x)
    return x / x.pow(2).sum(dim=2).sqrt().max(dim=1).values.view(x.shape[0], 1, 1)

In [22]:
x = generate(30, 512, 50, device=device)
y = generate(30, 512, 50, device=device)
z, w = sample(30)
z = time_norm(z)
w = time_norm(w)

In [26]:
kernel = sig.SigPDE(sig.kernels.LinearKernel(), 1)
kernel2 = sig.RobustSigPDE(sig.kernels.LinearKernel(), 1)
kernel.gram(x)

tensor([[18.3334,  1.4010,  0.4858,  0.7846,  0.8163,  1.2738,  1.1459,  0.7409,
          1.6848,  0.9033,  1.2657,  0.9222,  0.8955,  0.6676,  0.8486,  1.2017,
          0.8765,  0.5164,  0.2662,  0.8098,  0.9262,  1.1080,  0.6455,  1.0314,
          1.1974,  0.9531,  0.9834,  0.5894,  0.5482,  1.5845],
        [ 1.4010, 10.2463,  0.7121,  0.5283,  1.2533,  0.7406,  0.8341,  1.2035,
          1.1352,  1.4728,  1.2093,  1.1496,  0.9734,  0.5533,  1.1498,  1.3396,
          0.7594,  0.9307,  0.9004,  1.4961,  0.7981,  1.2657,  0.7407,  0.9759,
          1.0922,  0.4141,  1.3639,  0.7858,  0.6421,  0.7769],
        [ 0.4858,  0.7121,  8.2434,  1.4694,  1.2537,  1.3057,  1.3570,  1.2500,
          0.9336,  1.1269,  0.9046,  0.7820,  1.2033,  0.9409,  1.0051,  1.6239,
          1.2043,  0.9183,  1.2218,  1.4642,  1.1458,  1.1853,  1.5663,  0.3853,
          0.7628,  1.2174,  0.9155,  1.3894,  1.4826,  0.9912],
        [ 0.7846,  0.5283,  1.4694, 16.6253,  0.1763,  1.0711,  1.7845,  1.0807

In [21]:
kernel.pairwise(y[0].view(1, -1, 50), y[1].view(1, -1, 50))

tensor([8.0374], device='cuda:0', dtype=torch.float64)

In [14]:
kernel2.normalization(x)

tensor([0.1089, 0.1120, 0.1146, 0.1253, 0.1281, 0.1172, 0.1295, 0.1109, 0.1185,
        0.1186, 0.1094, 0.1211, 0.1047, 0.1156, 0.1309, 0.1050, 0.1092, 0.1263,
        0.1119, 0.1309, 0.1103, 0.1071, 0.1167, 0.1058, 0.1179, 0.1187, 0.1244,
        0.1144, 0.1223, 0.1219], device='cuda:0', dtype=torch.float64)

In [124]:
z, w = sample(1)
z = add_time(z)
c = std_norm(kernel.pairwise(z))
ff = lambda s : kernel.pairwise(z, x_scale=s) - c

In [125]:
def bisection_single_eval(f, a, b, tol=1e-6, max_iter=100):
    fa = f(a)
    fb = f(b)
    k = 0
    if fa * fb >= 0:
        raise ValueError("Function values at the endpoints must have opposite signs.")
    
    for _ in range(max_iter):
        k += 1
        # Compute midpoint
        c = (a + b) / 2
        fc = f(c)

        # Update the interval and reuse function evaluations
        if fa * fc < 0:
            b, fb = c, fc  # Update the right endpoint
        else:
            a, fa = c, fc  # Update the left endpoint
            
        # Stopping criterion based on function value
        if abs(fc) < tol:
            print(k)
            return a, b

    raise RuntimeError("Maximum number of iterations reached without convergence.")

In [126]:
def itp_single_eval(f, a=0, b=1,tol=1e-6, max_iter=100):
    fa = f(a)
    fb = f(b)
    k = 0

    if fa * fb >= 0:
        raise ValueError("Function values at the endpoints must have opposite signs.")
    
    for _ in range(max_iter):
        k += 1
        # Midpoint and interpolation calculation
        mid = (a + b) / 2
        interp = (a * fb - b * fa) / (fb - fa)
        t = 0.2  # Blending parameter (adjust if necessary)
        x = (1 - t) * mid + t * interp

        fx = f(x)

        # Update the interval and reuse function evaluations
        if fx * fa < 0:
            b, fb = x, fx  # Update the right endpoint
        else:
            a, fa = x, fx  # Update the left endpoint
            
        if abs(fx) < tol:  # Stopping based on function value
            print(k)
            return a, b
            
    return (a + b) / 2, k

In [127]:
def secant_single_eval(f, x0, x1, tol=1e-6, max_iter=100):
    f0 = f(x0)
    f1 = f(x1)
    k = 0

    if abs(f0) < tol:
        return x0
    if abs(f1) < tol:
        return x1

    for _ in range(max_iter):
        k += 1
        # Compute the next point using the secant formula
        x2 = x1 - f1 * (x1 - x0) / (f1 - f0)
        f2 = f(x2)

        # Check stopping criteria
        if abs(f2) < tol:
            return x2, f2, k

        # Update for the next iteration
        x0, f0 = x1, f1
        x1, f1 = x2, f2

In [128]:
def itp_optimized(f, a, b, tol, max_iter, alpha=0.2, beta=1.0):
    # Initial evaluations
    f_a = f(a)
    f_b = f(b)
    k = 0
    if f_a * f_b > 0:
        raise ValueError("Function must have opposite signs at endpoints a and b.")

    for iteration in range(max_iter):
        k += 1
        # Compute midpoint and its function value (reuse evaluations when possible)
        m = (a + b) / 2
        f_m = f(m) if iteration == 0 else f_m  # First iteration requires f(m)

        # Interpolation
        delta = beta * (b - a) / 2
        g = m - (1 if f_m > 0 else -1) * min(abs(f_m) * delta, delta)

        # Truncation
        t = m + (g - m) * min(1, alpha * (b - a))

        # Evaluate f(t) only once
        f_t = f(t)

        # Update bounds and reuse evaluations
        if f_a * f_t < 0:
            b, f_b = t, f_t
        else:
            a, f_a = t, f_t
            
        # Check stopping criteria
        if f_t == 0 or abs(f_t) < tol:
            print(k)
            return a, b

        # Update f_m for reuse in the next iteration
        f_m = f_m if t == m else f(t)


In [129]:
def newton_raphson(f, x0, c, tol=1e-6, max_iter=100):
    k = 0
    
    for _ in range(max_iter):
        k += 1
        fx, dfx = f(x0)
        
        fx = fx - c
        
        if abs(fx) < tol:
            return x0, fx, k
        x0 = x0 - fx / dfx

In [146]:
z, w = sample(1)
z = add_time(z)
c = std_norm(kernel.pairwise(z))
ff = lambda s : kernel.pairwise(z, x_scale=s) - c

In [133]:
c

tensor([1.9723], device='cuda:0', dtype=torch.float64)

In [147]:
a = torch.tensor([0], device=x.device, dtype=x.dtype)
b = torch.tensor([1], device=x.device, dtype=x.dtype)
a, b = bisection_single_eval(ff, a=a, b=b, tol=0.01)
b_secant, val_secant, k_secant = secant_single_eval(ff, a, b, tol=1e-7, max_iter=200)
print(b_secant)
print(val_secant)
print(k_secant)

10
tensor([0.3542], device='cuda:0', dtype=torch.float64)
tensor([-4.9490e-09], device='cuda:0', dtype=torch.float64)
2


In [135]:
a = torch.tensor([0], device=x.device, dtype=x.dtype)
b = torch.tensor([1], device=x.device, dtype=x.dtype)
a, b = bisection_single_eval(ff, a=a, b=b, tol=0.01, max_iter=100)
b_secant, val_secant, k_secant = secant_single_eval(ff, a, b, tol=1e-7, max_iter=200)
print(b_secant)
print(val_secant)
print(k_secant)

12
tensor([0.5222], device='cuda:0', dtype=torch.float64)
tensor([-2.7587e-08], device='cuda:0', dtype=torch.float64)
2


In [134]:
a = torch.tensor([0], device=x.device, dtype=x.dtype)
b = torch.tensor([1], device=x.device, dtype=x.dtype)
a, b = itp_single_eval(ff, a=a, b=b, tol=0.01)
#a, b = itp_optimized(ff, a=a, b=b, tol=0.01, max_iter=100)
df = lambda s : kernel2.pairwise_norm(x, s)
b_nr, s_nr, k_nr = newton_raphson(df, (a + b) / 2, c, tol=1e-7, max_iter=200)
print(b_nr)
print(s_nr)
print(k_nr)

10


RuntimeError: Boolean value of Tensor with more than one value is ambiguous

In [68]:
print(kernel.pairwise(x, x_scale = b_secant) - c)
print(kernel.pairwise(x, x_scale = b_nr) - c)

tensor([1.7421e-10], device='cuda:0', dtype=torch.float64)
tensor([-6.3408e-11], device='cuda:0', dtype=torch.float64)
