In [1]:
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import plotly.graph_objects as go

from typing import List, Dict, Union, Any, Tuple, Callable, Optional, Set
import torch
from torch import Tensor
import sys
import os

from experiments.utils import print_nameshape, print_tensor

# Generate random vectors

In [2]:
###################################################################
############## functions to generate random vectors ############### #currently only supports mean zero
###################################################################

def gen_multivariate_normal(batch:int, d:int, n:int, device:str, cov_seed=99):
    """Sample zero-mean Gaussian corpora that all share one random covariance.

    The covariance is ``A @ A.T`` with ``A`` drawn from a private
    ``RandomState(cov_seed)``, so it is the same on every call with the same
    ``cov_seed`` and ``d``. The vectors themselves are drawn from NumPy's
    global random state.

    Args:
        batch: Number of independent corpora to draw.
        d: Dimension of each random vector.
        n: Corpus size; ``n + 1`` vectors are drawn per corpus.
        device: Torch device the returned tensors are moved to.
        cov_seed: Seed of the private generator used for the covariance.

    Returns:
        Tuple ``(corpus, new_sample, cov)`` of tensors with shapes
        ``(batch, n, d)``, ``(batch, 1, d)`` and ``(d, d)``.
    """
    # Dedicated generator: the covariance must not depend on the global seed.
    cov_rng = np.random.RandomState(cov_seed)
    factor = cov_rng.normal(size=(d, d))
    cov_np = factor @ factor.T

    # The samples come from the global NumPy random state.
    draws_np = np.random.multivariate_normal(np.zeros(d), cov_np, size=(batch, n + 1))
    draws = torch.from_numpy(draws_np).to(device)
    cov = torch.from_numpy(cov_np).to(device)

    # First n vectors form the corpus, the last one is the held-out sample.
    corpus, new_sample = draws[:, :n], draws[:, n:]
    return corpus, new_sample, cov


def gen_uniform_cumsum(batch:int, d:int, n:int, device:str, cov_seed=99):
    """Sample corpora whose coordinates are cumulative sums of U(-1, 1) draws.

    Each vector is ``cumsum`` (along the feature axis) of ``d`` independent
    uniform(-1, 1) variables, which gives a zero-mean vector with correlated
    coordinates. The covariance is estimated by Monte Carlo from 100000
    vectors drawn with a private ``RandomState(cov_seed)``, so the estimate
    is identical on every call and does not consume the global NumPy random
    stream used for the corpus itself.

    Args:
        batch: Number of independent corpora to draw.
        d: Dimension of each random vector.
        n: Corpus size; ``n + 1`` vectors are drawn per corpus.
        device: Torch device the returned tensors are moved to.
        cov_seed: Seed of the private generator used for the covariance
            estimate.

    Returns:
        Tuple ``(corpus, new_sample, cov)`` of tensors with shapes
        ``(batch, n, d)``, ``(batch, 1, d)`` and ``(d, d)``.
    """
    corpus = np.random.uniform(-1, 1, size=(batch, n+1, d))
    corpus = torch.from_numpy(corpus).to(device).cumsum(dim=-1)

    # approximate covariance matrix from a separately seeded sample
    cov_rng = np.random.RandomState(cov_seed)
    sample = cov_rng.uniform(-1, 1, size=(100000, d))
    sample = torch.from_numpy(sample).to(device).cumsum(dim=-1)
    # torch.cov treats ROWS as variables and columns as observations, so the
    # (observations, d) sample has to be transposed to get a (d, d) matrix.
    # The reshape keeps a 2-D matrix even when d == 1.
    cov = torch.cov(sample.T).reshape(d, d)
    return corpus[:, :-1], corpus[:, -1:], cov

# Calculate NN distances

* Conformance
* Limiting iid NN

