# Imports

In [None]:
import json
from pathlib import Path

# Go up from notebooks/ â†’ code/
PROJECT_ROOT = Path.cwd().parent

CONFIG_PATH = PROJECT_ROOT / "param_config.json"
with open(CONFIG_PATH, "r") as f:
    param_config = json.load(f)

In [None]:
import sys, os
sys.path.append(os.path.abspath(".."))

import pandas as pd

import source_files.ExperimentalFunctions as EF
import source_files.UtilityFunctions as UF
import source_files.ConvergenceTesting as CT
import source_files.InitializationFunctions as IF

In [None]:
UF.set_FH_prior_config(param_config)

# Running the Code

In [None]:
import numpy as np

np_RNG_seed = param_config["general_parameters"]["np_RNG_seed"]

if isinstance(np_RNG_seed, int):
    np.random.seed(np_RNG_seed)

## Initialization

In [None]:
raw_measurement_df = pd.read_excel(r"../../data/raw_measurements.xlsx")

cloudiness_params = param_config["experimental_parameters"]["cloudiness_params"]
a_value_for_noise = param_config["general_parameters"]["a_value_noise"]
noise_floor = param_config["general_parameters"]["floor_noise"]

raw_measurement_df[["cloudiness", "variance_single"]] = raw_measurement_df.apply(
    lambda row: pd.Series(EF.get_ind_cloudiness_score_and_variance(row, cloudiness_params, noise_floor, a_value_for_noise)),
    axis=1
)


df_clean = EF.collapse_all_measurements(raw_measurement_df, noise_floor = noise_floor)

xT_array = df_clean[["x", "T"]].to_numpy()
measurement_array = df_clean["measurement"].to_numpy()
variance_array = df_clean["variance"].to_numpy()

In [None]:
extra_retrain = True #post-initialization, train the GP immediately using the provided data

#final HPs from campaign provided below as initialization example
sample_init_hyperparameters = np.array([0.0643, 0.119, 2.629, 0.0217, -8.219])

FH_HPs = param_config["general_parameters"]["hyperparameters"]
FH_HP_names = FH_HPs.keys()

#boundaries for each of the hyperparameters, useful for training
hyperparameter_bounds = [FH_HPs[k]["bounds"] for k in FH_HP_names]


non_FH_theory_HP_init = sample_init_hyperparameters[:3]
FH_theory_HP_init = sample_init_hyperparameters[3:]

base_HP_lims = hyperparameter_bounds[:3]
FH_theory_HP_lims = hyperparameter_bounds[3:]


my_GP_opt, parameter_bounds, hps_bounds, hps_log = IF.Initialization_HITL(
    simulation_flag = False,
    default_variance = False, #True or False
    scale_vars = a_value_for_noise,
    min_noise = noise_floor,
    base_HP_guess = non_FH_theory_HP_init, #[5e-1, 1e-2, 1e0]
    base_HP_lims = base_HP_lims,
    FH_HP_guess = FH_theory_HP_init,
    FH_HP_lims = FH_theory_HP_lims,
    experimental_initial_points = xT_array,
    experimental_initial_measurements = measurement_array,
    experimental_initial_variances = variance_array,
    retrain_now = extra_retrain, 
    specified_kernel = IF.custom_kernel, 
)

In [None]:
print("Current Optimizer Data:")
print("--"*20)
print("--"*20)

for key, val in my_GP_opt.get_data().items():
    print(f"{key}: {val}")

## Convergence Testing

In [None]:
generate_new_points_CTAnalysis = True
current_converged_index = None
current_converged_bool = False
converged_index = None

convergence_threshold = param_config["experimental_parameters"]["boundary_convergence_testing"]["boundary_convergence_threshold"]
convergence_window = param_config["experimental_parameters"]["boundary_convergence_testing"]["boundary_convergence_window"] #min number of iterations required for convergence (manually-set)
N_window = param_config["experimental_parameters"]["boundary_convergence_testing"]["boundary_log_smoothing_window"]
test_gauss_blur = param_config["experimental_parameters"]["boundary_convergence_testing"]["boundary_gaussian_blur"]

resolution_boundary_gen = param_config["experimental_parameters"]["boundary_convergence_testing"]["boundary_point_gen_resolution"]

parameter_bounds = [
    param_config["general_parameters"]["parameter_bounds"]["composition"],
    param_config["general_parameters"]["parameter_bounds"]["temperature_C"]
]



