In [None]:
import json
import math
import pickle
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, mean_squared_error
from tqdm.notebook import trange

try:
    from conformal_data_cleaning.config import error_fractions, error_types
    from conformal_data_cleaning.data import _ID_TO_TASK_TYPE, AVAILABLE_DATASETS, TaskType
    from conformal_data_cleaning.evaluation.utils import AutoInitializedDict
    from conformal_data_cleaning.jenga_extension import OpenMLTask, get_OpenMLTask

# package is not available in remote jupyter
# for this reason, these files are copied over
except ModuleNotFoundError:
    import sys

    sys.path.append("..")

    from config import error_fractions, error_types
    from evaluation.utils import AutoInitializedDict
    from jenga_extension import OpenMLTask, get_OpenMLTask

    from data import _ID_TO_TASK_TYPE, AVAILABLE_DATASETS, TaskType

In [None]:
# Name of the experiment whose results/models are post-processed by this notebook.
experiment_name = "final-experiments"

# All locations are relative to the notebook's working directory.
# NOTE: the cells below read run metadata from fixed positions of `Path.parts`
# (index 4 = task id, 5 = cleaner type, ...), which relies on
# `experiment_results_path` / `experiment_models_path` having exactly four components.
processed_path = Path("../../processed")
experiment_results_path = Path(f"../../results/{experiment_name}")
experiment_models_path = Path(f"../../models/{experiment_name}")

# `parents=True` also creates a missing `processed_path` (a bare `mkdir()` raises
# FileNotFoundError in that case); `exist_ok=True` keeps re-runs idempotent.
experiment_processed_path = processed_path / experiment_name
experiment_processed_path.mkdir(parents=True, exist_ok=True)

# caches shared by all experiments
task_cache_file = processed_path / "task_cache.joblib"
# caches specific to `experiment_name`
results_file = experiment_processed_path / "results_cache.csv"

cleaned_data_cache_file = experiment_processed_path / "cleaned_data_cache.joblib"
original_performances_cache_file = experiment_processed_path / "original_performances_cache.joblib"
results_as_list_cache_file = experiment_processed_path / "results_as_list_cache.joblib"

models_leaderboard_cache_file = processed_path / "models_leaderboard_cache.csv"
dataset_statistics_file = processed_path / "dataset_statistics.csv"

# Calculate and Cache Dataset Statistics

In [None]:
def calculate_errors(correct_task: OpenMLTask, to_test_data: pd.DataFrame, actual_error_fraction: float) -> tuple:
    """Measure how far ``to_test_data`` deviates from the clean test data of ``correct_task``.

    Args:
        correct_task: Task providing ``test_data``, ``categorical_columns`` and ``numerical_columns``.
        to_test_data: Data frame that is compared against ``correct_task.test_data``.
        actual_error_fraction: If ``0``, nothing is computed and perfect scores are returned.

    Returns:
        ``((macro_f1, weighted_f1), rmse)``. The F1 pair is ``(None, None)`` if the task has no
        categorical columns, ``rmse`` is ``None`` if it has no numerical columns.
    """
    reference_data = correct_task.test_data
    has_categorical = len(correct_task.categorical_columns) > 0
    has_numerical = len(correct_task.numerical_columns) > 0

    # shortcut: without errors the scores are perfect by definition
    if actual_error_fraction == 0:
        perfect_f1 = (1.0, 1.0) if has_categorical else (None, None)
        perfect_rmse = 0.0 if has_numerical else None
        return perfect_f1, perfect_rmse

    categorical_error = (None, None)
    if has_categorical:
        # all categorical cells are compared as one flat vector of string labels
        expected_labels = reference_data[correct_task.categorical_columns].astype(str).to_numpy().flatten()
        observed_labels = to_test_data[correct_task.categorical_columns].astype(str).to_numpy().flatten()
        categorical_error = tuple(
            f1_score(expected_labels, observed_labels, average=averaging) for averaging in ("macro", "weighted")
        )

    numerical_error = None
    if has_numerical:
        # RMSE over all numerical cells at once
        expected_values = reference_data[correct_task.numerical_columns].to_numpy().flatten()
        observed_values = to_test_data[correct_task.numerical_columns].to_numpy().flatten()
        numerical_error = math.sqrt(mean_squared_error(expected_values, observed_values))

    return categorical_error, numerical_error

