# Benchmarking model-based design of experiment approaches with a pharmaceutical crystallisation emulator

**Installing the packages**

In [None]:
from obsidian import Campaign, ParamSpace, Target
from obsidian.parameters import Param_Categorical, Param_Ordinal, Param_Continuous, Param_Discrete
import pandas as pd
import io
import numpy as np
from pymoo.indicators.hv import HV
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
from joblib import load
from joblib import dump

**Define parameters and targets for optimisation**

In [None]:
params = [
    Param_Continuous('Initial_temperature', 34, 40),
    Param_Continuous('Seed_load', 0, 2.625),
    Param_Ordinal('Seed_mean', ['46.94', '55.94']),
    Param_Continuous('Final_temperature', 10, 25),
    Param_Continuous('Cooling_rate', 0.1, 0.5),
    Param_Continuous('AS_end_frac', 0.45, 0.9),
    Param_Continuous('AS_rate_mL_min', 3, 10)
    ]

param_column_names = [p.name for p in params]

X_space = ParamSpace(params)

target = [
    Target('d90', aim='min'),
    Target('Yield', aim='max'),
]

target_column_names = [t.name for t in target]

campaign = Campaign(X_space, target)

**Loading the simulated data**

In [None]:
def load_data():
    # Read the dataset from the Excel file
    data = pd.read_excel(r"file_location.xlsx")
    return data

**Random forest model**

This is specific to the optimisation problem and therefore needs to be run whenever inputs and outputs are changed.

In [None]:
def preprocess_data(data):
    input_features = param_column_names
    output_features = target_column_names

    X = data[input_features].values
    y = data[output_features].values

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    return X_train, X_test, y_train, y_test, scaler

def build_model():
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    return model

def main():
    data = load_data()
    X_train, X_test, y_train, y_test, scaler = preprocess_data(data)

    model = build_model()
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    print(f"Test MSE: {mse}, Test MAE: {mae}")

    return model, scaler

if __name__ == "__main__":
    model, scaler = main()
    dump(model, 'model.joblib')
    dump(scaler, 'scaler.joblib')

**Model-based design of experiment - Bayesian optimisation**

In [None]:
Z0_list = []

num_exp_list = [5, 10, 15, 20]
design_methods = ['LHS', 'Random', 'Sobol']
acquisition_functions = ['Mean', 'RS', 'SF', 'EHVI', 'NEHVI', 'NParEGO']

model = load('model.joblib')
scaler = load('scaler.joblib')


for num_exp in num_exp_list:
    for design_method in design_methods:
        # Initial Screening
        X0 = campaign.designer.initialize(num_exp, design_method)

        new_X = X0[param_column_names].values
        new_X_scaled = scaler.transform(new_X)

        predictions = model.predict(new_X_scaled)
        predictions_df = pd.DataFrame(predictions, columns=target_column_names, index=X0.index)

        Z0_initial = pd.concat([X0, predictions_df], axis=1)
        Z0_initial['Method'] = design_method

        # BO initialisation
        X0 = Z0_initial[param_column_names]
        Y0 = Z0_initial[target_column_names]

        normalized_Y0 = (Y0 - Y0.min()) / (Y0.max() - Y0.min())
        normalized_Z0 = pd.concat([X0, normalized_Y0], axis=1)
        
        campaign.add_data(normalized_Z0)
        campaign.fit()

        for acquisition_function in acquisition_functions:
            Z0 = Z0_initial.copy()

            for _ in range(5):
                X_suggest, eval_suggest = campaign.suggest(acquisition=[acquisition_function])

                new_X = X_suggest[param_column_names].values
                new_X_scaled = scaler.transform(new_X)
                predictions = model.predict(new_X_scaled)
                predictions_df = pd.DataFrame(predictions, columns=target_column_names, index=X_suggest.index)

                new_rows = pd.concat([X_suggest, predictions_df], axis=1)
                new_rows['Method'] = acquisition_function

                Z0 = pd.concat([Z0, new_rows], axis=0)

                X_new = new_rows[param_column_names]
                Y_new = new_rows[target_column_names]
                normalized_Y_new = (Y_new - Y0.min()) / (Y0.max() - Y0.min())  # normalize using original Y0 stats
                normalized_Z_new = pd.concat([X_new, normalized_Y_new], axis=1)

                campaign.add_data(normalized_Z_new)
                campaign.fit()

            Z0_list.append(Z0)


In [None]:
Z0_list[71]

**Hypervolume metric**

In [None]:
for Z0 in Z0_list:

    Y0 = Z0[target_column_names]

    #This is manual upper and lower bounds for d90 and yield
    new_rows = pd.DataFrame([
        [78, 17],
        [174, 99]
    ], columns=target_column_names)

    Y0 = pd.concat([Y0, new_rows], ignore_index=True)
    
    normalized_Y0 = (Y0 - Y0.min()) / (Y0.max() - Y0.min())

    normalized_Y0 = normalized_Y0.iloc[:-2]
    
    for t in target:
        if t.aim == 'max':
            normalized_Y0[t.name] = -normalized_Y0[t.name]  # Invert so we minimize all
    
    solutions = normalized_Y0.to_numpy()
    
    reference_point = [1.1 if t.aim == 'min' else -0.1 for t in target]
    
    hv = HV(ref_point=reference_point)
    
    hypervolume_values = []
    for i in range(1, len(solutions) + 1):
        subset = solutions[:i]
        hypervolume_values.append(hv.do(subset))
    
    Z0['Hypervolume'] = hypervolume_values

In [None]:
Z0_list[4]

**Saving the data as csv files**

In [None]:
for i, df in enumerate(Z0_list):
    first_method = df['Method'].iloc[0]
    last_method = df['Method'].iloc[-1]
    exp = len(df) - 5

    filename = f"df_{exp}_{first_method}_{last_method}_campaign5.csv"
    df.to_csv(filename, index=False)

**Figures**

In [None]:
df = Z0_list[71]
df

In [None]:
x = np.arange(1, len(df) + 1)
y = df[["Hypervolume"]].to_numpy().ravel()
plt.plot(x, y)
plt.ylabel("Hypervolume", fontsize=12)
plt.xlabel("Number of experiments", fontsize=12)
plt.xticks(x)
plt.grid(True)
plt.show()