In [0]:
import os
import re
import requests
import json
import pandas as pd
import numpy as np
import random
import mlflow
from sklearn.metrics import roc_auc_score, accuracy_score, cohen_kappa_score

# --- notebook parameters ---
# Each widget is declared as (name, default value, label) and registered in order.
# The default experiment path is placed under the current user's workspace folder.
_current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()

_widget_specs = (
    ("processed_data_path", "/dbfs/FileStore/tables/p300_files/processed-features/", "DBFS path to processed features"),
    ("serving_endpoint_name", "p300", "Name of the Serving Endpoint"),
    ("target_entity_name", "champion", "Served entity to test (like 'champion' or 'challenger')"),
    ("num_subjects_to_test", "5", "Number of subjects to sample for testing"),
    ("registered_model_name", "P300_Classifier", "Registered Model Name"),
    ("experiment_name", f"/Users/{_current_user}/P300_BCI_Inference", "MLflow Experiment Name"),
)
for _widget_name, _widget_default, _widget_label in _widget_specs:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

# Read the (possibly user-overridden) widget values into plain variables.
# Widgets always return strings, so the subject count is converted to int here.
_read_widget = dbutils.widgets.get
processed_data_root_path = _read_widget("processed_data_path")
serving_endpoint_name = _read_widget("serving_endpoint_name")
target_entity_name = _read_widget("target_entity_name")
num_subjects_to_test = int(_read_widget("num_subjects_to_test"))
registered_model_name = _read_widget("registered_model_name")
mlflow_experiment_name = _read_widget("experiment_name")

In [0]:
# --- REST and MLflow client setup ---
# Workspace URL and API token are both read from the notebook context,
# so no credentials are hard-coded in the notebook.
_notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
databricks_host = _notebook_context.apiUrl().get()
databricks_token = _notebook_context.apiToken().get()

headers = {
    "Authorization": f"Bearer {databricks_token}",
    "Content-Type": "application/json",
}

# Requests are routed to one specific served entity of the endpoint.
invocation_url = (
    f"{databricks_host}/serving-endpoints/{serving_endpoint_name}"
    f"/served-models/{target_entity_name}/invocations"
)
print(f"will send inference requests to: {invocation_url}")

# A dot in the registered model name selects the Unity Catalog registry;
# otherwise the legacy workspace registry is used.
if "." in registered_model_name:
    _registry_uri = "databricks-uc"
else:
    _registry_uri = "databricks"
mlflow.set_registry_uri(_registry_uri)
mlflow.set_experiment(mlflow_experiment_name)
mlflow_client = mlflow.tracking.MlflowClient()

In [0]:
# --- data selection logic with holdout set creation ---

# 1. query the serving endpoint API to find which model version the target entity is serving
print(f"querying endpoint '{serving_endpoint_name}' to find version for entity '{target_entity_name}'...")
endpoint_url_get = f"{databricks_host}/api/2.0/serving-endpoints/{serving_endpoint_name}"
# requests waits indefinitely by default; an explicit timeout keeps the notebook from hanging
response = requests.get(endpoint_url_get, headers=headers, timeout=60)
response.raise_for_status()

endpoint_config = response.json()
# look inside the 'config' and its 'served_entities' list;
# `or` fallbacks also cover keys that are present but null
served_entities = (endpoint_config.get("config") or {}).get("served_entities") or []

# first served entity whose name matches the requested one (None when there is no match)
matching_entity = next(
    (entity for entity in served_entities if entity.get("name") == target_entity_name),
    None,
)
if matching_entity is None:
    # fail here with a clear message instead of carrying None values into the
    # model-registry lookup that follows
    available_entity_names = [entity.get("name") for entity in served_entities]
    raise ValueError(
        f"entity '{target_entity_name}' was not found on endpoint '{serving_endpoint_name}'. "
        f"available entities: {available_entity_names}"
    )

served_model_version = matching_entity.get("entity_version")
served_model_name = matching_entity.get("entity_name")

print(f"found that entity '{target_entity_name}' is serving model '{served_model_name}' version '{served_model_version}'.")

In [0]:
# 2. get the run ID associated with that specific model version
if not served_model_name or served_model_version is None:
    raise ValueError(
        f"cannot look up training run: served model name/version is missing "
        f"(name={served_model_name!r}, version={served_model_version!r})"
    )

# pick the registry from the served model's own name: names containing dots are looked up
# in Unity Catalog, everything else in the legacy workspace registry. Hard-coding
# "databricks" here would make every Unity Catalog model lookup fail.
mlflow.set_registry_uri("databricks-uc" if "." in served_model_name else "databricks")
mlflow_client = mlflow.tracking.MlflowClient()
model_version_details = mlflow_client.get_model_version(name=served_model_name, version=served_model_version)
served_model_run_id = model_version_details.run_id
if not served_model_run_id:
    # without a source run there is no artifact to read the training subjects from
    raise ValueError(
        f"model '{served_model_name}' version '{served_model_version}' has no associated run ID; "
        "cannot determine which subjects it was trained on."
    )
print(f"served model run id: {served_model_run_id}")

# 3. download the training_subjects.json artifact from that run
local_path = mlflow_client.download_artifacts(run_id=served_model_run_id, path="training_subjects.json")
with open(local_path, 'r', encoding="utf-8") as f:
    training_subjects_data = json.load(f)
training_subjects_set = set(training_subjects_data.get("training_subjects", []))
if not training_subjects_set:
    # an empty list means nothing gets excluded downstream, i.e. possible train/test leakage
    print("warning: training_subjects.json lists no training subjects; no subject will be excluded from testing.")
print(f"model version {served_model_version} was trained on {len(training_subjects_set)} subjects. they will be excluded.")