In [None]:
# Build (or load) two artifacts:
#   * `task_cache`: the clean task per task id plus, per (task id, error type, error fraction),
#     the corrupted task and the boolean mask of cells that differ from the clean test data
#   * `dataset_statistics`: one row per (task id, error type, error fraction)
# Both are recomputed as soon as one of the two cache files is missing.
if not task_cache_file.exists() or not dataset_statistics_file.exists():
    task_cache = AutoInitializedDict()
    dataset_statistics_as_list = []

    for i in trange(len(AVAILABLE_DATASETS)):
        task_id = AVAILABLE_DATASETS[i]
        correct_task = get_OpenMLTask(task_id=task_id)
        task_cache["correct_task"][task_id] = correct_task

        clean_training_data = correct_task.train_data
        clean_test_data = correct_task.test_data

        # statistics that only depend on the task, not on the corruption
        task_type = _ID_TO_TASK_TYPE[task_id].value
        num_categorical = len(correct_task.categorical_columns)
        num_numerical = len(correct_task.numerical_columns)
        num_training_examples = len(clean_training_data)

        # NOTE(review): `fraction_of_information` is neither imported nor defined in the visible
        # cells, so this branch raises NameError on a fresh kernel unless it is defined elsewhere.
        fraction_of_information_X_y = fraction_of_information(clean_training_data, correct_task.train_labels.to_frame())
        # same measure for each column against all remaining columns
        fraction_of_information_column_to_rest = [
            fraction_of_information(
                clean_training_data[[x for x in clean_training_data.columns if x != column]],
                clean_training_data[[column]],
            )
            for column in clean_training_data.columns
        ]
        fraction_of_information_column_to_rest_mean = np.mean(fraction_of_information_column_to_rest)
        fraction_of_information_column_to_rest_std = np.std(fraction_of_information_column_to_rest)

        for error_type in error_types:
            for error_fraction in error_fractions:
                corrupted_task = get_OpenMLTask(task_id=task_id, corruption=error_type, fraction=error_fraction)
                corrupted_data = corrupted_task.test_data

                # fraction of cells that differ from the clean test data; this can deviate from
                # the requested `error_fraction`
                error_mask = clean_test_data != corrupted_data
                actual_error_fraction = error_mask.sum().sum() / corrupted_data.size
                (categorical_error, categorical_error_weighted), numerical_error = calculate_errors(
                    correct_task=correct_task,
                    to_test_data=corrupted_data,
                    actual_error_fraction=actual_error_fraction,
                )

                task_cache["corrupted_task"][task_id][error_type][error_fraction]["task"] = corrupted_task
                task_cache["corrupted_task"][task_id][error_type][error_fraction]["error_mask"] = error_mask
                # the tuple order must match the `columns` list used to build the data frame below
                dataset_statistics_as_list.append(
                    (
                        task_id,
                        error_type,
                        error_fraction,
                        actual_error_fraction,
                        task_type,
                        num_categorical,
                        num_numerical,
                        num_training_examples,
                        categorical_error,
                        categorical_error_weighted,
                        numerical_error,
                        fraction_of_information_X_y,
                        fraction_of_information_column_to_rest_mean,
                        fraction_of_information_column_to_rest_std,
                    ),
                )

    dataset_statistics = (
        pd.DataFrame(
            dataset_statistics_as_list,
            columns=[
                "task_id",
                "error_type",
                "error_fraction",
                "actual_error_fraction",
                "task_type",
                "num_categorical",
                "num_numerical",
                "num_training_examples",
                "categorical_error",
                "categorical_error_weighted",
                "numerical_error",
                "fraction_of_information_X_y",
                "fraction_of_information_column_to_rest_mean",
                "fraction_of_information_column_to_rest_std",
            ],
        )
        # round 'actual_error_fraction' to 5 decimals
        .assign(actual_error_fraction=lambda df: df["actual_error_fraction"].apply(lambda x: round(x, 5)))
        .convert_dtypes()
    )

    # persist both artifacts so later runs take the `else` branch
    joblib.dump(task_cache, task_cache_file)
    dataset_statistics.to_csv(dataset_statistics_file, index=False)

