In [7]:
import pandas as pd 
import numpy as np
from sklearn.metrics.cluster import adjusted_rand_score
import math

## Clustering ARI

In [None]:
def eval_clustering_ARI(gt_variant_df, output_variant_df):
    # Get IDs of mutations
    gt_variant_df['mut_id'] = gt_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    output_variant_df['mut_id'] = output_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    
    # Get mutation cluster assignment
    gt_use = gt_variant_df[['mut_id', 'CLUSTER']].drop_duplicates()
    output_use = output_variant_df[['mut_id', 'CLUSTER']].drop_duplicates()
    
    # Merge data and calculate ARI
    compare_data = gt_use.merge(output_use, on=['mut_id'], suffixes=['_gt', '_output'], how='inner')
    ari = adjusted_rand_score(compare_data['CLUSTER_gt'], compare_data['CLUSTER_output'])
    return ari

## Mutation descendant accuracy

In [None]:
def eval_mutation_descendant_accuracy(gt_variant_df, gt_edges_df, output_variant_df, output_edges_df):
    # Get IDs of mutations
    gt_variant_df['mut_id'] = gt_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    output_variant_df['mut_id'] = output_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    
    #Remove noise mutations & subset just ['mut_id' and 'cluster'] columns
    gt_not_noise = gt_variant_df[gt_variant_df.NOISE == False]['mut_id'].tolist()
    gt_use = gt_variant_df[gt_variant_df['mut_id'].isin(gt_not_noise)][['mut_id', 'CLUSTER']].drop_duplicates()
    output_use = output_variant_df[output_variant_df['mut_id'].isin(gt_not_noise)][['mut_id', 'CLUSTER']].drop_duplicates()
    
    # Get descendants of edges
    gt_tree_descendants = get_tree_edges(gt_edges_df, add_root_node=False)
    output_tree_descendants = get_tree_edges(output_edges_df, add_root_node=True)
    
    # Merge data
    compare_data = gt_use.merge(output_use, on=['mut_id'], suffixes=['_gt', '_output'], how='inner')
    
    # Calculate mutation descendant accuracy
    accuracy = calculate_md_accuracy(compare_data, gt_tree_descendants, output_tree_descendants)
    return accuracy
    
    
def get_tree_edges(tree, add_root_node=False):
    tree = tree[['Parent', 'Child']].copy()
    max_node = tree[['Parent', 'Child']].max().max()
    edges = list(tree[['Parent', 'Child']].apply(tuple, axis=1))
    parents, children = np.array(list(zip(*edges)))
    leaves = children[~np.isin(children, parents)]
    
    # Add fake normal root node (with label max(nodes)+1) to output (if output does not contain root node)
    if add_root_node:
        root = parents[~np.isin(parents, children)][0]
        tree = pd.concat([tree, pd.DataFrame.from_records({'Parent': max_node+1, 'Child': root}, index=[1])]).reset_index(drop=True)
        edges.append((max_node+1, root))

    tree['Descendants'] = [get_descendants(edge, edges, leaves, [])[1:] for edge in edges]
    tree_use = tree.set_index('Child')
    return tree_use


def get_descendants(edge, edges, leaves, descendants=[]):
    if edge[-1] in leaves:
        descendants.append(edge[-1])
        return descendants
    else:
        descendants.append(edge[-1])
        edge_children = [i for i in edges if i[0] == edge[1]]
        for j in edge_children:
            descendants = get_descendants(j, edges, leaves, descendants)
        return descendants
    
def calculate_md_accuracy(variant_data, gt_tree, output_tree):
    gt_variant_dict = variant_data['CLUSTER_gt'].to_dict()
    output_variant_dict = variant_data['CLUSTER_output'].to_dict()
    gt_descendants_dict = gt_tree['Descendants'].to_dict()
    output_descendants_dict = output_tree['Descendants'].to_dict()
    relationship = (lambda mut1, mut2, mut_dict, tree_dict: 1 if mut_dict[mut1] in tree_dict[mut_dict[mut2]]
                                                            else 2 if mut_dict[mut2] in tree_dict[mut_dict[mut1]]
                                                            else 3 if mut_dict[mut1] == mut_dict[mut2]
                                                            else 0)
    eval = (lambda sim, tx: sim == tx)
    mutation_relationships = sum(eval(relationship(mut1, mut2, gt_variant_dict, gt_descendants_dict),
                                    relationship(mut1, mut2, output_variant_dict, output_descendants_dict)) for mut1, mut2 in combinations(sim_mut.index, 2))
                          
    return mutation_relationships/math.comb(len(variant_data), 2)

