In [5]:
import sys, os

# Put the project's `src` directory (two levels above the current working
# directory) at the front of the module search path so the project packages
# imported below can be found.
# NOTE: the result depends on the directory the kernel was started from.
cwd = os.getcwd()
path = f"{cwd}/../../src"
sys.path.insert(0, path)

import numpy as np
import pandas as pd
from functools import partial

# toy datasets
from data.distribution import DataParams, Inputs

from experiments.utils import dict_product, run_parallel_step

# Plotting Procedures
import matplotlib.pyplot as plt
import seaborn as sns

# Reset the seaborn/matplotlib rc parameters to their defaults before the
# plotting context below is applied.
sns.reset_defaults()
# Alternative styling, currently disabled:
#sns.set_style('whitegrid')
#sns.set_context('talk')
# Use the 'talk' plotting context with its font elements scaled by 0.7.
sns.set_context(context='talk',font_scale=0.7)
# IPython magic: render matplotlib figures inline in the notebook output.
%matplotlib inline

# Put the external `pysim` package at the front of the module search path.
# The location defaults to the original developer checkout; set the
# PYSIM_PATH environment variable to point at a different checkout so the
# notebook can run on other machines without editing this cell.
pysim_path = os.environ.get("PYSIM_PATH", "/home/emmanuel/code/pysim/")
sys.path.insert(0, pysim_path)

## Algorithms

In [6]:
from typing import Optional
from scipy.spatial.distance import pdist, squareform
from models.dependence import HSICModel

def scotts_factor(X: np.ndarray) -> float:
    """Estimate an RBF-kernel length scale with Scott's rule.

    Only the shape of ``X`` is used::

        factor = n ** (-1 / (d + 4))

    where ``n`` is the number of rows (samples) and ``d`` the number of
    columns (features).

    Parameters
    ----------
    X : np.ndarray
        Two-dimensional input array of shape (n_samples, n_features).

    Returns
    -------
    factor : float
        The estimated length scale.
    """
    n, d = X.shape
    exponent = - 1 / (d + 4.)
    return np.power(n, exponent)

def silvermans_factor(X: np.ndarray) -> float:
    """Estimate an RBF-kernel length scale with Silverman's rule.

    Only the shape of ``X`` is used::

        factor = (n * (d + 2) / 4) ** (-1 / (d + 4))

    where ``n`` is the number of rows (samples) and ``d`` the number of
    columns (features).

    Parameters
    ----------
    X : np.ndarray
        Two-dimensional input array of shape (n_samples, n_features).

    Returns
    -------
    factor : float
        The estimated length scale.
    """
    n, d = X.shape
    scaled_count = (n * (d + 2.)) / 4.
    exponent = - 1 / (d + 4.)
    return np.power(scaled_count, exponent)




def kth_distance(dists: np.ndarray, percent: float) -> np.ndarray:
    """Pick one column of a distance matrix by fractional position.

    The column index is ``int(percent * dists.shape[0])``, so ``percent`` is
    a fraction of the number of rows (e.g. ``0.5`` for 50%). The index is
    capped at ``dists.shape[0] - 1`` so that ``percent == 1.0`` selects the
    last column of a square matrix instead of indexing one past the end.

    Parameters
    ----------
    dists : np.ndarray
        Two-dimensional array of distances. Presumably each row is already
        sorted in ascending order (this function does not sort) -- confirm
        against the caller.
    percent : float
        Fraction in ``[0, 1]`` used to derive the column index.

    Returns
    -------
    k_dist : np.ndarray
        The selected column, one value per row of ``dists``.

    Raises
    ------
    ValueError
        If ``percent`` lies outside ``[0, 1]``.
    """
    if not 0 <= percent <= 1:
        raise ValueError(
            f"percent must be a fraction in [0, 1], got {percent}"
        )

    n_rows = dists.shape[0]

    # int(1.0 * n_rows) == n_rows is one past the last valid index of a
    # square matrix; cap it so that 100% maps onto the final column.
    kth_sample = min(int(percent * n_rows), n_rows - 1)

    # take the k-th column, i.e. one distance per row
    return dists[:, kth_sample]