else:
    # both cache files exist: load instead of recomputing
    task_cache = joblib.load(task_cache_file)
    dataset_statistics = pd.read_csv(dataset_statistics_file).convert_dtypes()

# Cache Original Downstream Performances

In [None]:
# Load the cached baseline performances if present; otherwise collect them from the
# `original_perf.json` files below the experiment's results directory.
if original_performances_cache_file.exists():
    original_performances = joblib.load(original_performances_cache_file)

else:
    original_performances = AutoInitializedDict()

    original_perf_paths = list(experiment_results_path.rglob("**/original_perf.json"))
    for i in trange(len(original_perf_paths)):
        original_performance_file = original_perf_paths[i]

        # ignore files whose `finished.txt` marker (located via the first 7 path parts) is missing
        if not Path(*original_performance_file.parts[:7], "finished.txt").exists():
            continue

        # run metadata is encoded in fixed positions of the file path
        task_id = int(original_performance_file.parts[4])
        cleaner_type = str(original_performance_file.parts[5])
        confidence_level = float(original_performance_file.parts[6])
        repetition = int(original_performance_file.parts[7])
        task_type = _ID_TO_TASK_TYPE[task_id]

        # the metric that is read depends on the kind of downstream task
        if task_type in (TaskType.BINARY, TaskType.MULTI_CLASS):
            metric_name = "F1_macro"

        elif task_type == TaskType.REGRESSION:
            metric_name = "RMSE"

        else:
            raise Exception(f"task_type ({task_type}) not valid.")

        # store the metric value together with its name
        original_performances[task_id][cleaner_type][confidence_level][repetition] = (
            json.loads(original_performance_file.read_text())[metric_name],
            metric_name,
        )

    # persist the collected performances for later runs
    joblib.dump(original_performances, original_performances_cache_file)

# Cache Cleaned Data

In [None]:
# Load the cached cleaned data if present; otherwise read every `cleaned_data.csv`
# (and the `cleaned_mask.csv` next to it) below the experiment's results directory.
if cleaned_data_cache_file.exists():
    cleaned_data_cache = joblib.load(cleaned_data_cache_file)

else:
    cleaned_data_cache = AutoInitializedDict()

    cleaned_data_file_paths = list(experiment_results_path.rglob("**/cleaned_data.csv"))
    for i in trange(len(cleaned_data_file_paths)):
        cleaned_data_file = cleaned_data_file_paths[i]
        cleaned_mask_file = cleaned_data_file.parent / "cleaned_mask.csv"

        # ignore files whose `finished.txt` marker (located via the first 7 path parts) is missing
        if not Path(*cleaned_data_file.parts[:7], "finished.txt").exists():
            continue

        # run metadata is encoded in fixed positions of the file path
        task_id = int(cleaned_data_file.parts[4])
        cleaner_type = str(cleaned_data_file.parts[5])
        confidence_level = float(cleaned_data_file.parts[6])
        repetition = int(cleaned_data_file.parts[7])
        error_type = cleaned_data_file.parts[8]
        error_fraction = float(cleaned_data_file.parts[9])

        cleaned_data = pd.read_csv(cleaned_data_file).convert_dtypes()
        cleaned_mask = pd.read_csv(cleaned_mask_file).convert_dtypes()

        # data and mask are stored next to each other under the same nested key
        cleaned_data_cache[task_id][cleaner_type][confidence_level][repetition][error_type][error_fraction][
            "data"
        ] = cleaned_data
        cleaned_data_cache[task_id][cleaner_type][confidence_level][repetition][error_type][error_fraction][
            "mask"
        ] = cleaned_mask

    # persist the collected frames for later runs
    joblib.dump(cleaned_data_cache, cleaned_data_cache_file)

# Read, Calculate, and Cache Results

