## 2b. Evidence - Robustness QAS Measurements

Evidence collected in this section checks for the Robustness scenarios defined in the previous step. Note that some functions will be loaded from external Python files.

### Initialize MLTE Context

MLTE contains a global context that manages the currently active _session_. Initializing the context tells MLTE how to store all of the artifacts that it produces. The import below also sets up the global constants for the folders and the model used in this notebook.

In [None]:
# Sets up context for the model being used, sets up constants related to folders and model data to be used.
from demo.scenarios.session import *

### Helper Functions

General functions and external imports.

In [None]:
# General functions.
from demo.scenarios import garden
import pandas as pd


def calculate_base_accuracy(df_results: pd.DataFrame) -> pd.DataFrame:
    """Compute the base model accuracy and the sample count for each label.

    Args:
        df_results: Prediction results that contain at least the columns
            ``label``, ``prediced_label`` and ``model correct``.

    Returns:
        A frame with one row per label and the columns ``model acc``
        (correct / total) and ``count`` (correct + incorrect).
    """
    was_correct = df_results["model correct"] == True  # noqa: E712
    hits = df_results[was_correct].groupby("label").count()
    hits = hits.drop(columns=["prediced_label"])

    was_wrong = df_results["model correct"] == False  # noqa: E712
    misses = df_results[was_wrong].groupby("label").count()
    misses = misses.drop(columns=["prediced_label"])
    misses = misses.rename(columns={"model correct": "model incorrect"})

    # The outer merge keeps labels that appear on only one side; their missing
    # counterpart count is treated as zero.
    per_label = hits.merge(
        misses, how="outer", left_on="label", right_on="label"
    )
    per_label = per_label.fillna(0)

    total = per_label["model correct"] + per_label["model incorrect"]
    per_label["model acc"] = per_label["model correct"] / total
    per_label["count"] = total

    return per_label.drop(columns=["model correct", "model incorrect"])


def calculate_accuracy_per_set(
    data_folder: str, df_results: pd.DataFrame, df_res: pd.DataFrame
) -> pd.DataFrame:
    """Add a per-label accuracy column to ``df_res`` for each perturbed data set.

    For every suffix in the fixed list of perturbations (three blur levels and
    three removed colour channels) the file
    ``predictions_test<suffix>.csv`` is read from ``data_folder`` and the
    per-label accuracy of that file is merged into ``df_res`` as the column
    ``model acc<suffix>``.

    Args:
        data_folder: Folder that contains the prediction CSV files.
        df_results: Base prediction results. Not used by the computation;
            the parameter is kept so that existing callers keep working.
        df_res: Per-label base accuracy frame, keyed by ``label``, to extend.

    Returns:
        ``df_res`` outer-merged with one accuracy column per perturbed set.
        Labels missing from a given set have NaN in that set's column.
    """
    # Local import so the helper does not rely on `os` leaking in through a
    # star import elsewhere in the notebook.
    import os

    base_filename = "predictions_test"
    ext_filename = ".csv"
    set_filename = ["_blur2x8", "_blur5x8", "_blur0x8", "_noR", "_noG", "_noB"]

    col_root = "model acc"

    for fs in set_filename:
        filename = os.path.join(data_folder, base_filename + fs + ext_filename)
        colname = col_root + fs

        df_temp = pd.read_csv(filename)
        # The saved index column, when present, must not take part in the counts.
        df_temp = df_temp.drop(columns=["Unnamed: 0"], errors="ignore")

        df_pos = (
            df_temp[df_temp["model correct"] == True]  # noqa: E712
            .groupby("label")
            .count()
            .drop(columns=["prediced_label"])
        )
        # Incorrect predictions must come from the same perturbed set as the
        # correct ones; mixing in the base results would give a meaningless
        # accuracy.
        df_neg = (
            df_temp[df_temp["model correct"] == False]  # noqa: E712
            .groupby("label")
            .count()
            .drop(columns=["prediced_label"])
            .rename(columns={"model correct": "model incorrect"})
        )

        df_res2 = df_pos.merge(
            df_neg, right_on="label", left_on="label", how="outer"
        )
        # A label that is always right (or always wrong) has no row on the
        # other side of the merge; count that side as zero.
        df_res2 = df_res2.fillna(0)

        df_res2[colname] = df_res2["model correct"] / (
            df_res2["model correct"] + df_res2["model incorrect"]
        )
        df_res2 = df_res2.drop(columns=["model correct", "model incorrect"])

        df_res = df_res.merge(
            df_res2, right_on="label", left_on="label", how="outer"
        )

    return df_res