## Presence / Absence of mutations

In [None]:
def eval_presabs(gt_variant_df, output_variant_df, output_edges_df, absence_threshold=0):
    # Get IDs of mutations
    gt_variant_df['mut_id'] = gt_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    output_variant_df['mut_id'] = output_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    
    # Get muts & ccfs
    gt_use = gt_variant_df[['mut_id', 'SAMPLE', 'CLUSTER', 'TRUE_CCF']].drop_duplicates()
    gt_use['CLUSTER_CCF'] = gt_use.groupby(['SAMPLE', 'CLUSTER'])['TRUE_CCF'].transform('mean')
    gt_use['CLUSTER_PRES'] = gt_use['CLUSTER_CCF'] > absence_threshold
    
    output_use = output_variant_df[['mut_id', 'SAMPLE', 'CLUSTER', 'CCF_OBS']].drop_duplicates()
    output_use['CLUSTER_CCF'] = output_use.groupby(['SAMPLE', 'CLUSTER'])['CCF_OBS'].transform('mean')
    output_use['CLUSTER_PRES'] = output_use['CLUSTER_CCF'] > absence_threshold
    
    
    # Merge data and calculate ARI
    compare_data = gt_use[['mut_id', 'CLUSTER_PRES']].merge(output_use[['mut_id', 'CLUSTER_PRES']], on=['mut_id'], suffixes=['_gt', '_output'], how='inner')
    presabs = calculate_metrics(compare_data, 'CLEAN_output', 'CLEAN_gt')
    return presabs

def calculate_metrics(evaluate_df, pred_pres_column, true_pres_column):

    tp = len(evaluate_df[(evaluate_df[true_pres_column] == True) & (evaluate_df[pred_pres_column] == True)])
    fp = len(evaluate_df[(evaluate_df[true_pres_column] == False) & (evaluate_df[pred_pres_column] == True)])
    tn = len(evaluate_df[(evaluate_df[true_pres_column] == False) & (evaluate_df[pred_pres_column] == False)])
    fn = len(evaluate_df[(evaluate_df[true_pres_column] == True) & (evaluate_df[pred_pres_column] == False)])

    precision = 0 if (tp+fp) == 0 else tp/(tp+fp)
    recall = 0 if (tp+fn) == 0 else tp/(tp+fn)
    accuracy = 0 if (tp+tn+fp+fn) == 0 else (tp+tn)/(tp+tn+fp+fn)
    f1 = 0 if (precision+recall) == 0 else (2*precision*recall)/(precision+recall)

    return [accuracy, precision, recall, f1]

## Identification of truncal muts

In [None]:
def eval_presabs(gt_variant_df, gt_edges_df, output_variant_df, output_edges_df):
    # Get IDs of mutations
    gt_variant_df['mut_id'] = gt_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    output_variant_df['mut_id'] = output_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    
    # Get truncal muts
    gt_use = gt_variant_df[['mut_id', 'CLUSTER']].drop_duplicates()
    gt_trunk = get_truncal_branch(gt_edges_df, normal_root=True)
    gt_use['TRUNCAL'] = gt_use['CLUSTER'] == gt_trunk
    
    output_use = gt_variant_df[['mut_id', 'CLUSTER']].drop_duplicates()
    output_root = get_truncal_branch(output_edges_df, normal_root=False)
    output_use['TRUNCAL'] = output_use['CLUSTER'] == output_root
    
    
    # Merge data and calculate ARI
    compare_data = gt_use.merge(output_use, on=['mut_id'], suffixes=['_gt', '_output'], how='inner')
    truncal = calculate_metrics(compare_data, 'TRUNCAL_output', 'TRUNCAL_gt')
    return truncal

