## ESAT K Estimation Metrics


#### Code Imports

In [1]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from esat.data.datahandler import DataHandler
from esat.model.batch_sa import BatchSA
from esat.data.analysis import ModelAnalysis, BatchAnalysis
from esat_eval.simulator import Simulator
from esat.estimator import FactorEstimator

#### Synthetic Dataset

Generate synthetic input (V) and uncertainty (U) datasets for model analysis. V and U are generated in the following sequence:

1.	Feature profiles are defined and/or randomly generated (H); if the latter, for each feature, a random number of factors between 1 and K are chosen as sources for that feature. For each contributing factor, a random contribution (uniform value between 0 and 1) is assigned. If one or more predefined factor profiles (a row of H) are provided by the user, they are assigned to H in order of occurrence and overwrite the corresponding randomly generated row of H.
2.	Sample concentrations are defined and/or randomly generated (W); if the latter, each cell of W is set to a random uniform number between 0 and contribution_max.
3.	V1 is calculated as the product W x H
4.	A noise matrix (N) is created by selecting values from a normal distribution with a randomly selected mean noise (uniform distribution between noise_mean_min and noise_mean_max) for each feature, and standard deviation = noise_scale. The randomly selected mean noise for a feature has a 50% chance to be multiplied by -1 to allow for the reduction of values in V1. Then the Hadamard product (element-wise matrix multiplication) of V1 and N is used to calculate V: V1 + V1◦N -> V
5.  Outliers are added to V if outliers=True. A number of elements in V (a proportion = outlier_p) are randomly selected and each one has a 50% chance to become V*outlier_mag, and a 50% chance to become V/outlier_mag

6.	An uncertainty matrix (U1) is created by selecting values from a normal distribution with a randomly selected mean uncertainty (uniform distribution between uncertainty_mean_min and uncertainty_mean_max) for each feature, and standard deviation = uncertainty_scale. Then the Hadamard product of V and U1 is used to calculate U: V◦U1 -> U

In [2]:
# Synthetic dataset parameter value ranges

# Bounds used when drawing the number of true factors for each dataset
syn_factors_min, syn_factors_max = 3, 8

# Bounds used when drawing the number of features and samples
syn_features_min, syn_features_max = 5, 100
syn_samples_min, syn_samples_max = 20, 2000

# Outlier toggle, plus bounds for the outlier proportion and magnitude
outliers = True
outliers_p_min, outliers_p_max = 0.05, 0.1
outliers_mag_min, outliers_mag_max = 1.1, 2

# Noise settings handed to the simulator unchanged
noise_mean_min, noise_mean_max = 0.1, 0.2
noise_scale = 0.05

# Uncertainty settings handed to the simulator unchanged
uncertainty_mean_min, uncertainty_mean_max = 0.1, 0.2
uncertainty_scale = 0.05

# [low, high] ranges for the randomly drawn contribution-curve parameters
contr_curve_min_range = [0.0, 1.0]
contr_curve_max_range = [2.0, 5.0]
contr_curve_scale_range = [0.1, 0.5]

# Seed for the notebook-level random generator
random_seed = 1

In [3]:
# Seeded generator used for the randomized dataset parameters drawn below
rng = np.random.default_rng(random_seed)