def print_model_accuracy(df_res: pd.DataFrame, key: str, name: str) -> None:
    """Print ``name`` followed by the count-weighted mean of column ``key``.

    Each row's accuracy in ``df_res[key]`` is weighted by that row's ``count``.
    """
    weighted_hits = df_res[key] * df_res["count"]
    overall_accuracy = sum(weighted_hits) / sum(df_res["count"])
    print(name, overall_accuracy)

In [None]:
# Prepare all data. Same as the case above, we will use CSV files that contain the results of a previous execution of the model.
df_results = garden.load_base_results(DATASETS_DIR)
df_res = calculate_base_accuracy(df_results)
df_res = calculate_accuracy_per_set(DATASETS_DIR, df_results, df_res)
df_info = garden.load_taxonomy(DATASETS_DIR)
df_all = garden.merge_taxonomy_with_results(df_res, df_info, "label", "Label")

# Fill in missing model accuracy data. The result is assigned back to the
# column: calling fillna(inplace=True) on a column selection acts on a
# temporary object, which pandas 2.x warns about and which leaves df_all
# unchanged once Copy-on-Write is active.
df_all["model acc_noR"] = df_all["model acc_noR"].fillna(0)
df_all["model acc_noG"] = df_all["model acc_noG"].fillna(0)
df_all["model acc_noB"] = df_all["model acc_noB"].fillna(0)

### Measurements

Now perform the actual measurements. First, simply look at the model accuracy across the blur levels.

In [None]:
# View changes in model accuracy: the base model first, then each blur level.
print_model_accuracy(df_res, "model acc", "base model accuracy")
for blur_size in ("2x8", "5x8", "0x8"):
    print_model_accuracy(
        df_res,
        f"model acc_blur{blur_size}",
        f"model accuracy with {blur_size} blur",
    )

Measure the ranksums (p-value) for all blur cases, using `scipy.stats.ranksums` and the `ExternalMeasurement` wrapper.

In [None]:
import scipy.stats

from demo.scenarios.values.ranksums import RankSums
from mlte.measurement.external_measurement import ExternalMeasurement

my_blur = ["2x8", "5x8", "0x8"]
for blur_level in my_blur:
    # Define the measurement: scipy's rank-sum test, evaluated through the
    # ExternalMeasurement wrapper with RankSums as the result type.
    ranksum_measurement = ExternalMeasurement(
        f"ranksums blur{blur_level}", RankSums, scipy.stats.ranksums
    )

    # Evaluate: base accuracy against the accuracy at this blur level.
    base_accuracy = df_res["model acc"]
    blurred_accuracy = df_res[f"model acc_blur{blur_level}"]
    ranksum: RankSums = ranksum_measurement.evaluate(
        base_accuracy, blurred_accuracy
    )

    # Inspect the value, then save it to the artifact store.
    print(ranksum)
    ranksum.save(force=True)

Now to the next part of the question: is this effect equal across the phylogenetic groups?

First we will check the effect of blur for Clade 2.

In [None]:
from typing import List

from demo.scenarios.values.multiple_ranksums import MultipleRanksums