In [0]:
# 4. create the holdout set and sample from it
# only directories count as subjects: stray files in the root (markers, hidden files, ...)
# would otherwise be sampled and then fail when their parquet files are read
all_available_subjects_set = {
    entry
    for entry in os.listdir(processed_data_root_path)
    if os.path.isdir(os.path.join(processed_data_root_path, entry))
}
# NOTE(review): exclusion relies on the names in training_subjects.json matching the
# folder names exactly (same type and spelling) — confirm against the training notebook
testing_holdout_set = all_available_subjects_set - training_subjects_set
holdout_list = sorted(testing_holdout_set)  # sorted so the sampling population has a stable order

print(f"found {len(holdout_list)} subjects available for testing.")

subjects_to_test = holdout_list
if len(holdout_list) >= num_subjects_to_test:
    # NOTE(review): sampling is unseeded, so each run may evaluate a different subset
    subjects_to_test = random.sample(holdout_list, k=num_subjects_to_test)
else:
    print(f"warning: requested {num_subjects_to_test}, but only {len(holdout_list)} available in holdout set. using all available.")

print(f"final subjects selected for testing: {subjects_to_test}")

In [0]:
# --- inference and evaluation loop ---
# For every selected subject: load its features and labels, send the features to the
# serving endpoint in mini-batches, and accumulate true labels / predicted labels /
# predicted probabilities across all subjects.
all_true_labels, all_pred_labels, all_pred_probas = [], [], []
INFERENCE_BATCH_SIZE = 100
# generous upper bound per request; without a timeout a stalled endpoint blocks the notebook forever
REQUEST_TIMEOUT_SECONDS = 300

# one HTTP session for all batches so the connection to the endpoint is reused
with requests.Session() as http_session:
    for subject_name in subjects_to_test:
        print(f"\nprocessing subject {subject_name}...")
        subject_folder_path = os.path.join(processed_data_root_path, subject_name)

        features_df = pd.read_parquet(os.path.join(subject_folder_path, "features.parquet"))
        labels_df = pd.read_parquet(os.path.join(subject_folder_path, "labels.parquet"))

        # the payload uses positional column names feature_0..feature_{n-1}
        num_features = features_df.shape[1]
        features_df.columns = [f"feature_{i}" for i in range(num_features)]

        # --- Mini-batching loop ---
        subject_pred_list = []
        num_rows = len(features_df)

        print(f"  sending {num_rows} records in batches of {INFERENCE_BATCH_SIZE}...")
        for i in range(0, num_rows, INFERENCE_BATCH_SIZE):
            batch_df = features_df.iloc[i : i + INFERENCE_BATCH_SIZE]

            data_to_send = {
                "dataframe_split": {
                    "columns": batch_df.columns.tolist(),
                    "data": batch_df.values.tolist()
                }
            }

            response = http_session.post(
                invocation_url,
                headers=headers,
                json=data_to_send,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()

            response_data = response.json()
            predictions_batch = response_data.get("predictions", [])
            subject_pred_list.extend(predictions_batch)
        # --- End of mini-batching loop ---

        subject_true_labels = labels_df['label'].values

        # predictions and labels are paired purely by position, so a count mismatch
        # (e.g. a response without 'predictions') would silently misalign the metrics
        if len(subject_pred_list) != len(subject_true_labels):
            raise ValueError(
                f"subject {subject_name}: received {len(subject_pred_list)} predictions "
                f"for {len(subject_true_labels)} labels."
            )

        subject_pred_labels = [p.get('predicted_label_code') for p in subject_pred_list]
        subject_pred_probas = [p.get('probability_target') for p in subject_pred_list]

        all_true_labels.extend(subject_true_labels)
        all_pred_labels.extend(subject_pred_labels)
        all_pred_probas.extend(subject_pred_probas)
        print(f"  successfully received {len(subject_pred_list)} predictions for {subject_name}.")

In [0]:
# --- final metrics calculation and logging ---
# Collapses labels to a binary target-vs-rest problem, computes accuracy, Cohen's kappa
# and (when possible) ROC AUC, then records them in a new MLflow run.
if not all_true_labels:
    print("\nno predictions were made. cannot calculate final metrics.")
else:
    TARGET_CLASS_LABEL_CODE = 2

    # 1 where the label equals the target class code, 0 otherwise
    true_binary = np.where(np.array(all_true_labels) == TARGET_CLASS_LABEL_CODE, 1, 0)
    pred_binary = np.where(np.array(all_pred_labels) == TARGET_CLASS_LABEL_CODE, 1, 0)

    accuracy = accuracy_score(true_binary, pred_binary)
    kappa = cohen_kappa_score(true_binary, pred_binary)

    # AUC needs a probability for every sample and both classes present in the ground truth
    auc = None
    if all(p is not None for p in all_pred_probas) and len(np.unique(true_binary)) > 1:
        auc = roc_auc_score(true_binary, all_pred_probas)

    # log results to a new mlflow run
    with mlflow.start_run(run_name=f"evaluation_{target_entity_name}_v{served_model_version}") as run:
        # NOTE(review): this logs the widget value, not the model name reported by the
        # serving endpoint — confirm the two are meant to be the same
        mlflow.log_param("evaluated_model_name", registered_model_name)
        mlflow.log_param("evaluated_model_version", served_model_version)
        mlflow.log_param("evaluated_entity_alias", target_entity_name)
        mlflow.log_param("num_test_subjects", len(subjects_to_test))

        metrics_to_log = {"evaluation_accuracy": accuracy, "evaluation_kappa": kappa}
        # compare against None explicitly: a truthiness test would drop a valid AUC of 0.0
        if auc is not None:
            metrics_to_log["evaluation_auc"] = auc
        mlflow.log_metrics(metrics_to_log)

        print(f"logged evaluation metrics to MLflow run: {run.info.run_id}")