In [4]:
# Helper functions: generate a randomized synthetic dataset and run factor estimation on it
def generate_synthetic_data(true_factor):
    """Build one randomized synthetic dataset with ``true_factor`` factors.

    The dataset dimensions, outlier settings and maximum contribution are
    drawn from the module-level ``rng`` using the module-level parameter
    ranges, a ``Simulator`` is created from them, and a random subset of its
    factors has its contribution curve replaced. A one-line summary of the
    drawn parameters is printed.

    Because every draw comes from the shared ``rng``, the result depends on
    how many draws were made before this call.

    Args:
        true_factor: Number of factors passed to the simulator as
            ``factors_n``.

    Returns:
        The ``(V, U)`` pair returned by ``DataHandler.get_data()`` for the
        simulated input and uncertainty dataframes.
    """
    # rng.integers excludes `high` by default, so the *_max bounds are never drawn.
    n_features = rng.integers(low=syn_features_min, high=syn_features_max, size=1)[0]
    n_samples = rng.integers(low=syn_samples_min, high=syn_samples_max, size=1)[0]
    # Continuous parameters are rounded to two decimals before use.
    i_outlier_p = round(rng.uniform(low=outliers_p_min, high=outliers_p_max, size=1)[0], 2)
    i_outlier_mag = round(rng.uniform(low=outliers_mag_min, high=outliers_mag_max, size=1)[0], 2)
    contribution_max = round(rng.uniform(low=1.0, high=10.0, size=1)[0], 2)
    print(f"True Factors: {true_factor}, Features: {n_features}, Samples: {n_samples}, Outliers %: {i_outlier_p}, Outliers Magnitude: {i_outlier_mag}, Contribution Max: {contribution_max}")
    # The simulator receives its own seed, itself drawn from the shared rng.
    simulator = Simulator(seed=rng.integers(low=0, high=1e10, size=1)[0],
                          factors_n=true_factor,
                          features_n=n_features,
                          samples_n=n_samples,
                          outliers=outliers,
                          outlier_p=i_outlier_p,
                          outlier_mag=i_outlier_mag,
                          contribution_max=contribution_max,
                          noise_mean_min=noise_mean_min,
                          noise_mean_max=noise_mean_max,
                          noise_scale=noise_scale,
                          uncertainty_mean_min=uncertainty_mean_min,
                          uncertainty_mean_max=uncertainty_mean_max,
                          uncertainty_scale=uncertainty_scale,
                          verbose=False
                         )
    # Between 0 and true_factor - 1 factors get a new contribution curve
    # (the upper bound of rng.integers is exclusive).
    curved_factors_count = rng.integers(low=0, high=true_factor, size=1)[0]
    # print(f"Factors Curve Update Count: {curved_factors_count}")
    # Pick that many distinct factor indices.
    curved_factor_list = rng.choice(list(range(true_factor)), size=curved_factors_count, replace=False)
    # print(f"Updating factors: {curved_factor_list} curve type")
    for c_i in curved_factor_list:
        # parameters not used by the curve type are ignored
        i_curve_type = rng.choice(['uniform', 'decreasing', 'increasing', 'logistic', 'periodic'], size=1)[0]
        i_curve_min = rng.uniform(low=contr_curve_min_range[0], high=contr_curve_min_range[1], size=1)[0]
        i_curve_max = rng.uniform(low=contr_curve_max_range[0], high=contr_curve_max_range[1], size=1)[0]
        i_curve_scale = rng.uniform(low=contr_curve_scale_range[0], high=contr_curve_scale_range[1], size=1)[0]
        i_curve_frequency = rng.uniform(low=0.1, high=0.9, size=1)[0]

        # To keep all as uniform comment out the line below
        simulator.update_contribution(factor_i=c_i, curve_type=i_curve_type, scale=i_curve_scale, frequency=i_curve_frequency, minimum=i_curve_min, maximum=i_curve_max)
    
    syn_input_df, syn_uncertainty_df = simulator.get_data()
    data_handler = DataHandler.load_dataframe(input_df=syn_input_df, uncertainty_df=syn_uncertainty_df)
    # NOTE(review): the value of this attribute access is discarded; presumably
    # it is kept for a side effect of the property - confirm or remove.
    data_handler.metrics
    V, U = data_handler.get_data()
    return V, U

def _rank_of_factor_count(results, column, k):
    """Return the 0-based rank of the ``Factors == k`` row when sorted by ``column`` descending."""
    ordered = results.sort_values(column, ascending=False).reset_index()
    # After reset_index the positional index is the rank (0 = best score).
    return ordered.loc[ordered["Factors"] == k].index[0]