In [None]:
def read_downstream_performance_for(file: Path):
    """Collect the original, corrupted and cleaned downstream performance of one run.

    Args:
        file: A file inside the run's result directory. Path parts 4-7 are read as task id,
            cleaner type, confidence level and repetition.

    Returns:
        ``(original_performance, corrupted_performance, cleaned_performance, metric_name)``.
        The original performance and the metric name come from the module-level
        ``original_performances``; the other two values are read from ``corrupted_perf.json``
        and ``cleaned_perf.json`` next to ``file``.
    """
    parts = file.parts
    run_key = (int(parts[4]), str(parts[5]), float(parts[6]), int(parts[7]))

    baseline_entry = original_performances[run_key[0]][run_key[1]][run_key[2]][run_key[3]]
    original_performance, metric_name = baseline_entry

    run_directory = file.parent
    corrupted_report = json.loads((run_directory / "corrupted_perf.json").read_text())
    cleaned_report = json.loads((run_directory / "cleaned_perf.json").read_text())

    return (
        original_performance,
        corrupted_report[metric_name],
        cleaned_report[metric_name],
        metric_name,
    )


def read_and_calc_downstream_improvement_in_percent_for(file: Path):
    """Read the downstream performances of one run and derive the relative improvement.

    The improvement is the change from the corrupted to the cleaned performance relative to the
    corrupted performance, in percent. For ``"RMSE"`` the sign is flipped so that a positive
    value always means that cleaning helped.

    Args:
        file: A file inside the run's result directory (see ``read_downstream_performance_for``).

    Returns:
        ``(original_performance, corrupted_performance, cleaned_performance, metric_name,
        improvement_in_percent)``. ``improvement_in_percent`` is NaN if the corrupted
        performance is 0, because the relative change is undefined in that case.

    Raises:
        Exception: If the metric name is neither ``"F1_macro"`` nor ``"RMSE"``.
    """
    (
        original_performance,
        corrupted_performance,
        cleaned_performance,
        metric_name,
    ) = read_downstream_performance_for(file)

    if metric_name not in ("F1_macro", "RMSE"):
        raise Exception(f"metric_name ({metric_name}) not valid.")

    if corrupted_performance == 0:
        # Dividing by a zero baseline would raise ZeroDivisionError, which the caller does not
        # catch; report the undefined improvement as NaN instead of aborting the whole run.
        improvement_in_percent = math.nan

    else:
        relative_change = (cleaned_performance - corrupted_performance) / corrupted_performance
        # higher is better for F1, lower is better for RMSE
        improvement_in_percent = relative_change * 100 if metric_name == "F1_macro" else -relative_change * 100

    return (
        original_performance,
        corrupted_performance,
        cleaned_performance,
        metric_name,
        improvement_in_percent,
    )


def get_dataset_statistics_for(task_id: int, error_type: str, error_fraction: float) -> dict:
    """Look up the single ``dataset_statistics`` row for the given combination.

    Args:
        task_id: Value to match in the ``task_id`` column.
        error_type: Value to match in the ``error_type`` column.
        error_fraction: Value to match in the ``error_fraction`` column.

    Returns:
        The matching row of the module-level ``dataset_statistics`` as a dictionary.

    Raises:
        Exception: If not exactly one row matches.
    """
    same_task = dataset_statistics["task_id"] == task_id
    same_error_type = dataset_statistics["error_type"] == error_type
    same_error_fraction = dataset_statistics["error_fraction"] == error_fraction
    statistics = dataset_statistics[same_task & same_error_type & same_error_fraction]

    if len(statistics) == 1:
        return statistics.iloc[0].to_dict()

    raise Exception(
        f"Statistics for 'task_id={task_id}', 'error_type={error_type}', 'error_fraction={error_fraction}' have none or more than one entry",
    )


def is_improvement(row: pd.Series) -> bool:
    """Tell whether cleaning improved the mean downstream performance of ``row``.

    For rows whose ``task_type`` is ``"regression"`` the metric is RMSE, so a smaller cleaned
    value is an improvement; for every other task type (F1) a larger cleaned value is.
    """
    lower_is_better = row["task_type"] == "regression"
    cleaned = row["cleaned_performance__mean"]
    corrupted = row["corrupted_performance__mean"]

    return cleaned < corrupted if lower_is_better else cleaned > corrupted