try:
    logged_boundary_list = EF.get_all_boundary_locations(
        parameter_bounds = parameter_bounds, 
        n_grid_plotting = resolution_boundary_gen, 
        logging_filepath = desired_logging_filepath,
        
    )
    
    if extra_retrain == True:
        _, _, _, current_phase_boundary = CT.phase_bound_gen(
            my_GP_opt, 
            parameter_bounds, 
            resolution_boundary_gen, 
            test_gauss_blur
        )
        logged_boundary_list.append(current_phase_boundary)
    else: 
        pass
    
    dist_bound_conv_log = CT.compute_boundary_distances(logged_boundary_list)
    smoothed_dist_bound_conv_log = CT.smooth_series(dist_bound_conv_log, N_window)
    
    current_converged_bool, current_converged_index = CT.has_converged_flat(
        smoothed_dist_bound_conv_log,
        window = convergence_window,
        flat_thresh = convergence_threshold
    )
    
    if current_converged_bool == True:
        generate_new_points_CTFAnalysis = False
        print(f"BO Process converged at iteration {current_converged_index}")
    
    else:
        print("Continuing to recommend points. BO process hasn't converged yet.")
except:
    if current_converged_bool == True:
        generate_new_points_CTFAnalysis = False
        print(f"BO Process converged at iteration {current_converged_index}")
    
    else:
        print("Continuing to recommend points. BO process hasn't converged yet.")

## New Point Generation (as necessary)

In [None]:
if generate_new_points_CTAnalysis == True:
    prec_comp = param_config["experimental_parameters"]["new_point_generation"]["rounding_precision"]["composition"]
    prec_temp = param_config["experimental_parameters"]["new_point_generation"]["rounding_precision"]["temperature_C"]
    n_acq = param_config["experimental_parameters"]["new_point_generation"]["N_acquisition_points"]
    n_per_iter = param_config["general_parameters"]["n_points_per_iteration"]
    
    next_points, next_evals, exclusion_log, diagnostic_log = UF.get_next_point_s(
        my_GP_opt, 
        prec_comp, 
        prec_temp, 
        parameter_bounds, 
        n_acq = n_acq, 
        n_points_per_stack=n_per_iter
    )

In [None]:
if generate_new_points_CTAnalysis == True:
    for i, pt in enumerate(next_points):
        print(f"Point {(i+1)}/{5}: {pt} (eval = {next_evals[i]:.10f})")
    
    print('--'*20)
    print('--'*20)
    
    next_volfracs_experimental = EF.next_volfracs([pt[0] for pt in next_points])
    
    sample_vol = param_config["experimental_parameters"]["new_point_generation"]["sample_volume_uL"] #uL made per sample
    vol_dec_places = param_config["experimental_parameters"]["new_point_generation"]["volume_rounding_precision"]
    
    next_component_volumes_experimental = EF.next_component_volumes(
        next_volfracs_experimental, 
        sample_vol = sample_vol, 
        vol_dec_places = vol_dec_places
    )

    next_PMMA_vols = next_component_volumes_experimental['PMMA']
    next_SAN_vols = next_component_volumes_experimental['SAN']
    
    for i in range(len(next_PMMA_vols)):
        print(f"Sample {i}// PMMA vol frac: {round(next_volfracs_experimental[i],4)}, PMMA Volume: {round(next_PMMA_vols[i], vol_dec_places)}uL, SAN Volume: {round(next_SAN_vols[i], vol_dec_places)}uL")

    print('--'*20)
    print('--'*20)

    next_temp = next_points[0][1]
    time_new_anneal = UF.time_temperature_correction(next_temp)
    str_time_new_anneal = UF.time_disp(time_new_anneal)
    
    print(f"New Temp: {next_temp} C (Set = {next_temp + 2} C)\nAnnealing Time: {str_time_new_anneal}")
    

In [None]:
if generate_new_points_CTAnalysis == True:
    _, _, _, current_phase_boundary = CT.phase_bound_gen(
        my_GP_opt, 
        parameter_bounds, 
        resolution_boundary_gen, 
        gauss_blur = test_gauss_blur
    )

In [None]:
log_to_excel = False

if log_to_excel == True:
    desired_logging_filepath = "HITL_diagnostics_log.xslx"
    
    curr_diagnostic_name = EF.log_to_excel_new_sheet(
        diagnostic_log, 
        curr_data, 
        my_GP_opt, 
        current_phase_boundary, 
        generating_new_points = generate_new_points_CTFAnalysis,
        filename = desired_logging_filepath
    )

    print(f"Current training & parameter selection saved at {curr_diagnostic_name} sheet in BO Logger Excel Sheet")