def _best_factor_count(results, column):
    """Return the ``Factors`` value of the row with the largest non-NaN ``column`` value."""
    best_row = np.nanargmax(results[column].values)
    # Read the factor count from the table itself instead of assuming that
    # row i always corresponds to ``min_factors + i``.
    return results["Factors"].iloc[best_row]


def run_estimation(k, eV, eU, e_samples: int = 100, min_factors: int = 2, max_factors: int = 12, max_iterations: int = 5000, converge_delta: float = 1.0, converge_n: int = 20):
    """Run the factor estimator on one dataset and score it against the true K.

    Args:
        k: True number of factors used to generate the dataset.
        eV: Input data passed to ``FactorEstimator`` as ``V``; its shape is
            reported as (samples, features).
        eU: Uncertainty data passed to ``FactorEstimator`` as ``U``.
        e_samples: Multiplier for the estimator sample count; the estimator is
            run with ``(max_factors - min_factors) * e_samples`` samples.
        min_factors: Passed through to ``FactorEstimator.run``.
        max_factors: Passed through to ``FactorEstimator.run``.
        max_iterations: Passed through to ``FactorEstimator.run``.
        converge_delta: Passed through to ``FactorEstimator.run``.
        converge_n: Passed through to ``FactorEstimator.run``.

    Returns:
        A tuple ``(estimation, results)``. ``results`` is the table returned by
        the estimator with an added "Overall Score" column. ``estimation`` is a
        dict holding the true K, the factor count favoured by each metric
        ("delta ratio", "K estimate", "Overall Score"), the 0-based rank of the
        true K under each metric ("DR Rank", "K Rank", "OS Rank"), and the
        dataset dimensions.

    Raises:
        IndexError: If ``k`` does not appear in the "Factors" column.
        ValueError: If a metric column contains only NaN values.
    """
    run_samples_n = (max_factors - min_factors) * e_samples
    factor_est = FactorEstimator(V=eV, U=eU)
    results = factor_est.run(samples=run_samples_n, min_factors=min_factors, max_factors=max_factors, max_iterations=max_iterations, converge_delta=converge_delta, converge_n=converge_n)

    # Combined score: factor count x delta ratio, scaled by how close the
    # row's Q(True) is to the smallest Q(True) in the table.
    results["Overall Score"] = (results["Factors"] * results["Delta Ratio"] * results["Q(True)"].min()) / results["Q(True)"]

    estimation = {
        "true K": k,
        "delta ratio": _best_factor_count(results, "Delta Ratio"),
        "K estimate": _best_factor_count(results, "K Estimate"),
        "Overall Score": _best_factor_count(results, "Overall Score"),
        "DR Rank": _rank_of_factor_count(results, "Delta Ratio", k),
        "K Rank": _rank_of_factor_count(results, "K Estimate", k),
        "OS Rank": _rank_of_factor_count(results, "Overall Score", k),
        "features": eV.shape[1],
        "samples": eV.shape[0]
    }
    return estimation, results
    

In [5]:
# Random test
# true_factor = rng.integers(low=syn_factors_min, high=syn_factors_max, size=1)[0]
# i_V, i_U = generate_synthetic_data(true_factor=true_factor)

In [6]:
%%time
# estimation, results = run_estimation(k=true_factor, eV=i_V, eU=i_U, e_samples=10, min_factors=min_factors, max_factors=max_factors)
# print(estimation)
# results

CPU times: total: 0 ns
Wall time: 0 ns


In [None]:
# Sampling parameters: number of randomized datasets to evaluate
test_n = 2

# Settings applied to each estimation run
samples = 10
min_factors = 2
max_factors = 12

# Keyword arguments shared by every run_estimation call
estimation_kwargs = dict(
    e_samples=samples,
    min_factors=min_factors,
    max_factors=max_factors,
    max_iterations=1000,
    converge_delta=1.0,
    converge_n=20,
)

