# Compute classifier metrics

The Snakemake pipeline outputs the probability of 0 / 1 for each cell for each classifier. Here, we compute and save many common metrics from these probabilities.

In [1]:
# imports
import plotnine as plotnine
import polars as pl
from sklearn.metrics import (
    average_precision_score,
    balanced_accuracy_score,
    confusion_matrix,
    f1_score,
    roc_auc_score,
)
from tqdm import tqdm

In [None]:
# paths
# Name of the pipeline variant; used below as the sub-directory under each
# batch's results folder.
pipeline = "profiles_tcdropped_filtered_var_mad_outlier_featselect_filtcells_metacorr"
# Root of the snakemake analysis checkout that holds the outputs.
snakemake_dir = "/dgx1nas1/storage/data/jess/repos/2021_09_01_VarChAMP/6.downstream_analysis_snakemake"
# Per-batch result directories; classifier_info.csv and predictions.parquet
# are read from these.
res_b7 = f"{snakemake_dir}/outputs/results/2024_01_23_Batch_7/{pipeline}"
res_b8 = f"{snakemake_dir}/outputs/results/2024_02_06_Batch_8/{pipeline}"
# Destination directory for the metrics.csv written at the end of the notebook.
metrics_dir = "/dgx1nas1/storage/data/jess/varchamp/sc_data/classification_results/B7B8_1percent_updatedmeta"

In [None]:
# Load the classifier info table of each batch and add class-1 fractions.
def _add_class1_fractions(frame):
    """Return *frame* with ``train_prob_1`` and ``test_prob_1`` appended.

    Each new column is the class-1 size divided by the combined class-0 and
    class-1 size, for the train and test split respectively.
    """
    train_total = pl.col("trainsize_0") + pl.col("trainsize_1")
    test_total = pl.col("testsize_0") + pl.col("testsize_1")
    return frame.with_columns(
        (pl.col("trainsize_1") / train_total).alias("train_prob_1"),
        (pl.col("testsize_1") / test_total).alias("test_prob_1"),
    )


info_b7 = _add_class1_fractions(pl.read_csv(f"{res_b7}/classifier_info.csv"))
info_b8 = _add_class1_fractions(pl.read_csv(f"{res_b8}/classifier_info.csv"))

# Stack both batches into a single table (batch 7 rows first).
info = pl.concat([info_b7, info_b8])

In [None]:
# Load the per-cell classifier predictions of each batch.
def _collect_predictions(results_dir, batch_label):
    """Scan ``predictions.parquet`` in *results_dir*, tag rows with *batch_label*, and collect."""
    lazy_preds = pl.scan_parquet(f"{results_dir}/predictions.parquet")
    return lazy_preds.with_columns(pl.lit(batch_label).alias("Batch")).collect()


preds_b8 = _collect_predictions(res_b8, "batch8")
preds_b7 = _collect_predictions(res_b7, "batch7")

# Identifier built from classifier ID, protein and batch, joined with "_".
full_id_expr = pl.concat_str(
    [pl.col("Classifier_ID"), pl.col("Metadata_Protein"), pl.col("Batch")],
    separator="_",
).alias("Full_Classifier_ID")

preds = pl.concat([preds_b7, preds_b8]).with_columns(full_id_expr)

In [None]:
# Define a function to compute metrics for each group
def compute_aubprc(auprc, prior):
    """Rescale an AUPRC value using the positive-class prior.

    Evaluates ``a / (a + b)`` where ``a = auprc * (1 - prior)`` and
    ``b = (1 - auprc) * prior``.

    Args:
        auprc: Area under the precision-recall curve.
        prior: Fraction of samples whose label is the positive class.

    Returns:
        The rescaled value; algebraically this equals ``auprc`` when
        ``prior`` is 0.5.
    """
    positive_term = auprc * (1 - prior)
    negative_term = (1 - auprc) * prior
    return positive_term / (positive_term + negative_term)


def compute_metrics(group):
    """Compute binary-classification metrics for one classifier's predictions.

    Args:
        group: Polars DataFrame holding the rows of a single classifier. It
            must provide the columns ``Label``, ``Prediction`` and
            ``Classifier_ID``. NOTE(review): assumes ``Label`` is coded 0/1
            and ``Prediction`` is the probability of class 1 -- confirm
            against the pipeline output.

    Returns:
        A dict with the keys ``AUROC``, ``AUPRC``, ``AUBPRC``, ``Macro_F1``,
        ``Sensitivity``, ``Specificity``, ``Balanced_Accuracy`` and
        ``Classifier_ID``.
    """
    y_true = group["Label"].to_numpy()
    y_prob = group["Prediction"].to_numpy()
    # Hard calls use a fixed cut-off: strictly above 0.5 counts as class 1.
    y_pred = (y_prob > 0.5).astype(int)
    # Fraction of positive labels; the array method avoids the per-element
    # Python loop that the builtin sum() performs on a NumPy array.
    prior = (y_true == 1).sum() / len(y_true)

    class_ID = group["Classifier_ID"].unique()[0]

    # Compute AUROC
    auroc = roc_auc_score(y_true, y_prob)

    # Compute AUPRC and its prior-rescaled counterpart
    auprc = average_precision_score(y_true, y_prob)
    aubprc = compute_aubprc(auprc, prior)

    # Compute macro-averaged F1 score
    macro_f1 = f1_score(y_true, y_pred, average="macro")

    # Compute sensitivity and specificity. Passing labels=[0, 1] pins the
    # matrix to 2x2 so the four-way unpacking cannot fail when only one class
    # shows up in both y_true and y_pred.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    sensitivity = tp / (tp + fn)
    specificity = tn / (tn + fp)

    # Compute balanced accuracy
    balanced_acc = balanced_accuracy_score(y_true, y_pred)

    return {
        "AUROC": auroc,
        "AUPRC": auprc,
        "AUBPRC": aubprc,
        "Macro_F1": macro_f1,
        "Sensitivity": sensitivity,
        "Specificity": specificity,
        "Balanced_Accuracy": balanced_acc,
        "Classifier_ID": class_ID,
    }

In [None]:
# Initialize an empty list to store the results
results = []
classIDs = preds.select("Full_Classifier_ID").to_series().unique().to_list()

# Select the rows of each Full_Classifier_ID and compute its metrics.
# The loop variable is not called `id` so the builtin id() stays usable in
# later cells.
for full_classifier_id in tqdm(classIDs):
    metrics = compute_metrics(
        preds.filter(pl.col("Full_Classifier_ID") == full_classifier_id)
    )
    metrics["Full_Classifier_ID"] = full_classifier_id
    results.append(metrics)

# Convert the results to a Polars DataFrame
metrics_df = pl.DataFrame(results)

# Add classifier info and save
# NOTE(review): the join key is Classifier_ID only, while `info` stacks the
# classifier_info tables of both batches. If the same Classifier_ID occurs in
# both batches this join yields extra, cross-batch rows -- confirm the IDs are
# unique across batches.
metrics_df = metrics_df.join(info, on="Classifier_ID")
# Imbalance = size of the larger class divided by size of the smaller class.
metrics_df = metrics_df.with_columns(
    (
        pl.max_horizontal(["trainsize_0", "trainsize_1"])
        / pl.min_horizontal(["trainsize_0", "trainsize_1"])
    ).alias("Training_imbalance"),
    (
        pl.max_horizontal(["testsize_0", "testsize_1"])
        / pl.min_horizontal(["testsize_0", "testsize_1"])
    ).alias("Testing_imbalance"),
)
metrics_df.write_csv(f"{metrics_dir}/metrics.csv")