# Use the base result and the blur columns to analyze the effect of blur:
# each delta is the accuracy lost at that blur level.
df_all["delta_2x8"] = df_all["model acc"] - df_all["model acc_blur2x8"]
df_all["delta_5x8"] = df_all["model acc"] - df_all["model acc_blur5x8"]
df_all["delta_0x8"] = df_all["model acc"] - df_all["model acc_blur0x8"]

pops = df_all["Clade2"].unique().tolist()
blurs = ["delta_2x8", "delta_5x8", "delta_0x8"]

# Run the rank-sum test for every ordered pair of Clade2 populations
# (including a population against itself) and for every blur delta.
ranksums: List = []
for blur in blurs:
    for pop1 in pops:
        first_sample = df_all[df_all["Clade2"] == pop1][blur]
        for pop2 in pops:
            second_sample = df_all[df_all["Clade2"] == pop2][blur]
            ranksum_measurement = ExternalMeasurement(
                f"ranksums clade2 {pop1}-{pop2} blur{blur}",
                RankSums,
                scipy.stats.ranksums,
            )
            ranksum: RankSums = ranksum_measurement.evaluate(
                first_sample, second_sample
            )
            print(f"blur {blur}: {ranksum}")
            ranksums.append({ranksum.identifier: ranksum.array})

# Bundle all individual results into a single MultipleRanksums value; the
# identity function passes the collected list through unchanged.
multiple_ranksums_meas = ExternalMeasurement(
    "multiple ranksums for clade2", MultipleRanksums, lambda x: x
)
multiple_ranksums: MultipleRanksums = multiple_ranksums_meas.evaluate(ranksums)
multiple_ranksums.num_pops = len(pops)
multiple_ranksums.save(force=True)

Now we check between clade 2 and clade 3.

In [None]:
# One row per (Clade2, Clade 3) combination produced by the groupby.
df_now = (
    df_all[["Clade2", "Clade 3"]]
    .copy()
    .groupby(["Clade2", "Clade 3"])
    .count()
    .reset_index()
)
ps1 = df_now["Clade2"].to_list()
ps2 = df_now["Clade 3"].to_list()
print(df_now)

# Compare every ordered pair of (Clade2, Clade 3) populations for each blur delta.
ranksums: List = []
for blur in blurs:
    print("\n", blur)
    for p1c1, p1c2 in zip(ps1, ps2):
        # The first population is selected by its own Clade 3 value (p1c2),
        # so the pair really compares two distinct populations.
        first_sample = df_all[
            (df_all["Clade2"] == p1c1) & (df_all["Clade 3"] == p1c2)
        ][blur]
        for p2c1, p2c2 in zip(ps1, ps2):
            second_sample = df_all[
                (df_all["Clade2"] == p2c1) & (df_all["Clade 3"] == p2c2)
            ][blur]

            # The rank-sum test needs data on both sides. This is written with
            # boolean logic: `|` binds tighter than `>`, so a guard of the form
            # `len(a) > 0 | len(b) > 0` would silently turn into the chained
            # comparison `len(a) > len(b) > 0`.
            if first_sample.empty or second_sample.empty:
                continue

            ranksum_measurement = ExternalMeasurement(
                f"ranksums {p1c1}-{p1c2} - {p2c1}-{p2c2} blur{blur}",
                RankSums,
                scipy.stats.ranksums,
            )
            ranksum: RankSums = ranksum_measurement.evaluate(
                first_sample, second_sample
            )
            ranksums.append({ranksum.identifier: ranksum.array})

# Bundle all individual results into a single MultipleRanksums value; the
# identity function passes the collected list through unchanged.
multiple_ranksums_meas = ExternalMeasurement(
    "multiple ranksums between clade2 and 3", MultipleRanksums, lambda x: x
)
multiple_ranksums: MultipleRanksums = multiple_ranksums_meas.evaluate(ranksums)
multiple_ranksums.num_pops = len(ps1)
multiple_ranksums.save(force=True)