In [None]:
# Build (or load) one result tuple per finished run, i.e. per
# (task, cleaner, confidence level, repetition, error type, error fraction).
if not results_as_list_cache_file.exists():
    results_as_list = []

    files = list(experiment_results_path.rglob("**/cleaned_perf.json"))
    for i in trange(len(files)):
        file = files[i]

        # ignore files whose `finished.txt` marker (located via the first 7 path parts) is missing
        if not Path(*file.parts[:7], "finished.txt").exists():
            continue

        # get data from path
        task_id = int(file.parts[4])
        cleaner_type = str(file.parts[5])
        confidence_level = float(file.parts[6])
        repetition = int(file.parts[7])
        error_type = file.parts[8]
        error_fraction = float(file.parts[9])

        # get and unpack intermediate/necessary data
        try:
            (
                original_performance,
                corrupted_performance,
                cleaned_performance,
                metric_name,
                improvement_in_percent,
            ) = read_and_calc_downstream_improvement_in_percent_for(file)
        except ValueError:
            # NOTE(review): presumably raised when no original performance is cached for this
            # run (unpacking an empty entry) — confirm; such runs are skipped.
            continue

        current_dataset_statistics = get_dataset_statistics_for(
            task_id=task_id,
            error_type=error_type,
            error_fraction=error_fraction,
        )
        actual_error_fraction = current_dataset_statistics["actual_error_fraction"]
        corrupted_categorical_error = current_dataset_statistics["categorical_error"]
        corrupted_categorical_weighted_error = current_dataset_statistics["categorical_error_weighted"]
        corrupted_numerical_error = current_dataset_statistics["numerical_error"]
        error_mask = task_cache["corrupted_task"][task_id][error_type][error_fraction]["error_mask"]
        cleaned_data = cleaned_data_cache[task_id][cleaner_type][confidence_level][repetition][error_type][
            error_fraction
        ]["data"]
        cleaned_mask = cleaned_data_cache[task_id][cleaner_type][confidence_level][repetition][error_type][
            error_fraction
        ]["mask"]
        correct_data = task_cache["correct_task"][task_id].test_data

        # prediction-set statistics are only available for the conformal cleaner
        if cleaner_type == "ConformalAutoGluon":
            # SECURITY: `pickle.loads` executes arbitrary code from the file; only use this on
            # result files produced by your own experiment runs.
            prediction_sets_dict = pickle.loads((file.parent / "prediction_sets.pckl").read_bytes())
            coverages = []
            empty_set_fractions = []
            average_set_sizes = []
            relative_average_set_sizes = []

            for column_name, prediction_sets in prediction_sets_dict.items():
                if column_name in task_cache["correct_task"][task_id].categorical_columns:
                    # set sizes are put in relation to the number of distinct training values
                    cardinality = len(task_cache["correct_task"][task_id].train_data[column_name].unique())
                    # a row is covered if any entry of its prediction set equals the true value
                    true_value_in_prediction_set = np.any(
                        prediction_sets == correct_data[column_name].to_numpy()[:, np.newaxis],
                        axis=1,
                    )
                    coverages.append(true_value_in_prediction_set.mean())

                    # the set size of a row is its number of non-missing entries
                    average_set_sizes.append((~pd.DataFrame(prediction_sets).isna()).sum(axis=1).mean())
                    relative_average_set_sizes.append(average_set_sizes[-1] / cardinality)
                    empty_set_fractions.append(((~pd.DataFrame(prediction_sets).isna()).sum(axis=1) == 0).mean())

                elif column_name in task_cache["correct_task"][task_id].numerical_columns:
                    # interval widths are put in relation to the value range of the training data
                    value_range = (
                        task_cache["correct_task"][task_id].train_data[column_name].max()
                        - task_cache["correct_task"][task_id].train_data[column_name].min()
                    )
                    # NOTE(review): columns 0 and 2 are used as lower/upper interval bounds —
                    # confirm the layout against the code that writes `prediction_sets.pckl`.
                    true_value_in_prediction_range = (correct_data[column_name].to_numpy() >= prediction_sets[:, 0]) & (
                        correct_data[column_name].to_numpy() <= prediction_sets[:, 2]
                    )
                    coverages.append(true_value_in_prediction_range.mean())
                    average_set_sizes.append((prediction_sets[:, 2] - prediction_sets[:, 0]).mean())
                    relative_average_set_sizes.append(average_set_sizes[-1] / value_range)
                    empty_set_fractions.append(((prediction_sets[:, 2] - prediction_sets[:, 0]) == 0).mean())

                else:
                    raise Exception(
                        f"Type (categorical or numerical) of '{column_name}' of task '{task_id}' is not known!",
                    )

            # average the per-column statistics
            coverage = np.mean(coverages)
            empty_set_fraction = np.mean(empty_set_fractions)
            average_set_size = np.mean(average_set_sizes)
            relative_average_set_size = np.mean(relative_average_set_sizes)

        else:
            coverage = empty_set_fraction = average_set_size = relative_average_set_size = None

        # calculate how well errors are detected/fixed
        number_of_errors = error_mask.sum().sum()
        number_of_assumed_errors = cleaned_mask.sum().sum()
        error_detection_fraction = (
            (error_mask & cleaned_mask).sum().sum() / number_of_errors if number_of_errors > 0 else None
        )  # relative to number of errors
        error_wrong_detection_fraction = (~error_mask & cleaned_mask).sum().sum() / (
            ~error_mask
        ).sum().sum()  # relative to number of correct cells
        error_fraction_after_cleaning = (correct_data != cleaned_data).sum().sum() / correct_data.size
        error_reduction = actual_error_fraction - error_fraction_after_cleaning

        # `calculate_errors` returns perfect scores without looking at the data when the given
        # fraction is 0. Passing only the (rounded) fraction of the *corrupted* data would report
        # the cleaned data as perfect even if the cleaner changed cells, so the shortcut is only
        # allowed when the cleaned data shows no differences either.
        (cleaned_categorical_error, cleaned_categorical_error_weighted), cleaned_numerical_error = calculate_errors(
            correct_task=task_cache["correct_task"][task_id],
            to_test_data=cleaned_data,
            actual_error_fraction=max(actual_error_fraction, error_fraction_after_cleaning),
        )

        # The tuple order must match the column list used when the results data frame is built.
        # `not pd.isna(...)` is used on purpose: `~` on the plain bool returned by `pd.isna`
        # for a scalar is a bitwise NOT (-1 / -2) and therefore always truthy.
        results_as_list.append(
            (
                task_id,
                cleaner_type,
                confidence_level,
                error_type,
                error_fraction,
                actual_error_fraction,
                repetition,
                original_performance,
                corrupted_performance,
                cleaned_performance,
                improvement_in_percent,
                cleaned_categorical_error,
                cleaned_categorical_error_weighted,
                cleaned_numerical_error,
                cleaned_categorical_error - corrupted_categorical_error
                if cleaned_categorical_error is not None and not pd.isna(corrupted_categorical_error)
                else None,
                cleaned_categorical_error_weighted - corrupted_categorical_weighted_error
                if cleaned_categorical_error_weighted is not None and not pd.isna(corrupted_categorical_weighted_error)
                else None,
                corrupted_numerical_error - cleaned_numerical_error
                if cleaned_numerical_error is not None and not pd.isna(corrupted_numerical_error)
                else None,
                error_detection_fraction,
                error_wrong_detection_fraction,
                error_fraction_after_cleaning,
                error_reduction,
                coverage,
                empty_set_fraction,
                average_set_size,
                relative_average_set_size,
                number_of_errors,
                number_of_assumed_errors,
            ),
        )

    # persist the collected tuples for later runs
    joblib.dump(results_as_list, results_as_list_cache_file)