# Per-run estimation summaries and the matching estimator result tables
est_list, results_list = [], []
for i in range(test_n):
    # Draw the true factor count first so the rng sequence stays reproducible
    i_factor = rng.integers(low=syn_factors_min, high=syn_factors_max, size=1)[0]
    i_V, i_U = generate_synthetic_data(true_factor=i_factor)
    i_est, i_result = run_estimation(k=i_factor, eV=i_V, eU=i_U, **estimation_kwargs)
    est_list.append(i_est)
    results_list.append(i_result)

True Factors: 5, Features: 53, Samples: 1515, Outliers %: 0.06, Outliers Magnitude: 1.95, Contribution Max: 3.81


Rapid random sampling for factor estimation: 100%|███████████████████████████████| 100/100 [03:13<00:00,  1.94s/it]
04-Sep-24 14:37:22 - Estimated factor count: 2


True Factors: 7, Features: 73, Samples: 338, Outliers %: 0.1, Outliers Magnitude: 1.56, Contribution Max: 2.04


Rapid random sampling for factor estimation:  97%|███████████████████████████████ | 97/100 [00:31<00:00,  9.39it/s]

In [None]:
def accuracy(occurences, results):
    """Return the per-bin hit rate as percentages.

    For each position, the hit count in ``results`` is divided by the run
    count in ``occurences``, rounded to two decimals and scaled by 100. Bins
    with zero occurrences yield 0 instead of dividing by zero.

    Args:
        occurences: Number of runs recorded for each bin.
        results: Number of correct estimates for each bin.

    Returns:
        A NumPy array of percentages, one entry per bin of ``occurences``.
    """
    ratios = [
        0 if count == 0 else round(results[idx] / count, 2)
        for idx, count in enumerate(occurences)
    ]
    return np.multiply(100, ratios)


# Tally, for every possible true factor count, how many runs were made and
# how often each metric picked the true K.
# One bin per factor count from min_factors to max_factors inclusive, so the
# counters line up with the plot labels and a true K equal to max_factors
# does not index past the end of the lists.
n_bins = max_factors - min_factors + 1
dr_results = [0] * n_bins
kest_results = [0] * n_bins
score_results = [0] * n_bins
k_runs = [0] * n_bins
for _result in est_list:
    true_k = _result['true K']
    idx_k = true_k - min_factors
    k_runs[idx_k] += 1
    if _result['delta ratio'] == true_k:
        dr_results[idx_k] += 1
    if _result['K estimate'] == true_k:
        kest_results[idx_k] += 1
    if _result['Overall Score'] == true_k:
        score_results[idx_k] += 1

labels = [f"{i + min_factors} Factor(s)" for i in range(n_bins)]

# Grouped accuracy bars on the primary axis, run counts as markers on the
# secondary axis.
result_fig = make_subplots(specs=[[{"secondary_y": True}]])
result_fig.add_trace(go.Bar(name="Delta Ratio", x=labels, y=accuracy(k_runs, dr_results)), secondary_y=False)
result_fig.add_trace(go.Bar(name="K Estimate", x=labels, y=accuracy(k_runs, kest_results)), secondary_y=False)
result_fig.add_trace(go.Bar(name="Overal Score", x=labels, y=accuracy(k_runs, score_results)), secondary_y=False)
result_fig.add_trace(go.Scatter(name="K Runs", x=labels, y=k_runs, mode='markers'), secondary_y=True)

result_fig.update_layout(title="K Estimation Randomized Results", barmode='group', height=600, width=1200)
result_fig.update_yaxes(title_text="Accuracy (%)", range=[0, 100.0], secondary_y=False)
result_fig.update_yaxes(title_text="Run Count", secondary_y=True)
result_fig.show()

In [None]:
# Overall accuracy of each metric across all runs
total_runs = np.sum(k_runs)
print(f"Total Runs: {total_runs}")
print(f"Delta Ratio Accuracy: {100*np.sum(dr_results)/total_runs}%")
print(f"K Estimate Accuracy: {100*np.sum(kest_results)/total_runs}%")
print(f"Overall Score Accuracy: {100*np.sum(score_results)/total_runs}%")

In [None]:
# Display the per-run estimation summaries
est_list

In [None]:
# Display the per-run estimator result tables
results_list