In [3]:
@torch.jit.script
def calc_conformance_nn(corpus:Tensor, new_sample:Tensor, n:int):
    """Nearest-neighbour distance from ``new_sample`` to ``corpus`` after
    whitening with the corpus' own (uncentred) second-moment matrix.

    For every batch element the corpus matrix ``X`` is decomposed by SVD and
    ``(X^T X)^(-1/2) = V diag(1/S) V^T`` is applied to the differences
    ``corpus - new_sample``. The Euclidean norms are scaled by ``sqrt(n)``
    and the minimum over the corpus axis is returned.

    Args:
        corpus: Batched corpus; the batched matmul and 3-axis transpose below
            require a 3-D tensor (presumably ``(batch, n, d)`` -- confirm
            against the caller).
        new_sample: Tensor broadcastable against ``corpus`` (callers in this
            file pass ``(batch, 1, d)``).
        n: Corpus size used for the ``sqrt(n)`` scaling.

    Returns:
        Tensor with one scaled nearest-neighbour distance per batch element.
    """
    with torch.no_grad():
        # torch.svd returns V itself (not V^T): corpus = U diag(S) V^T.
        left_vecs, sing_vals, right_vecs = torch.svd(corpus)
        inv_sing = torch.diag_embed(sing_vals**-1)
        # Inverse square root of X^T X, built from the right singular vectors.
        whitener = right_vecs @ inv_sing @ right_vecs.transpose(1, 2)
        whitened_gaps = (corpus - new_sample) @ whitener
        scaled_dists = n**0.5 * torch.norm(whitened_gaps, dim=-1)
        nearest, nearest_idx = torch.min(scaled_dists, dim=-1)
        return nearest
    

def conformance(
        N_simu:int = 100000,
        d:int = 2,   #dimension of gaussian vector
        n:int = 50,  #corpus size
        device = "cuda",
        distribution_gen_fun:Callable[[int, int, int, str], Tuple[Tensor, Tensor, Tensor]] = None,
        batch_size = 10000,
    ):
    """Monte Carlo sample of the conformance nearest-neighbour statistic.

    Draws ``N_simu`` independent (corpus, new sample) pairs in batches of at
    most ``batch_size`` and evaluates ``calc_conformance_nn`` on each batch.
    A final, smaller batch is used when ``N_simu`` is not a multiple of
    ``batch_size`` so that exactly ``N_simu`` values are produced.

    Args:
        N_simu: Total number of simulated corpora.
        d: Dimension of the random vectors.
        n: Corpus size.
        device: Torch device passed through to ``distribution_gen_fun``.
        distribution_gen_fun: Called as ``f(batch, d, n, device)``; must
            return ``(corpus, new_sample, cov)``. The covariance is ignored
            here.
        batch_size: Maximum number of corpora simulated per call.

    Returns:
        1-D NumPy array of length ``N_simu`` sorted in ascending order
        (empty when ``N_simu <= 0``).

    Raises:
        TypeError: If ``distribution_gen_fun`` is not provided.
        ValueError: If ``batch_size`` is not positive.
    """
    if distribution_gen_fun is None:
        raise TypeError("conformance() requires a distribution_gen_fun callable")
    if N_simu <= 0:
        return np.empty(0)
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    mc = []
    remaining = N_simu
    while remaining > 0:
        # The last batch may be smaller so no simulation is silently dropped.
        current = min(batch_size, remaining)
        corpus, new_sample, _ = distribution_gen_fun(current, d, n, device)
        mc.append(calc_conformance_nn(corpus, new_sample, n))
        remaining -= current
    # torch.cat is available in every torch release (torch.concatenate is a
    # newer alias for it).
    mc = torch.cat(mc).cpu().numpy()
    return np.sort(mc)



def limiting_iid_nn(
        N_simu:int = 100000,
        d:int = 2,   #dimension of gaussian vector
        n:int = 50,  #corpus size
        device = "cuda",
        distribution_gen_fun:Callable[[int, int, str], Tuple[Tensor, Tensor, np.ndarray]] = None,
    ):
    """Monte Carlo sample of the nearest-neighbour distance under the true
    covariance.

    Draws ``N_simu`` corpora in a single call to ``distribution_gen_fun``,
    whitens the differences between every corpus vector and the new sample
    with the inverse square root of the covariance returned by the generator,
    and keeps the smallest Euclidean norm per corpus.

    Args:
        N_simu: Number of simulated corpora (all generated in one batch).
        d: Dimension of the random vectors.
        n: Corpus size.
        device: Torch device passed through to ``distribution_gen_fun``.
        distribution_gen_fun: Called as ``f(N_simu, d, n, device)``; must
            return ``(corpus, new_sample, cov)`` with a 3-D difference
            ``corpus - new_sample`` and a 2-D ``cov`` tensor.

    Returns:
        1-D NumPy array of nearest-neighbour distances in ascending order.
    """
    with torch.no_grad():
        corpus, query, cov = distribution_gen_fun(N_simu, d, n, device)

        # torch.svd returns V (not V^T), hence the explicit transpose below.
        left, spectrum, right = torch.svd(cov)
        whitening = left @ torch.diag(spectrum**-0.5) @ right.T

        gaps = corpus - query
        whitened = torch.einsum('ij,bnj->bni', whitening, gaps)
        nearest = torch.norm(whitened, dim=-1).min(dim=-1).values
        return np.sort(nearest.cpu().numpy())



