In [None]:
import json
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from math import exp
from math import factorial
import numpy as np
from numpy import sign
import pickle
import seaborn as sns
import signatory
from signatory import logsignature_channels
from signatory import signature_channels
import sys
import torch

sys.path.insert(1, '../src/')

In [None]:
from azran_ghahramani import delta_k_t
from azran_ghahramani import get_eigengap_values
from azran_ghahramani import get_maximal_eigengap_information
from azran_ghahramani import K_prototypes
from azran_ghahramani import make_distances
from azran_ghahramani import make_W_D_from_distances
from azran_ghahramani import maximal_eigengap
from azran_ghahramani import multiscale_k_prototypes_from_W_D
from azran_ghahramani import star_shaped_init
from azran_ghahramani import n_clusters_best_revealed
from kernels import make_laplacian_kernel
from metrics import make_mmd_metric
from metrics import euclid_distance
from metrics import average_euclidean_distance
from similarities import make_gaussian_similarity
from utils import make_colours

In [None]:
REGIMES = [
    {'mean': 0.001, 'std_dev': 0.008},
    {'mean': 0.001, 'std_dev': 0.003},
    {'mean': -0.001, 'std_dev': 0.008},
    {'mean': -0.001, 'std_dev': 0.003},
    {'mean': 0, 'std_dev': 0.008}
]

In [None]:
# Load precomputed data - already comes equipped with signature terms but
# computation is still demonstrated in this script
with open('../data/synthetic_data.pkl', 'rb') as fp:
    data = pickle.load(fp)

In [None]:
def get_signature_cutoffs(channels, depth):
    return tuple(signature_channels(
        channels,
        depth = d
    ) for d in range(1, depth+1))

In [None]:
def get_logsignature_cutoffs(channels, depth):
    '''
    Returns the number of terms up to and including each depth
    Returns the number of terms at each depth in a logsignature vector for
    a given number of input channels
    '''
    return tuple(logsignature_channels(
        channels,
        depth = d
    ) for d in range(1, depth+1))

In [None]:
SIGNATURE_DEPTH = 3

In [None]:
def scale_vector(orig, min_vec, range_vec, skipped_indices=None):
    assert len(orig) == len(min_vec) == len(range_vec)
    
    scaled_indices = range(len(orig))
    if skipped_indices is not None:
        scaled_indices = [n for n in range(len(orig)) if n not in skipped_indices]
        
    scaled_vec = []
    for n in range(len(orig)):
        if n in scaled_indices:
            entry = orig[n]
            scaled_entry = (entry - min_vec[n]) / range_vec[n]
            scaled_vec.append(scaled_entry)
        else:
            scaled_vec.append(0)
        
    return scaled_vec

### Scale signature terms

In [None]:
get_signature_cutoffs(channels=2, depth=SIGNATURE_DEPTH)

In [None]:
def root_signature_vector(vec, cutoffs, depth):
    scaled_vec = vec[:cutoffs[0]]
    
    for level in range(2, depth+1):
        previous_cutoff = cutoffs[level-2]
        level_cutoff = cutoffs[level-1]
        terms = vec[previous_cutoff:level_cutoff]
        scaled_terms = [sign(el) * (abs(el) ** (1/level)) for el in terms]
        scaled_vec += scaled_terms
        
    return scaled_vec

In [None]:
depth = SIGNATURE_DEPTH
cutoffs = get_signature_cutoffs(channels=2, depth=depth)

for el in data:
    signatures = el['signatures']
    rooted_signatures = [
        root_signature_vector(sig, cutoffs, depth)
        for sig in signatures
    ]
    el['rooted_signatures'] = rooted_signatures

In [None]:
def scale_key(data, key):
    full_collection = [val.copy() for el in data for val in el[key]]
    max_vector = [
        max(el[n] for el in full_collection)
        for n in range(len(full_collection[0]))
    ]
    min_vector = [
        min(el[n] for el in full_collection)
        for n in range(len(full_collection[0]))
    ]
    range_vector = [
        max_el - min_el 
        for max_el, min_el in zip(max_vector, min_vector)
    ]
    skipped_indices = [
        idx
        for idx, el in enumerate(range_vector)
        if np.isclose(el, 0)
    ]
    
    for idx, el in enumerate(data):
        values = el[key]
        scaled_values = [
            scale_vector(val, min_vector, range_vector, skipped_indices)
            for val in values
        ]
        data[idx][f'scaled_{key}'] = scaled_values

In [None]:
scale_key(data, 'signatures')
scale_key(data, 'logsignatures')
scale_key(data, 'rooted_signatures')

In [None]:
regime_reps = {}
for el in data:
    regime_number = el['regime_number']
    if regime_number not in regime_reps:
        regime_reps[regime_number] = el.copy()

In [None]:
regime_paths = {n: [] for n in range(len(REGIMES))}
for el in data:
    regime_number = el['regime_number']
    paths = el['points']
    regime_paths[regime_number] += paths

In [None]:
regime_colours = [
    'blue',
    'green',
    'black',
    'purple',
    'brown'
]

In [None]:
sns.set()

In [None]:
PATHS_PER_ELEMENT = 5

fig, ax = plt.subplots()