else:
    results_as_list = joblib.load(results_as_list_cache_file)

In [None]:
# always work with the statistics as they are stored on disk
dataset_statistics = pd.read_csv(dataset_statistics_file).convert_dtypes()

# Load the aggregated results if they are cached; otherwise aggregate the per-run tuples over
# the repetitions and enrich them with the dataset statistics.
if results_file.exists():
    results = pd.read_csv(results_file).convert_dtypes()

else:
    results = (
        pd.DataFrame(
            results_as_list,
            columns=[
                "task_id",
                "cleaner_type",
                "confidence_level",
                "error_type",
                "error_fraction",
                "actual_error_fraction",
                "repetition",
                "original_performance",
                "corrupted_performance",
                "cleaned_performance",
                "improvement_in_percent",
                "cleaned_categorical_error",
                "cleaned_categorical_error_weighted",
                "cleaned_numerical_error",
                "categorical_error_reduction",
                "categorical_error_weighted_reduction",
                "numerical_error_reduction",
                "error_detection_fraction",
                "error_wrong_detection_fraction",
                "error_fraction_after_cleaning",
                "error_reduction",
                "coverage",
                "empty_set_fraction",
                "average_set_size",
                "relative_average_set_size",
                "number_of_errors",
                "number_of_assumed_errors",
            ],
        )
        .convert_dtypes()
        # mean/std of every remaining column over the repetitions of a configuration
        .groupby(
            [
                "task_id",
                "cleaner_type",
                "confidence_level",
                "error_type",
                "error_fraction",
                "actual_error_fraction",
            ],
        )
        .agg(["mean", "std"])
        # the aggregated repetition number carries no information
        .drop(columns="repetition", level=0)
        .reset_index()
        # flatten the two-level column names: ("x", "mean") -> "x__mean", ("task_id", "") -> "task_id"
        .pipe(
            lambda df: df.set_axis(
                df.columns.map(lambda name: name[0] if not name[1] else f"{name[0]}__{name[1]}"),
                axis=1,
            ),
        )
        # round 'actual_error_fraction' to 5 decimals, as done for `dataset_statistics`
        .assign(actual_error_fraction=lambda df: df["actual_error_fraction"].apply(lambda value: round(value, 5)))
        # attach the dataset statistics; every result row must match at most one statistics row
        .merge(
            dataset_statistics,
            on=["task_id", "error_type", "error_fraction", "actual_error_fraction"],
            how="left",
            validate="m:1",
        )
        .assign(
            has_errors=lambda df: df["actual_error_fraction"] > 0,
            improvement=lambda df: df.apply(is_improvement, axis=1),
        )
    )

    results.to_csv(results_file, index=False)