def get_truncal_branch(edges_df, normal_root=False):
    if normal_root:
        root = np.array(edges_df.Parent)[~np.isin(np.array(edges_df.Parent), np.array(edges_df.Child))][0]
        trunk = edges_df[edges_df.Parent == root].Child.values[0]
    else:
        trunk = np.array(edges_df.Parent)[~np.isin(np.array(edges_df.Parent), np.array(edges_df.Child))]
    return trunk

def calculate_metrics(evaluate_df, pred_truncal_column, true_truncal_column):

    tp = len(evaluate_df[(evaluate_df[true_truncal_column] == True) & (evaluate_df[pred_truncal_column] == True)])
    fp = len(evaluate_df[(evaluate_df[true_truncal_column] == False) & (evaluate_df[pred_truncal_column] == True)])
    tn = len(evaluate_df[(evaluate_df[true_truncal_column] == False) & (evaluate_df[pred_truncal_column] == False)])
    fn = len(evaluate_df[(evaluate_df[true_truncal_column] == True) & (evaluate_df[pred_truncal_column] == False)])

    precision = 0 if (tp+fp) == 0 else tp/(tp+fp)
    recall = 0 if (tp+fn) == 0 else tp/(tp+fn)
    accuracy = 0 if (tp+tn+fp+fn) == 0 else (tp+tn)/(tp+tn+fp+fn)
    f1 = 0 if (precision+recall) == 0 else (2*precision*recall)/(precision+recall)

    return [accuracy, precision, recall, f1]

## Noise

In [None]:
def eval_noise(gt_variant_df, output_variant_df, output_edges_df):
    # Get IDs of mutations
    gt_variant_df['mut_id'] = gt_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    output_variant_df['mut_id'] = output_variant_df.apply((lambda x: '{}:{}:{}:{}'.format(x['CHR'], x['POS'], x['REF'], x['ALT'])), axis=1)
    
    # Get noise and clean mutations
    gt_use = gt_variant_df[['mut_id', 'CLUSTER', 'NOISE']].drop_duplicates()
    gt_use['CLEAN'] = abs(1 - gt_use['NOISE']).astype(bool)
    output_clusters = output_edges_df[['Parent', 'Child']].unique()
    output_use = output_variant_df[['mut_id', 'CLUSTER']].drop_duplicates()
    output_use['CLEAN'] = np.where(output_use['CLUSTER'].isin(output_clusters), True, False)
    
    
    # Merge data and calculate ARI
    compare_data = gt_use.merge(output_use, on=['mut_id', 'CLUSTER'], suffixes=['_gt', '_output'], how='left')
    compare_data['CLEAN_output'] = compare_data['CLEAN_output'].fillna(False)
    noise = calculate_metrics(compare_data, 'CLEAN_output', 'CLEAN_gt')
    return noise

def calculate_metrics(evaluate_df, pred_clean_column, true_clean_column):

    tp = len(evaluate_df[(evaluate_df[true_clean_column] == True) & (evaluate_df[pred_clean_column] == True)])
    fp = len(evaluate_df[(evaluate_df[true_clean_column] == False) & (evaluate_df[pred_clean_column] == True)])
    tn = len(evaluate_df[(evaluate_df[true_clean_column] == False) & (evaluate_df[pred_clean_column] == False)])
    fn = len(evaluate_df[(evaluate_df[true_clean_column] == True) & (evaluate_df[pred_clean_column] == False)])

    noise = len(evaluate_df[evaluate_df[true_clean_column] == True])
    prop_noise_removed = np.NaN if noise == 0 else tp/noise
    precision = 1 if (tp+fp) == 0 else tp/(tp+fp)
    recall = 1 if (tp+fn) == 0 else tp/(tp+fn)
    accuracy = 1 if (tp+tn+fp+fn) == 0 else (tp+tn)/(tp+tn+fp+fn)
    f1 = np.NaN if (precision+recall) == 0 else (2*precision*recall)/(precision+recall)

    return [prop_noise_removed, accuracy, precision, recall, f1]