# Benchmarks

## Initialize

In [None]:
%load_ext autoreload
%autoreload 2

import os
from tqdm.auto import tqdm
import pathlib

import numpy as np
import pandas as pd
import lifelines

In [None]:
%env MKL_NUM_THREADS=1
%env NUMEXPR_NUM_THREADS=1
%env OMP_NUM_THREADS=1

In [None]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=50, threads_per_worker=1)
client = Client(cluster)
cluster.scheduler

In [None]:
project_name = "210616_centres_dask"
data_path = "/data/analysis/ag-reils/steinfej"
data_pre = f"{data_path}/data/2_datasets_pre/{project_name}"
data_post = f"{data_path}/data/3_datasets_post/{project_name}"

project_label = "21_PGS_Revision"
project_path = f"/data/analysis/ag-reils/ag-reils-shared/cardioRS/results/projects/{project_label}"
figures_path = f"{project_path}/figures"
data_results_path = f"{project_path}/data"
pathlib.Path(figures_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(data_results_path).mkdir(parents=True, exist_ok=True)

In [None]:
endpoints = ['MACE']
endpoint_labels = sorted([f"{e}_event" for e in endpoints]+[f"{e}_event_time" for e in endpoints])
endpoint_data =  pd.read_feather(f"{data_post}/data_merged.feather", columns=["eid"] + endpoint_labels)

In [None]:
data_temp = pd.read_feather(f"{data_post}/data_merged.feather")
eids_endpoint = {}
for endpoint in tqdm(endpoints):
    if endpoint == "MACE": eids_incl = data_temp.copy().query(f"myocardial_infarction==False&stroke==False&statins==False").eid.to_list()
    else: eids_incl=data_temp.copy().eid.to_list()
    print(endpoint, len(eids_incl))
    eids_endpoint[endpoint] = eids_incl

In [None]:
partitions = [i for i in range(22)]
splits = ["train", "valid", "test"]
eids_partition_split = {partition: {split: pd.read_feather(f"{data_post}/partition_{partition}/{split}/data.feather").eid.to_list() for split in splits} for partition in tqdm(partitions)}

In [None]:
def convert_to_float32(df):
    for col in tqdm(df.columns.to_list()):
        if df[col].dtype == "float64": 
            print(col, "convert")
            df[col]= df[col].astype("float32")
    return df

def convert_to_category(df):
    for col in tqdm(df.columns.to_list()):
        if df[col].dtype == "object": df[col]= df[col].astype("category")
    return df

In [None]:
preds_models = pd.read_feather(f"{data_results_path}/predictions_model_210703_FINAL.feather").query("endpoint=='MACE'")#")#&module=='DSM'")

In [None]:
preds_cox = pd.read_feather(f"{data_results_path}/predictions_cox_210631_REVISION.feather").query("endpoint=='MACE'")

In [None]:
preds_scores = pd.read_csv(f"{data_results_path}/predictions_scores_210616.csv")#.assign(endpoint="MACE")#.query("endpoint=='MACE'")

In [None]:
preds_scores_list = []
for endpoint in endpoints:
    for partition in partitions:
        for split in splits:
            eids_incl = eids_endpoint[endpoint]
            eids_split = eids_partition_split[partition][split]
            data_temp = preds_scores.query("eid==@eids_incl").query("eid==@eids_split").assign(endpoint=endpoint, partition=partition, split=split)
            print(endpoint, partition, split, len(data_temp))
            preds_scores_list.append(data_temp)

In [None]:
preds_scores = pd.concat(preds_scores_list, axis=0).reset_index(drop=True)

In [None]:
eids_incl = eids_endpoint["MACE"]
preds = pd.concat([preds_models, preds_cox, preds_scores], axis=0)#.query("eid==@eids_incl")

In [None]:
del preds["Ft_0"]

In [None]:
del preds_models
del preds_cox
del preds_scores

## Calibration DF

In [None]:
time_cols = {t: f"Ft_{t}" for t in range(1,26)}

In [None]:
data_train = preds.query("split=='train'")
data_train

In [None]:
data_fit = data_train[["eid"]+["endpoint", "module", "features", "partition"] + list(time_cols.values())]

In [None]:
modules = data_train.module.unique().tolist()
features = data_train.features.unique().tolist()
partitions = data_train.partition.unique().tolist()

In [None]:
from lifelines import CRCSplineFitter
def get_observed_probability(F_t, events, durations, t):
    def ccl(p): return np.log(-np.log(1 - p))
    T = "time"
    E = "event"
    predictions_at_t = np.clip(F_t, 1e-5, 1 - 1e-5)
    prediction_df = pd.DataFrame({f"ccl_at_{t}": ccl(predictions_at_t), T: durations, E: events})

    index_old = prediction_df.index
    prediction_df = prediction_df.dropna()
    index_new = prediction_df.index
    diff = index_old.difference(index_new)

    knots=3
    crc = CRCSplineFitter(knots, penalizer=0.00001)
    
    regressors = {"beta_": [f"ccl_at_{t}"], **{f"gamma{i}_":"1" for i in range(knots)}}
    crc.fit_right_censoring(prediction_df, T, E, regressors=regressors)
    
    risk_obs = (1 - crc.predict_survival_function(prediction_df, times=[t])).T.squeeze()
    return risk_obs, diff.to_list()

In [None]:
from sklearn.isotonic import IsotonicRegression

def fit_isotonic_regression(df, endpoint, time_cols):
    isoreg_dict = {}
    for t, col in tqdm(time_cols.items()):
        if df[f"Ft_{t}"].nunique()>1:
            risk_obs = pd.Series(np.nan)
            i=1
            while risk_obs.nunique()<=1: 
                df_temp = df.sample(frac=0.5, replace=True).dropna(subset=[col])
                try:
                    risk_obs, nan = get_observed_probability(df_temp[col].values, 
                                                                df_temp[f"{endpoint}_event"].values, 
                                                                df_temp[f"{endpoint}_event_time"].values,
                                                                t)
                except:
                    if i==20: break
                    i+=1
            risk_pred = df_temp.drop(df_temp.index[nan])[col].reset_index(drop=True)
            
            isoreg_dict[t] = IsotonicRegression().fit(risk_pred.values,risk_obs.values)
        else: isoreg_dict[t] = None
    return isoreg_dict

In [None]:
from dask.diagnostics import ProgressBar

with ProgressBar():
    ir_dict = {}
    for endpoint in endpoints:
        temp = data_fit.query("endpoint==@endpoint").merge(endpoint_data[["eid", f"{endpoint}_event", f"{endpoint}_event_time"]], on="eid", how="left")
        for module in tqdm(modules):
            temp_module = temp.query("module==@module")
            for feature in tqdm(features, leave=False):
                temp_feature = temp_module.query("features==@feature").drop(["endpoint", "module", "features"], axis=1)
                if len(temp_feature)>0:
                    for partition in tqdm(partitions, leave=False):                  
                        temp_partition = temp_feature.query("partition==@partition").drop(["partition"], axis=1)
                        if len(temp_partition)>0:
                            data_future = client.scatter(temp_partition)
                            ir_dict[f"{endpoint}_{module}_{feature}_{partition}"] = client.submit(fit_isotonic_regression, data_future, endpoint, time_cols=time_cols)

In [None]:
from dask.distributed import progress
progress(ir_dict)

In [None]:
ir_dict = client.gather(ir_dict)

In [None]:
preds_calib_list = []

for endpoint in endpoints:
    temp = preds.query("endpoint==@endpoint")
    for module in tqdm(modules):
        temp_module = temp.query("module==@module")
        for feature in tqdm(features):
            temp_feature = temp_module.query("features==@feature")
            if len(temp_feature)>0:
                for partition in tqdm(partitions): 
                    if module!="SCORE":
                        temp_partition = temp_feature.query("partition==@partition").dropna(subset=list(time_cols.values())) 
                    else: 
                        temp_partition = temp_feature.query("partition==@partition")
                    isoreg_str = f"{endpoint}_{module}_{feature}_{partition}"
                    if len(temp_partition)>0:
                        if isoreg_str in ir_dict:
                            isoreg_dict = ir_dict[isoreg_str]
                            for t, col in time_cols.items(): 
                                if isinstance(isoreg_dict, dict):
                                    isoreg = isoreg_dict[t]
                                    if isinstance(isoreg, IsotonicRegression):
                                        calibrated = isoreg_dict[t].predict(temp_partition[col])
                                        #if np.isnan(np.sum(calibrated)): print(np.sum(np.isnan(calibrated)), "NaNs in calibrated probabilites")
                                        temp_partition[col] = calibrated
                                    else:
                                        print("Not working t", endpoint, module, feature, isoreg_str, t)
                                        temp_partition[col] = np.nan
                                else:
                                    print("Not working", endpoint, module, feature, isoreg_str, t)
                                    temp_partition[col] = np.nan
                            preds_calib_list.append(temp_partition)
                        else:
                            print(isoreg_str, "not available")
                            pass
                    else:
                        print("No data available for", isoreg_str)
                        pass
            else:
                print(f"No data available for {endpoint}_{module}_{feature}")
                pass
from IPython.display import clear_output
clear_output()

In [None]:
preds_calib = pd.concat(preds_calib_list, axis=0).assign(calibrated=True).query("eid==@eids_incl").reset_index(drop=True)

In [None]:
preds_calib.to_feather(f"{data_results_path}/predictions_210703_centres_FINAL.feather")