for el in data:
    regime_number = el['regime_number']
    regime_colour = regime_colours[regime_number]
    paths = el['points']
    for path in paths[:PATHS_PER_ELEMENT]:
        x_vals = [x[0] for x in path]
        y_vals = [x[1] for x in path]
        ax.plot(x_vals, y_vals, color=regime_colour, lw=0.15)

plt.title('Simulated Brownian Motion Paths')
plt.ylabel('Price')
plt.xlabel('Time')

legend_lines = [
    Line2D([0], [0], color=regime_colours[n], lw=2)
    for n in range(len(REGIMES))
]
legend_labels = [f'Regime {n+1}' for n in range(len(REGIMES))]

plt.legend(legend_lines, legend_labels)
plt.show()

Investigate the effect of changing sigma in the resulting eigengap separation plot

In [None]:
def make_wrapped_metric(metric, key):
    def f(info_dict_0, info_dict_1):
        return metric(info_dict_0[key], info_dict_1[key])
    return f

In [None]:
def make_gaussian_similarity_from_percentile(nonzero_distances, percentile):
    sigma = np.percentile(nonzero_distances, percentile)
    return make_gaussian_similarity(sigma)

In [None]:
def get_spectrum(matr):
    evalues, evectors = np.linalg.eig(matr)
    evectors = evectors.T
    evectors = np.array([evectors[n] for n in range(len(evectors))])

    # Sort eigenvalues from largest to smallest; update eigenvectors
    idx = evalues.argsort()[::-1]
    evalues = evalues[idx]
    evectors = evectors[idx]
    
    return evalues, evectors

In [None]:
kernel = make_laplacian_kernel(sigma=0.1)
mmd = make_mmd_metric(kernel, kernel_repeated_arg_value=1)
metric = make_wrapped_metric(mmd, 'scaled_rooted_signatures')

In [None]:
distances = make_distances(data, metric)
flat_distances = [el for ls in distances for el in ls]
largest_distance = max(flat_distances)

In [None]:
fig, ax = plt.subplots()

similarity_sigma = 0.05
similarity = make_gaussian_similarity(similarity_sigma)
self_similarity = similarity(largest_distance)

W, D = make_W_D_from_distances(
    len(data),
    similarity,
    distances,
    self_similarity
)

transition_matrix = np.linalg.inv(D).dot(W)

evalues, evectors = get_spectrum(transition_matrix)

t_values_first_segment = list(range(1, 1000))
t_values_second_segment = np.logspace(start=3, stop=13, num=1000).tolist()
t_values = t_values_first_segment + t_values_second_segment

eigengap_values = get_eigengap_values(t_values, evalues)    
max_egap_vals, max_attained = get_maximal_eigengap_information(eigengap_values)

colours = make_colours(len(max_attained))
colours[2] = 'red'

for idx, cluster in enumerate(max_attained):
    cluster_colour = colours[idx]

    cluster_egap_separation_values = list(eigengap_values[cluster].values())
    plt.plot(t_values, cluster_egap_separation_values, color=cluster_colour)
    maxima_value = max_attained[cluster]['suitability']
    maxima_location = max_attained[cluster]['n_steps']
    ax.axvline(
        x = maxima_location,
        color = cluster_colour,
        linestyle = '--',
        label = f'{cluster} Clusters Maxima'
    )

plt.xscale('log')
plt.ylim(0, 1.2)
plt.legend()
plt.xlabel('Steps')
plt.ylabel('Eigengap Separation')
plt.title('Maximal Eigengap Separation for Number of Steps')

output = multiscale_k_prototypes_from_W_D(data, W, D)
print(json.dumps(output, indent=2))

### Plot partition

In [None]:
def get_partition(results, n_clusters):
    for el in results:
        if el['n_clusters'] == n_clusters:
            return el['partition']
        
    return None

In [None]:
N_PATHS_PER_ELEMENT = 5

In [None]:
cluster_colours = [
    'darkblue',
    'grey',
    'darkgreen',
    'brown',
    'orange'
]

In [None]:
fig, ax = plt.subplots()

partition = get_partition(output, n_clusters=5)

for cluster_idx, cluster in enumerate(partition):
    cluster_col = regime_colours[cluster_idx]
    
    for idx in cluster:     
        plotted_paths = data[idx]['points'][:N_PATHS_PER_ELEMENT]
        for path in plotted_paths:
            x_vals = [x[0] for x in path]
            y_vals = [x[1] for x in path]
            ax.plot(x_vals, y_vals, lw=0.1, color=cluster_col)
            
plt.xlabel('Time')
plt.ylabel('Price')
plt.title('5 Clusters - Separation 0.94')
plt.show()

In [None]:
fig, ax = plt.subplots()

partition = get_partition(output, n_clusters=2)

for cluster_idx, cluster in enumerate(partition):
    cluster_col = regime_colours[cluster_idx]
    
    for idx in cluster:     
        plotted_paths = data[idx]['points'][:N_PATHS_PER_ELEMENT]
        for path in plotted_paths:
            x_vals = [x[0] for x in path]
            y_vals = [x[1] for x in path]
            ax.plot(x_vals, y_vals, lw=0.1, color=cluster_col)
            
plt.xlabel('Time')
plt.ylabel('Price')
plt.title('2 Clusters - Separation 0.73')
plt.show()