# Kernel two sample test
Kernel two sample test to determine if samples are drawn from the same distribution using Maximum Mean Discrepancy (mmd) statistic

In [None]:
# switch to the project directory
%cd ../..
# working directory should be ../pdi

In [None]:
import sys
import os
import pandas as pd

module_path = os.path.abspath('src')

if module_path not in sys.path:
    sys.path.append(module_path)


In [None]:
import pandas as pd

data_period1 = pd.read_csv('csv_filepath_1', sep=",", index_col=0)
data_period2 = pd.read_csv('csv_filepath_2', sep=",", index_col=0)

In [None]:
print(data_period1.shape)
print(data_period2.shape)

In [None]:
import numpy as np
from sklearn import metrics

def mmd(samples, gamma=None, alpha=0.05):
    """Compute unbiased MMD^2 estimate between two samples, with a hypothesis test threshold.
    
    Args:
        X -- sample from distribution P
        Y -- sample from distribution Q
        gamma (float, optional) -- RBF kernel parameter (if None, use median heuristic)
        alpha (float optional) -- significanece level for the hypothesis test (default: 0.05)
    
    Returns:
        md_squared (float)
        threshold (float) -- threshold for the null hypothesis at significance level `alpha`.
        If mmd_squared > threshold, reject the null hypothesis (that samples are taken from the same distribution).
    """
    X, Y = samples 

    if X is None or Y is None:
        return None, None

    m = len(X)
    assert len(Y) == m, "X and Y must have the same number of samples"
    
    if gamma is None:
        pairwise_dist = metrics.euclidean_distances(np.vstack([X, Y]), squared=True)
        gamma = 1.0 / np.median(pairwise_dist[np.triu_indices_from(pairwise_dist, k=1)])
    
    Kxx = metrics.pairwise.rbf_kernel(X, gamma=gamma)
    Kyy = metrics.pairwise.rbf_kernel(Y, gamma=gamma)
    Kxy = metrics.pairwise.rbf_kernel(X, Y, gamma=gamma)

    np.fill_diagonal(Kxx, 0)
    np.fill_diagonal(Kyy, 0)

    term1 = Kxx + Kyy - Kxy - Kxy.T
    mmd_squared = np.sum(term1) / (m * (m - 1))
    K = 1   # rbf kernel upper bound
    threshold = (4 * K / np.sqrt(m)) * np.sqrt(np.log(1 / alpha))
    
    return mmd_squared, threshold

In [None]:
# Get a dictionary of sets for different combinations of missing data
def prepare_sets(data):

    data_sets = {
        "no_TOF_TRD": data[data["fTOFSignal"].isna() & data["fTRDPattern"].isna()].drop(columns=["fTOFSignal", "fBeta", "fTRDPattern", "fTRDSignal"]),
        "no_TRD": data[data["fTRDPattern"].isna() & data["fTOFSignal"].notna()].drop(columns=["fTRDPattern", "fTRDSignal"]),
        "no_TOF": data[data["fTOFSignal"].isna() & data["fTRDPattern"].notna()].drop(columns=["fTOFSignal", "fBeta"]),
        "no_missing": data[data["fTRDPattern"].notna() & data["fTOFSignal"].notna()]
    } 
    return data_sets

In [None]:
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings("ignore", 
                       message="X has feature names, but.*was fitted without feature names")

def get_samples(data_period1, data_period2, size=10000):
    """ Get samples from two data taking periods
    Args:
        data_period1
        data_period2,
        size (float) -- desired sample size, default: 10000

    Returns:
        X_sample
        Y_sample
    """
    if len(data_period1) == 0 or len(data_period2) == 0:
            return None, None

    data_period1 = data_period1.dropna()
    data_period2 = data_period2.dropna()

    X = data_period1
    Y = data_period2

    scaler = StandardScaler().fit(np.vstack([X, Y]))
    X_std, Y_std = scaler.transform(X), scaler.transform(Y)

    indices = np.random.choice(X_std.shape[0], size=min(size, min(len(X_std), len(Y_std))), replace=False)
    X_sample = X_std[indices]

    indices = np.random.choice(Y_std.shape[0], size=min(size, min(len(X_std), len(Y_std))), replace=False)
    Y_sample = Y_std[indices]

    return X_sample, Y_sample


In [None]:
def evaluate_mmd(data_sets_1, data_sets_2, sample_size = 10000, alpha = 0.05, gamma=None):
    results = {
        "no_TOF_TRD": mmd(get_samples(data_sets_1["no_TOF_TRD"], data_sets_2["no_TOF_TRD"], sample_size), gamma, alpha),
        "no_TOF": mmd(get_samples(data_sets_1["no_TOF"], data_sets_2["no_TOF"], sample_size), gamma, alpha),
        "no_TRD": mmd(get_samples(data_sets_1["no_TRD"], data_sets_2["no_TRD"], sample_size), gamma, alpha),
        "no_missing": mmd(get_samples(data_sets_1["no_missing"], data_sets_2["no_missing"], sample_size), gamma, alpha)
    }

    for name, result in results.items():
        print(f"MMD for missing {name}, null hypothesis {'REJECTED' if result[0] > result[1] else 'not rejected'} (MMD score: {result[0]:.2g}, threshold: {result[1]:.2g})")

In [None]:
# Drop extra columns if comparing experimental vs simulated
if data_period1.shape[1] == 21 and data_period2.shape[1] == 19:
    data_period1 = data_period1.drop(columns = ["fIsPhysicalPrimary", "fPdgCode"])
elif data_period1.shape[1] == 19 and data_period2.shape[1] == 21:
    data_period2 = data_period2.drop(columns = ["fIsPhysicalPrimary", "fPdgCode"])

In [None]:
data_sets_1 = prepare_sets(data_period1)
data_sets_2 = prepare_sets(data_period2)

evaluate_mmd(data_sets_1, data_sets_2)

### Separate tests for different particle types

**Run only if testing simulated vs simulated distributions**

In [None]:
from pdi.data.constants import TARGET_COLUMN
from pdi.constants import TARGET_CODES, PARTICLES_DICT

features = data_period1.drop(columns=[TARGET_COLUMN]).columns
sample_size = 10000

for code in TARGET_CODES:
    particle_data_1 = data_period1[data_period1[TARGET_COLUMN] == code]
    particle_data_2 = data_period2[data_period2[TARGET_COLUMN] == code]

    part_sets_1 = prepare_sets(particle_data_1)
    part_sets_2 = prepare_sets(particle_data_2)

    print("MMD for", PARTICLES_DICT[code], "distribution: ")
    evaluate_mmd(part_sets_1, part_sets_2)
    print()