def general_d_simu(
        distribution_gen_fun:Callable[[int, int, str], Tuple[Tensor, Tensor, np.ndarray]],
        MC_corpus:int   = 10000,
        MC_limiting:int = 10000,
        d:int = 2,   #dimension of gaussian vector
        n:int = 100,  #corpus size
        device = "cuda",
        batch_size = 1000,
        seed:int = 0,
        extra_to_title:str = "",
    ):
    """Plot the conformance score against the limiting iid NN distribution.

    Seeds NumPy and PyTorch, simulates both statistics, discards the values
    at or above each sample's 0.99 quantile, and shows two Plotly figures:
    the empirical CDFs as line plots and overlaid density histograms.

    Args:
        distribution_gen_fun (Callable): Generator of the random vectors.
            Called with (batch, d, n, device); returns the corpus of shape
            (batch, n, d), the new sample of shape (batch, 1, d), and the
            covariance matrix.
        MC_corpus (int): Number of MC simulations for the conformance score.
        MC_limiting (int): Number of MC simulations for the limiting
            distribution.
        d (int): Dimension of the random vector.
        n (int): Corpus size, i.e. NNs are taken from this corpus.
        device (str): PyTorch device to run on.
        batch_size (int): Batch size for the conformance MC simulation.
        seed (int): Random seed for both NumPy and PyTorch.
        extra_to_title (str): Text appended to the title of both figures.

    Returns:
        None. The figures are displayed via ``Figure.show()``.
    """
    # Make the run reproducible on both RNG back-ends.
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Sorted Monte Carlo distances for both statistics.
    conformance_scores = conformance(MC_corpus, d, n, device, distribution_gen_fun, batch_size)
    limiting_scores = limiting_iid_nn(MC_limiting, d, n, device, distribution_gen_fun)

    # Cut the extreme upper tail so it does not dominate the plots.
    upper_q = 0.99

    def _below_quantile(values):
        return values[values < np.quantile(values, upper_q)]

    series = [
        ("Conformance", _below_quantile(conformance_scores)),
        ("Limiting iid NN", _below_quantile(limiting_scores)),
    ]
    title = f"n = {n}, d = {d}" + extra_to_title

    # Empirical CDFs: sorted values against evenly spaced levels in [0, 1].
    cdf_fig = go.Figure()
    for label, values in series:
        levels = np.linspace(0, 1, len(values))
        cdf_fig.add_trace(go.Scatter(x=values, y=levels, mode='lines', name=label))
    cdf_fig.update_layout(autosize=False, width=700, height=500, title=title)
    cdf_fig.show()

    # Overlaid, semi-transparent density histograms.
    hist_fig = go.Figure()
    for label, values in series:
        hist_fig.add_trace(
            go.Histogram(x=values, name=label, marker=dict(opacity=0.5), histnorm="probability density")
        )
    hist_fig.update_layout(barmode='overlay', title=title)
    hist_fig.show()

# Multivariate Gaussian experiments

In [4]:
# Gaussian vectors in d = 3: compare a small and a large corpus.
for corpus_size in (10, 100):
    general_d_simu(
        gen_multivariate_normal,
        d=3,
        n=corpus_size,
        extra_to_title=", Gaussian",
    )

# Uniform Cumsum variables

In [5]:
# Uniform-cumsum vectors in d = 5: compare a small and a large corpus.
# The generator must be gen_uniform_cumsum so the simulated data matches the
# section heading and the ", Uniform Cumsum" plot titles.
for corpus_size in (10, 100):
    general_d_simu(
        gen_uniform_cumsum,
        d=5,
        n=corpus_size,
        extra_to_title=", Uniform Cumsum",
    )