def sigma_estimate(
    X: np.ndarray, 
    method: str='median', 
    percent: Optional[int]=None, 
    heuristic: bool=False
) -> float:
    """Estimate a kernel length scale (sigma) for the samples in ``X``.

    Parameters
    ----------
    X : np.ndarray
        Two-dimensional input array of shape (n_samples, n_features).
    method : str, default='median'
        * ``'silverman'`` / ``'scott'`` -- return the corresponding
          shape-based factor immediately; ``percent`` and ``heuristic`` are
          ignored for these two methods.
        * ``'median'`` / ``'mean'`` -- the median / mean of squared
          Euclidean distances between the rows of ``X``.
    percent : int, optional
        Percentage in ``[0, 100]``. When given, every row of the full
        pairwise squared-distance matrix is sorted and only the column at
        index ``int(percent / 100 * n_samples)`` (capped at the last column)
        is aggregated. When ``None``, all unique pairwise squared distances
        are aggregated.
    heuristic : bool, default=False
        If True, return ``sqrt(sigma / 2)`` instead of ``sigma``.

    Returns
    -------
    sigma : float
        The estimated length scale.

    Raises
    ------
    ValueError
        If ``method`` is not one of the four recognised names, or if
        ``percent`` lies outside ``[0, 100]``.
    """
    # Shape-only rules of thumb: no distances are needed.
    if method == 'silverman':
        return silvermans_factor(X)
    elif method == 'scott':
        return scotts_factor(X)

    # Reject unknown methods before paying for the O(n^2) distance matrix.
    if method not in ('median', 'mean'):
        raise ValueError(f"Unrecognized distance measure: {method}")

    # get the squared euclidean distances
    if percent is not None:
        if not 0 <= percent <= 100:
            raise ValueError(
                f"percent must lie in [0, 100], got {percent}"
            )
        n_samples = X.shape[0]
        # percent == 100 would give index n_samples, one past the last
        # column; cap it so that 100% selects the largest distance per row.
        kth_sample = min(int((percent / 100) * n_samples), n_samples - 1)
        # Sort each row, then keep a single column (one distance per sample).
        dists = np.sort(squareform(pdist(X, 'sqeuclidean')))[:, kth_sample]
    else:
        dists = np.sort(pdist(X, 'sqeuclidean'))

    if method == 'median':
        sigma = np.median(dists)
    else:
        sigma = np.mean(dists)

    if heuristic:
        sigma = np.sqrt(sigma / 2)
    return sigma

## Experiment

This experiment compares every combination of the following settings:

1. HSIC estimator: HSIC, KA, CKA
2. Length scale: one per dimension, or a single shared one
3. Sigma estimator: Scott, Silverman, median, median of the k-th distance

In [9]:
# Search grid for the experiment. Each 'estimator' entry is a
# (method, percent) pair; percent is None when no k-th distance is used.
parameters = {
    'scorer': ['hsic', 'cka', 'ka'],
    'estimator': [
        *(('median', pct) for pct in (15, 20, 50, 80)),
        ('scott', None),
        ('silverman', None),
        ('median', None),
    ],
    'per_dataset': [True],
    'per_dimension': [True, False],
}

# Expand the grid into a list of parameter combinations
# (dict_product comes from experiments.utils; presumably it yields one dict
# per combination -- confirm there).
parameters_list = list(dict_product(parameters))
n_params = len(parameters_list)
print('# of Params:', n_params)

# of Params: 42


### Experimental Step

In [None]:
from typing import Dict 

def step(params: Dict, loop_param: Dict):
    """Run one experimental trial and return its results as a one-row frame.

    Parameters
    ----------
    params : Dict
        Settings of the trial. Keys read here: ``dataset``, ``trial``,
        ``std``, ``nu``, ``dimensions``, ``standardize``, ``per_dimension``,
        ``separate_scales``, ``scorer`` and ``sigma_estimator`` (indexed as
        ``[0]`` = method, ``[1]`` = percent).
        NOTE(review): the parameter grid visible in this notebook uses the
        key ``'estimator'`` rather than ``'sigma_estimator'`` -- confirm
        which grid is fed to this function.
    loop_param : Dict
        Passed to ``DataParams`` as ``samples`` and stored in the
        ``samples`` column (presumably the number of samples, despite the
        ``Dict`` annotation -- confirm against the caller).

    Returns
    -------
    results_df : pd.DataFrame
        A single-row frame holding the data settings, sigma settings, the
        estimated sigmas, the score and ``inputs.mutual_info``.
    """
    # ---------------- data ----------------
    data_config = DataParams(
        dataset=params['dataset'],
        trial=params['trial'],
        std=params['std'],
        nu=params['nu'],
        samples=loop_param,
        dimensions=params['dimensions'],
    )
    inputs = data_config.generate_data()

    # ---------------- sigma estimator ----------------
    # get_sigma is defined outside this cell; it returns one sigma for X and
    # one for Y.
    sigma_X, sigma_Y = get_sigma(
        X=inputs.X,
        Y=inputs.Y,
        method=params['sigma_estimator'][0],
        percent=params['sigma_estimator'][1],
        per_dimension=params['per_dimension'],
        separate_scales=params['separate_scales'],
    )

    # ---------------- HSIC model ----------------
    # get_hsic is defined outside this cell.
    score = get_hsic(inputs.X, inputs.Y, params['scorer'], sigma_X, sigma_Y)

    # ---------------- results ----------------
    record = {
        # data settings
        "dataset": params["dataset"],
        "trial": params["trial"],
        "std": params["std"],
        "nu": params["nu"],
        "samples": loop_param,
        "dimensions": params["dimensions"],
        # standardization setting
        "standardize": params["standardize"],
        # sigma format settings
        "per_dimension": params["per_dimension"],
        "separate_scales": params["separate_scales"],
        # sigma method settings
        "sigma_method": params["sigma_estimator"][0],
        "sigma_percent": params["sigma_estimator"][1],
        "sigma_X": sigma_X,
        "sigma_Y": sigma_Y,
        # HSIC settings and outcome
        "scorer": params["scorer"],
        "score": score,
        "mutual_info": inputs.mutual_info,
    }

    # Wrap every value in a one-element list so each becomes a single cell
    # of a one-row frame (also when the value itself is an array).
    return pd.DataFrame({column: [value] for column, value in record.items()})