# Read and Cache Model Leaderboards

In [None]:
# always work with the statistics as they are stored on disk
dataset_statistics = pd.read_csv(dataset_statistics_file).convert_dtypes()

# Build (or load) one data frame with the leaderboards of all trained models.
if not models_leaderboard_cache_file.exists():
    models_leaderboard_as_list = []

    models_leaderboard_file_paths = list(experiment_models_path.rglob("**/*.csv"))
    for index in trange(len(models_leaderboard_file_paths)):
        leaderboard_temp = pd.read_csv(models_leaderboard_file_paths[index])
        # run metadata is encoded in fixed positions of the file path
        leaderboard_temp["task_id"] = int(models_leaderboard_file_paths[index].parts[4])
        leaderboard_temp["cleaner_type"] = models_leaderboard_file_paths[index].parts[5]
        leaderboard_temp["confidence_level"] = float(models_leaderboard_file_paths[index].parts[6])
        leaderboard_temp["repetition"] = int(models_leaderboard_file_paths[index].parts[7])

        models_leaderboard_as_list.append(leaderboard_temp)

    # Keep the *processed* frame in `models_leaderboard`, so that a first run exposes the same
    # columns as later runs that load the cached CSV (previously the raw concatenated frame was
    # kept and only the CSV received the processed version).
    models_leaderboard = (
        # create single dataframe
        pd.concat(models_leaderboard_as_list, ignore_index=True)
        # drop some not necessary columns
        .drop(
            columns=[
                "ag_args_fit",
                "ancestors",
                "can_infer",
                "child_ag_args_fit",
                "child_hyperparameters",
                "child_hyperparameters_fit",
                "child_model_type",
                "descendants",
                "features",
                "fit_order",
                "fit_time_marginal",
                "memory_size",
                "memory_size_min",
                "memory_size_min_w_ancestors",
                "memory_size_w_ancestors",
                "num_ancestors",
                "num_descendants",
                "num_features",
                "num_models",
                "num_models_w_ancestors",
                "pred_time_val",
                "pred_time_val_marginal",
                "score_val",
                "stack_level",
            ],
        )
        # add 'task_type' column; each task id must map to exactly one task type, which
        # "m:1" actually verifies ("m:m" performs no check at all)
        .merge(
            dataset_statistics[["task_id", "task_type"]].drop_duplicates(),
            on="task_id",
            how="inner",
            validate="m:1",
        )
    )

    # finally, save as csv
    models_leaderboard.to_csv(models_leaderboard_cache_file, index=False)

else:
    models_leaderboard = pd.read_csv(models_leaderboard_cache_file).convert_dtypes()