# Environment

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import divexplorer 
import pandas as pd
pd.set_option('max_colwidth', None)
import os
import numpy as np

from utils_analysis import filter_itemset_df_by_attributes, slice_by_itemset, \
    plot_true_pred, plotComparisonShapleyValues, plotMultipleSV, plotMultipleSV_4, plotShapleyValue

In [None]:
## Define the minimum support threshold for data subgroups
## (passed as `min_support` to getFrequentPatternDivergence for every config)
min_sup = 0.01

# Util Functions

In [None]:
## Function for sorting data cohorts
def sortItemset(x, abbreviations={}):
    """Render an itemset as a sorted, comma-separated, abbreviated string.

    Args:
        x (iterable of str): the items of the itemset.
        abbreviations (dict): substring -> replacement pairs. They are applied
            one after another, in the dict's iteration order, to the joined
            string (so an earlier replacement can affect what later keys match).

    Returns:
        str: the items in sorted order joined by ", ", after the replacements.
    """
    label = ", ".join(sorted(x))
    for old, new in abbreviations.items():
        label = label.replace(old, new)
    return label

In [None]:
def attributes_in_itemset(itemset, attributes, alls = True):
    """Tell whether the attributes of an itemset belong to a given attribute list.

    Each item is expected to look like "attribute=value"; the attribute is the
    text before the first "=".

    Args:
        itemset (frozenset): the itemset to check
        attributes (list): the attributes of interest
        alls (bool): If True, EVERY attribute of the itemset must be in `attributes`.
        If False, AT LEAST ONE attribute of the itemset must be in `attributes`.

    Returns:
        bool: True if the itemset is admitted, False otherwise. The empty
        itemset (i.e., the entire dataset) is rejected whenever `attributes`
        is non-empty.
    """
    # The empty itemset describes the whole dataset: never admit it when
    # some attributes were requested
    if itemset == frozenset() and attributes:
        return False

    item_attributes = (item.split("=")[0] for item in itemset)

    if alls:
        # Fails on the first attribute that is not admitted
        return all(attr in attributes for attr in item_attributes)

    # Succeeds on the first admitted attribute
    return any(attr in attributes for attr in item_attributes)
    
def filter_itemset_df_by_attributes(df: pd.DataFrame, attributes: list, alls = True, itemset_col_name: str = "itemsets") -> pd.DataFrame:
    """Keep only the rows whose itemset is admitted by `attributes_in_itemset`.

    Args:
        df (pd.DataFrame): the input itemsets (with their info).
        attributes (list): the attributes of interest
        alls (bool): If True, ALL attributes of an itemset must be in `attributes`.
        If False, AT LEAST ONE attribute of an itemset must be in `attributes`.
        itemset_col_name (str): the name of the itemset column, "itemsets" as default

    Returns:
        pd.DataFrame: the rows of `df` whose itemset passes the check
    """

    def _is_admitted(itemset):
        return attributes_in_itemset(itemset, attributes, alls = alls)

    keep_mask = df[itemset_col_name].apply(_is_admitted)
    return df.loc[keep_mask]

In [None]:
## Define abbreviations for plot and visualization
## NOTE: sortItemset applies these replacements one after another, in dict
## insertion order, so earlier entries change the text that later keys are
## matched against. E.g. once 'trimmed' -> 'trim' has run, the key
## 'speed_rate_word_trimmed' can no longer match; the same label
## 'speakRate_trim' is still obtained through 'speed_rate_word' -> 'speakRate'.
## How abbreviateDict applies them is not visible in this notebook.
from divexplorer.FP_Divergence import abbreviateDict
abbreviations = {'total_silence': 'tot_silence', \
                  'speaker_id' : 'spkID', \
                  'trimmed': 'trim', \
                  'total_':'tot_', \
                  'speed_rate_word_trimmed': 'speakRate_trim', \
                  'trim_duration': 'trim_dur', \
                  'speed_rate_word':'speakRate', \
                  'speed_rate_char':'speakCharRate', \
                  'duration': 'dur'}

# Independent (shallow) copy; at this point its content is identical to `abbreviations`
abbreviations_shorter = abbreviations.copy()

In [None]:
n = 3  # Number of subgroups shown in each table (rows taken with .head(n))
K = 15  # Number of items kept in the global Shapley value plots (largest absolute values)

# Define targets

In [None]:
## Target for DivExplorer: 'WER'
# Column holding the WER values; passed as `target_name` to FP_DivergenceExplorer
target_col = 'wer' 
# Metric requested from getFrequentPatternDivergence
target_metric = 'd_outcome'
# Name of the divergence column after renaming, i.e. 'd_wer'
target_div = f'd_{target_col}'
# Name of the t-value column before it is renamed to 't_value'
t_value_col = 't_value_outcome'
# NOTE(review): hard-codes 'wer'/'d_wer' instead of reusing target_col/target_div
printable_columns = ['support', 'itemsets', 'wer', 'd_wer', 't_value']

In [None]:
## Columns for visualization
# Old name -> new name mapping applied to the frequent-pattern divergence table
remapped_cols = { "outcome": target_col, "d_outcome": target_div, t_value_col: 't_value'}
# Columns kept (in this order) from the renamed divergence table
show_cols = ['support', 'itemsets', target_col, target_div, 'support_count', 'length', 't_value']

In [None]:
## Columns of the df file that we are going to analyze 
# Signal-level columns: after discretization their bins are relabelled
# "low" / "medium" / "high" ('total_silence' is currently left out)
signal_cols = ['total_duration', 'trimmed_duration', 'n_words', 
       'speed_rate_word', 'speed_rate_word_trimmed', 'snr', 'spectral_flatness']#, 'total_silence'] 
       
# Speaker-level columns: discretized with the others but not relabelled
demo_cols = ['speaker_id']

# All attributes handed to `discretize`
input_cols = signal_cols + demo_cols

# Retrieve Data and Compute Divergence

In [None]:
from divexplorer.FP_DivergenceExplorer import FP_DivergenceExplorer
from divexplorer.FP_Divergence import FP_Divergence

In [None]:
## Imported once, before the loop (it was re-imported on every iteration)
from util_discretization import discretize

## Model configurations to analyze; each one has its own CSV of results
configs = [
    "openai_whisper-base",
    "openai_whisper-base_en",
    "openai_whisper-small",
    "openai_whisper-small_en",
    "openai_whisper-medium",
    "openai_whisper-medium_en",
    "openai_whisper-large-v3",
    "openai_whisper-base_ft",
    "openai_whisper-small_ft",
    "openai_whisper-medium_ft",
    "openai_whisper-large-v3_ft",
    ]

## Per-config results, keyed by config name
FP_fm_dict = {}          # frequent-pattern divergence tables
fp_divergence_dict = {}  # FP_Divergence objects built on those tables
df_dict = {}             # discretized input dataframes

for config in configs:

    print(config)

    ## Read csv file: configs containing "ft" are read from "fine_tune",
    ## all the others from "zero_shot"
    input_file_divexplorer = os.path.join(
        os.getcwd(), "dataframes",
        "fine_tune" if "ft" in config else "zero_shot",
        f"ASR_track2_dev_{config}.csv")
    df = pd.read_csv(input_file_divexplorer, index_col=0)

    ## Discretize the dataframe
    df_discretized = discretize(
        df[input_cols+[target_col]],
        bins=3,
        attributes=input_cols,
        strategy="quantile", 
        round_v = 2,
        min_distinct=3,
    )
    
    ## Replace values with ranges: "low", "medium", "high"
    replace_values = {}

    for signal_col in signal_cols:

        ## Classify each bin label by its shape: "<=a" / "(a, b]" / ">b"
        for v in df_discretized[signal_col].unique():
            if v[0:2] == "<=":
                replace_values[v] = "low"
            elif v[0] == ">":
                replace_values[v] = "high"
            elif v[0] == "(" and v[-1] == "]":
                replace_values[v] = "medium"
            else:
                ## Unknown label format: fail loudly rather than mislabel
                raise ValueError(v)

        ## Assign the result back to the column. Calling
        ## `df[col].replace(..., inplace=True)` is chained in-place assignment:
        ## it raises a FutureWarning in pandas 2.2 and does not update the
        ## dataframe when Copy-on-Write is enabled.
        df_discretized[signal_col] = df_discretized[signal_col].replace(replace_values)
                
    ## Create dict of Divergence df
    df_dict[config] = df_discretized

    fp_diver = FP_DivergenceExplorer(df_discretized, target_name=target_col)
    FP_fm = fp_diver.getFrequentPatternDivergence(min_support=min_sup, metrics=[target_metric])
        
    ## Rename to WER-specific column names, keep the columns to show, round
    FP_fm = FP_fm.rename(columns = remapped_cols)[show_cols].copy()
    FP_fm['wer'] = FP_fm['wer'].round(5)
    FP_fm['d_wer'] = FP_fm['d_wer'].round(5)
    FP_fm['t_value'] = FP_fm['t_value'].round(2)
    FP_fm_dict[config] = FP_fm
    fp_divergence_dict[config] = FP_Divergence(FP_fm, target_div)

In [None]:
## Print the mean of the target column (WER) of every config
from jiwer import wer

for config in configs:

    print(config)

    ## Configs containing "ft" are read from "fine_tune", the others from "zero_shot"
    input_file_divexplorer = os.path.join(
        os.getcwd(), "dataframes",
        "fine_tune" if "ft" in config else "zero_shot",
        f"ASR_track2_dev_{config}.csv")
    df = pd.read_csv(input_file_divexplorer, index_col=0)
    print(df[target_col].mean())
    print("-------------")

In [None]:
# Overall WER per model, used in the cells below as the baseline that is
# subtracted from each subgroup's WER (after the latter is multiplied by 100).
# NOTE(review): hard-coded values; presumably copied from the means printed by
# the previous cell (x100) - re-check them whenever the CSV files change.
# Suffixes (TODO confirm): ZS = zero-shot, EN = English-only config, FT = fine-tuned.
WHISPER_BASE_ZS_WER = 92.457
WHISPER_BASE_EN_ZS_WER = 89.997
WHISPER_BASE_FT_WER = 77.444

WHISPER_SMALL_ZS_WER = 87.446
WHISPER_SMALL_EN_ZS_WER = 85.009
WHISPER_SMALL_FT_WER = 69.975

WHISPER_MEDIUM_ZS_WER = 82.366
WHISPER_MEDIUM_EN_ZS_WER = 80.000
WHISPER_MEDIUM_FT_WER = 60.028

WHISPER_LARGE_ZS_WER = 75.024
WHISPER_LARGE_FT_WER = 49.996

# Divergence Whisper Base

## Whisper Base (EN)

In [None]:
## Select the already-computed divergence results for Whisper Base (English);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-base_en'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_BASE_EN_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_BASE_EN_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
## Compute average divergence with std, over every row returned by getDivergence
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_BASE_EN_ZS_WER)).round(3)

## The statistics are computed on d_wer (subgroup WER x100 minus the overall
## WER constant), so they are labelled as divergence rather than as WER
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=15, titlesize=15)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_base = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_base,
                        key=lambda x: abs(global_item_divergence_whisper_base[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_base = {k:v*100 for k,v in global_item_divergence_whisper_base.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_base, 
                sizeFig=(10,10), labelsize=22, titlesize=22, 
                saveFig=True, nameFig="gsv_whisper_base_en.pdf")

## Whisper Base (Multilingual)

In [None]:
## Select the already-computed divergence results for Whisper Base (Multilingual);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-base'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_BASE_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_BASE_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=15, titlesize=15)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_base_m = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_base_m,
                        key=lambda x: abs(global_item_divergence_whisper_base_m[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_base_m = {k:v*100 for k,v in global_item_divergence_whisper_base_m.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_base_m, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

## Whisper Base FT 

In [None]:
## Select the already-computed divergence results for Whisper Base (FT);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-base_ft'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_BASE_FT_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_BASE_FT_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
## Compute average divergence with std, over every row returned by getDivergence
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_BASE_FT_WER)).round(3)

## The statistics are computed on d_wer (subgroup WER x100 minus the overall
## WER constant), so they are labelled as divergence rather than as WER
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=15, titlesize=15)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_base_ft = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_base_ft,
                        key=lambda x: abs(global_item_divergence_whisper_base_ft[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_base_ft = {k:v*100 for k,v in global_item_divergence_whisper_base_ft.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_base_ft, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

# Divergence Whisper Small

## Whisper Small (EN)

In [None]:
## Select the already-computed divergence results for Whisper Small (English);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-small_en'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_SMALL_EN_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_SMALL_EN_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
## Compute average divergence with std, over every row returned by getDivergence
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_SMALL_EN_ZS_WER)).round(3)

## The statistics are computed on d_wer (subgroup WER x100 minus the overall
## WER constant), so they are labelled as divergence rather than as WER
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_small = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_small,
                        key=lambda x: abs(global_item_divergence_whisper_small[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_small = {k:v*100 for k,v in global_item_divergence_whisper_small.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_small, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

## Whisper Small (Multilingual)

In [None]:
## Select the already-computed divergence results for Whisper Small (Multilingual);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-small'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_SMALL_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_SMALL_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_small_m = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_small_m,
                        key=lambda x: abs(global_item_divergence_whisper_small_m[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_small_m = {k:v*100 for k,v in global_item_divergence_whisper_small_m.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_small_m, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

## Whisper Small FT

In [None]:
## Select the already-computed divergence results for Whisper Small (FT);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-small_ft'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_SMALL_FT_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_SMALL_FT_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
## Compute average divergence with std, over every row returned by getDivergence
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_SMALL_FT_WER)).round(3)

## The statistics are computed on d_wer (subgroup WER x100 minus the overall
## WER constant), so they are labelled as divergence rather than as WER
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_small_m_ft = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_small_m_ft,
                        key=lambda x: abs(global_item_divergence_whisper_small_m_ft[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_small_m_ft = {k:v*100 for k,v in global_item_divergence_whisper_small_m_ft.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_small_m_ft, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

# Divergence Whisper Medium

## Whisper Medium (EN)

In [None]:
## Select the already-computed divergence results for Whisper Medium (EN);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-medium_en'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_MEDIUM_EN_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset; the figure
## is also saved to file
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16,
                     saveFig=True, nameFig="shapley_values_whisper_medium_en_negative.pdf")

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_MEDIUM_EN_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset; plotted
## with negative=True and saved to file
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16, negative=True,
                     saveFig=True, nameFig="shapley_values_whisper_medium_en_positive.pdf")

In [None]:
## Compute average divergence with std, over every row returned by getDivergence
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_MEDIUM_EN_ZS_WER)).round(3)

## The statistics are computed on d_wer (subgroup WER x100 minus the overall
## WER constant), so they are labelled as divergence rather than as WER
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_medium = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_medium,
                        key=lambda x: abs(global_item_divergence_whisper_medium[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_medium = {k:v*100 for k,v in global_item_divergence_whisper_medium.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_medium, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

## Whisper Medium (Multilingual)

In [None]:
## Select the already-computed divergence results for Whisper Medium (Multilingual);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-medium'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_MEDIUM_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_MEDIUM_ZS_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_medium_m = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_medium_m,
                        key=lambda x: abs(global_item_divergence_whisper_medium_m[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100.
# NOTE(review): the name ends in "_" rather than "_m" like the other
# multilingual sections; left unchanged because later cells may refer to it.
topK_global_whisper_medium_ = {k:v*100 for k,v in global_item_divergence_whisper_medium_m.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_medium_, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

## Whisper Medium FT

In [None]:
## Select the already-computed divergence results for Whisper Medium (FT);
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-medium_ft'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent itemsets: first n rows returned by getDivergence, with WER
## multiplied by 100 and d_wer recomputed against the overall WER constant
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_MEDIUM_FT_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

## Shapley values (x100) of the items of the first listed itemset
if len(pr):
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        item: contribution*100
        for item, contribution in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations),
                     sizeFig=(2,2), labelsize=16, titlesize=16)

In [None]:
## Top performing itemsets: the divergence table is reversed before taking
## its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)[::-1]
pr = FPdiv.head(n).copy()
pr["support"] = pr["support"].round(2)
pr["wer"] = pr["wer"].mul(100).round(3)
pr["d_wer"] = pr["wer"].sub(WHISPER_MEDIUM_FT_WER).round(3)
pr_l = pr.loc[:, ["itemsets", "support", "wer", "d_wer", "t_value"]].copy()
pr_l["itemsets"] = pr_l["itemsets"].apply(lambda items: sortItemset(items, abbreviations))
display(pr_l)

In [None]:
## Compute average divergence with std, over every row returned by getDivergence
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_MEDIUM_FT_WER)).round(3)

## The statistics are computed on d_wer (subgroup WER x100 minus the overall
## WER constant), so they are labelled as divergence rather than as WER
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisper_medium_ft = fp_divergence_i.computeGlobalShapleyValue()

# Keys of the K entries with the largest absolute value. Computed once, up
# front: the sort used to be re-run for every item being filtered.
_topK_keys = set(sorted(global_item_divergence_whisper_medium_ft,
                        key=lambda x: abs(global_item_divergence_whisper_medium_ft[x]))[::-1][:K])

# Keep the dict's own item order; values are scaled by 100
topK_global_whisper_medium_ft = {k:v*100 for k,v in global_item_divergence_whisper_medium_ft.items()
                    if k in _topK_keys}

plotShapleyValue(shapley_values=topK_global_whisper_medium_ft, 
                sizeFig=(10,10), labelsize=22, titlesize=22)

# Divergence Whisper Large

## Whisper Large (Multilingual)

In [None]:
## Select the already-computed divergence results for Whisper Large;
## nothing is recomputed here, the object is looked up in fp_divergence_dict
config = 'openai_whisper-large-v3'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent subgroups: first n rows of the divergence table
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).assign(
    support=lambda d: d["support"].round(2),
    # WER as a percentage, then its offset from the WHISPER_LARGE_ZS_WER reference value
    wer=lambda d: (d["wer"] * 100).round(3),
    d_wer=lambda d: (d["wer"] - WHISPER_LARGE_ZS_WER).round(3),
)
# Keep the reporting columns and render each itemset as a sorted, abbreviated string
pr_l = pr[["itemsets", "support", "wer", "d_wer", "t_value"]].assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
display(pr_l)

## Shapley values (scaled by 100) of the first itemset in the table
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

In [None]:
## Top-performing subgroups: reverse the divergence table and keep its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0).iloc[::-1]
pr = FPdiv.head(n).assign(
    support=lambda d: d["support"].round(2),
    # WER as a percentage, then its offset from the WHISPER_LARGE_ZS_WER reference value
    wer=lambda d: (d["wer"] * 100).round(3),
    d_wer=lambda d: (d["wer"] - WHISPER_LARGE_ZS_WER).round(3),
)
# Keep the reporting columns and render each itemset as a sorted, abbreviated string
pr_l = pr[["itemsets", "support", "wer", "d_wer", "t_value"]].assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
display(pr_l)

In [None]:
## Compute the average divergence (d_wer) over all itemsets, with its standard deviation
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
# WER as a percentage, then its offset from the WHISPER_LARGE_ZS_WER reference value
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_LARGE_ZS_WER)).round(3)

# The statistic is taken on d_wer (the divergence), so the label says so
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisperl = fp_divergence_i.computeGlobalShapleyValue()

# Rank the items by absolute global Shapley value ONCE (the ranking used to be
# rebuilt for every key of the comprehension) and keep the K largest.
topK_items_whisperl = set(
    sorted(global_item_divergence_whisperl,
           key=lambda x: abs(global_item_divergence_whisperl[x]))[::-1][:K]
)

# Keep the original dict order; values are scaled by 100 for plotting
topK_global_whisperl = {k:v*100 for k,v in global_item_divergence_whisperl.items()
                    if k in topK_items_whisperl}

plotShapleyValue(shapley_values=topK_global_whisperl, 
                sizeFig=(10,10),labelsize=22, titlesize=22)

## Whisper Large FT

In [None]:
## Select the divergence object stored under the fine-tuned Whisper Large-v3 key.
## `config` and `fp_divergence_i` are notebook globals read by the cells of this section.
config = 'openai_whisper-large-v3_ft'
fp_divergence_i = fp_divergence_dict[config]

In [None]:
from copy import deepcopy

## Most divergent subgroups: first n rows of the divergence table
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.head(n).assign(
    support=lambda d: d["support"].round(2),
    # WER as a percentage, then its offset from the WHISPER_LARGE_FT_WER reference value
    wer=lambda d: (d["wer"] * 100).round(3),
    d_wer=lambda d: (d["wer"] - WHISPER_LARGE_FT_WER).round(3),
)
# Keep the reporting columns and render each itemset as a sorted, abbreviated string
pr_l = pr[["itemsets", "support", "wer", "d_wer", "t_value"]].assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
display(pr_l)

## Shapley values (scaled by 100) of the first itemset in the table
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_i.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

In [None]:
## Top-performing subgroups: reverse the divergence table and keep its first n rows
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0).iloc[::-1]
pr = FPdiv.head(n).assign(
    support=lambda d: d["support"].round(2),
    # WER as a percentage, then its offset from the WHISPER_LARGE_FT_WER reference value
    wer=lambda d: (d["wer"] * 100).round(3),
    d_wer=lambda d: (d["wer"] - WHISPER_LARGE_FT_WER).round(3),
)
# Keep the reporting columns and render each itemset as a sorted, abbreviated string
pr_l = pr[["itemsets", "support", "wer", "d_wer", "t_value"]].assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
display(pr_l)

In [None]:
## Compute the average divergence (d_wer) over all itemsets, with its standard deviation
FPdiv = fp_divergence_i.getDivergence(th_redundancy=0.0)
pr = FPdiv.copy()
# WER as a percentage, then its offset from the WHISPER_LARGE_FT_WER reference value
pr["wer"] = (pr["wer"]*100).round(3)
pr["d_wer"] = ((pr["wer"] - WHISPER_LARGE_FT_WER)).round(3)

# The statistic is taken on d_wer (the divergence), so the label says so
print(f"Average WER divergence: {pr['d_wer'].mean()} +/- {pr['d_wer'].std()}")

In [None]:
# Individual and Global Divergence
# print("---------- Individual Divergence ----------")
# individual_divergence = fp_divergence_i.getFItemsetsDivergence()[1]
# individual_divergence = {k:v*100 for k,v in individual_divergence.items()}
# plotShapleyValue(shapley_values=individual_divergence, 
#                 sizeFig=(10,10), labelsize=17, titlesize=17)

print("---------- Global Divergence ----------")
global_item_divergence_whisperl_ft = fp_divergence_i.computeGlobalShapleyValue()

# Rank the items by absolute global Shapley value ONCE (the ranking used to be
# rebuilt for every key of the comprehension) and keep the K largest.
topK_items_whisperl_ft = set(
    sorted(global_item_divergence_whisperl_ft,
           key=lambda x: abs(global_item_divergence_whisperl_ft[x]))[::-1][:K]
)

# Keep the original dict order; values are scaled by 100 for plotting
topK_global_whisperl_ft = {k:v*100 for k,v in global_item_divergence_whisperl_ft.items()
                    if k in topK_items_whisperl_ft}

plotShapleyValue(shapley_values=topK_global_whisperl_ft, 
                sizeFig=(10,10),labelsize=22, titlesize=22)

# Divergence difference Whisper Base vs Whisper Large

In [None]:
## Load the full divergence table (th_redundancy=None) of the model placed in the "large" slot.
## NOTE(review): the config below is 'openai_whisper-small', while the variable names
## (wlarge, *_large) and the section title refer to Whisper Large — confirm which is intended.
config = 'openai_whisper-small'
fp_divergence_i = fp_divergence_dict[config]
FPdiv_wl = fp_divergence_i.getDivergence(th_redundancy=None).copy()
# Index by itemset so the two tables can be joined on it
wlarge = FPdiv_wl.set_index("itemsets")

## Load the full divergence table of the model placed in the "base" slot (Whisper Base EN key)
config = 'openai_whisper-base_en'
fp_divergence_i = fp_divergence_dict[config]
FPdiv_wb = fp_divergence_i.getDivergence(th_redundancy=None).copy()
wbase = FPdiv_wb.set_index("itemsets")

## Join on the itemset index; shared column names get the _base / _large suffixes.
## DataFrame.join defaults to a left join, so the rows follow wbase.
merged = wbase.join(wlarge, lsuffix='_base', rsuffix='_large')
# Expose the "large" side's support under the plain name `support`
merged = merged.rename(columns={'support_large': 'support'})

In [None]:
## Compute the per-itemset WER gap between the two models (large slot minus base slot).
## `diff` is the column name later passed to FP_Divergence as the metric.
diff = "d_difference"
merged[diff] = merged["wer_large"] - merged["wer_base"]
# Same values as merged[diff], stored under a second column name
merged["difference"] = merged["wer_large"] - merged["wer_base"]

In [None]:
## Create "Gain Base-Large Whisper" df and compute divergence
# 't_value_large' is listed only once: selecting it twice produced a frame with
# duplicated column labels.
base_large_gain_df = merged[['support', 'wer_large', 'd_wer_large', 't_value_large',
       'support_count_large', 'length_large']
       + [diff, "difference", "wer_base", "t_value_base"]]
base_large_gain_df = base_large_gain_df.rename(columns={'length_large':'length'})
# Turn the itemset index back into a regular `itemsets` column
base_large_gain_df = base_large_gain_df.reset_index()

# Divergence analysis that uses the WER gap column (`diff`) as its metric
fp_divergence_difference = FP_Divergence(base_large_gain_df, diff)
diff_nr = fp_divergence_difference.getDivergence(th_redundancy=0.0)

In [None]:
## Restrict `merged` to the itemsets returned with th_redundancy=0.0 and order them
## by the WER gap, largest (most positive) first.
sel = diff_nr.itemsets.values
compare_performance = merged.loc[sel].sort_values(diff, ascending = False)
# Columns shown in the comparison tables of the following cells
cols = ['d_difference', 'wer_base', 'wer_large', 'support', 't_value_base', 't_value_large']

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
# NOTE(review): this silences every warning for the rest of the session
warnings.filterwarnings("ignore")

## plot the distribution of the gain in performance
# NOTE(review): despite its name, `list_diff_pos` holds the gaps <= 0 and
# `list_diff_neg` the gaps >= 0; a gap of exactly 0 lands in both lists.
list_diff_pos = list(compare_performance[compare_performance['d_difference'] <= 0.0].d_difference)
# Scale by 100 for display
list_diff_pos = [i * 100 for i in list_diff_pos]
fig, ax = plt.subplots(figsize=(6, 4))
# NOTE(review): `palette` is passed without `hue`; presumably it has no effect — confirm
sns.histplot(list_diff_pos, bins=10, kde=False, ax=ax, color='C01', palette="colorblind")
list_diff_neg = list(compare_performance[compare_performance['d_difference'] >= 0.0].d_difference)
list_diff_neg = [i * 100 for i in list_diff_neg]
# Second histogram drawn on the same axes
sns.histplot(list_diff_neg, bins=2, kde=False, ax=ax, color='#83C4FA', palette="colorblind")

ax.set_xlabel("Gap in performance", fontsize=28)
ax.set_ylabel(r"# Subgroups", fontsize=28)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.tight_layout()
# plt.savefig("distribution_gain_whisper_base_large.pdf")

## Stats

In [None]:
## Share of itemsets (in %) whose WER gap is zero, positive or negative
diff_nr_0 = fp_divergence_difference.getDivergence(th_redundancy=None)

# Count the itemsets once per sign class of the gap column, plus the total
n_itemsets_all = diff_nr_0.shape[0]
n_itemsets_equal = diff_nr_0.loc[diff_nr_0[diff] == 0].shape[0]
n_itemsets_greater = diff_nr_0.loc[diff_nr_0[diff] > 0].shape[0]
n_itemsets_lower = diff_nr_0.loc[diff_nr_0[diff] < 0].shape[0]

# Same WER for the two models
print("Equal")
print(round(100 * (n_itemsets_equal / n_itemsets_all), 10))

# Positive gap: higher WER (lower performance) for the model in the "large" slot
print("Greater")
pct_greater = round(100 * (n_itemsets_greater / n_itemsets_all), 10)
print(pct_greater)
# Flag read by the "Gain > 0" cells: True when at least some itemsets have a positive gap
greater = pct_greater != 0.0

# Negative gap: lower WER (higher performance) for the model in the "large" slot
print("Lower")
print(round(100 * (n_itemsets_lower / n_itemsets_all), 10))

## Gain > 0

In [None]:
## First 20 cohorts of compare_performance (sorted by WER gap, largest first),
## i.e. where the "large" slot has the highest WER relative to the "base" slot
pr = compare_performance[cols].head(20).reset_index().assign(
    support=lambda d: d["support"].round(2),
    # WERs and their gap scaled by 100
    wer_large=lambda d: (d["wer_large"] * 100).round(2),
    wer_base=lambda d: (d["wer_base"] * 100).round(2),
    d_difference=lambda d: (d["d_difference"] * 100).round(2),
)

## Show the first two rows with sorted, abbreviated itemset names
pr_l = pr.head(2).assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
pr_l

In [None]:
## Compute Shapley Values for a given itemset (the second row of `pr`)
# The row at position 1 is read below, so at least two rows are required;
# the previous `len(pr) > 0` guard raised IndexError on a one-row table.
if greater and len(pr) > 1:
    itemset_1 = pr.iloc[1].itemsets
    itemset_shap = fp_divergence_difference.computeShapleyValue(itemset_1)
    # Scale by 100 for plotting
    itemset_shap = {k:v*100 for k,v in itemset_shap.items()}
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations), 
                    sizeFig=(2,2), labelsize=16, titlesize=16)

## Gain < 0 

In [None]:
## Last 20 cohorts of compare_performance in reverse order (smallest WER gap first),
## i.e. where the "large" slot has the lowest WER relative to the "base" slot
pr = compare_performance[cols].iloc[::-1].head(20).reset_index().assign(
    support=lambda d: d["support"].round(2),
    # WERs and their gap scaled by 100
    wer_large=lambda d: (d["wer_large"] * 100).round(2),
    wer_base=lambda d: (d["wer_base"] * 100).round(2),
    d_difference=lambda d: (d["d_difference"] * 100).round(2),
)

## Show the first two rows with sorted, abbreviated itemset names
pr_l = pr.head(2).assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
pr_l

In [None]:
## Shapley values (scaled by 100) of the first itemset in `pr`
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_difference.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

## Gain = 0 

In [None]:
## Retrieve the data cohorts whose WER gap between the two models is zero after rounding
pr = merged.loc[ fp_divergence_difference.getDivergence(th_redundancy=0.0).itemsets.values][cols].reset_index()
# Round the existing `support` column in place, as the Gain > 0 / Gain < 0 cells do,
# instead of adding a stray `support_large` column next to the unrounded one.
pr["support"] = pr["support"].round(2)
pr["wer_large"] = (pr["wer_large"]*100).round(2)
pr["wer_base"] = (pr["wer_base"]*100).round(2)
pr["d_difference"] = (pr["d_difference"]*100).round(2)
# Equality is tested on the gap after scaling by 100 and rounding to two decimals
pr = pr.loc[abs(pr["d_difference"])==0]
# A single sort on the key is enough (the sort used to be applied twice)
pr = pr.sort_values("wer_large")

## Abbreviate itemset names for better visualization
pr_l = pr.head(2).copy()
pr_l['itemsets'] = pr_l['itemsets'].apply(lambda x: sortItemset(x, abbreviations))
pr_l

In [None]:
## Shapley values (scaled by 100) of the first itemset in `pr`
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_difference.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

## Global Shapley value

In [None]:
## Compute the top-K global Shapley values of the WER gap between the two compared models
global_item_divergence_wb_wl = fp_divergence_difference.computeGlobalShapleyValue()

K = 15
# Rank the items by absolute global Shapley value ONCE (the ranking used to be
# rebuilt for every key of the comprehension) and keep the K largest.
topK_items_wb_wl = set(
    sorted(global_item_divergence_wb_wl,
           key=lambda x: abs(global_item_divergence_wb_wl[x]))[::-1][:K]
)
# Keep the original dict order and the unscaled values
topK_global_wb_wl = {k:v for k,v in global_item_divergence_wb_wl.items()
                        if k in topK_items_wb_wl}

In [None]:
## Plot the top-K global Shapley values (saveFig=False is passed to the plotting helper)
sizeFig = (3.2, 4)
labelsize = 16
titlesize = 16

# Abbreviate the item names, then scale the values by 100
topK_global_wb_wl_abbr = {
    k: v * 100
    for k, v in abbreviateDict(topK_global_wb_wl, abbreviations).items()
}
name_fig = "global_shapley_gain_wb_wl.pdf"
plotShapleyValue(
    shapley_values=topK_global_wb_wl_abbr,
    sizeFig=sizeFig,
    labelsize=labelsize,
    titlesize=titlesize,
    title=r"$\tilde{\Delta}^g_{gain} WhisperB - WhisperL$",
    nameFig=name_fig,
    saveFig=False,
)

# Divergence difference Whisper Base vs Whisper Base ft.

In [None]:
## Load the full divergence table (th_redundancy=None) of the model placed in the "base_ft" slot.
## NOTE(review): the config below is the fine-tuned Whisper Large-v3 key, while the variable
## names (wbase_ft) and the section title refer to Whisper Base — confirm which is intended.
config = 'openai_whisper-large-v3_ft'
fp_divergence_i = fp_divergence_dict[config]
FPdiv_wl = fp_divergence_i.getDivergence(th_redundancy=None).copy()
# Index by itemset so the two tables can be joined on it
wbase_ft = FPdiv_wl.set_index("itemsets")

## Load the full divergence table of the model placed in the "base" slot.
## NOTE(review): the config below is the Whisper Large-v3 key, not a Whisper Base one.
config = 'openai_whisper-large-v3'
fp_divergence_i = fp_divergence_dict[config]
FPdiv_wb = fp_divergence_i.getDivergence(th_redundancy=None).copy()
wbase = FPdiv_wb.set_index("itemsets")

## Join on the itemset index; shared column names get the _base / _base_ft suffixes.
## DataFrame.join defaults to a left join, so the rows follow wbase.
merged = wbase.join(wbase_ft, lsuffix='_base', rsuffix='_base_ft')
# Expose the "base_ft" side's support under the plain name `support`
merged = merged.rename(columns={'support_base_ft': 'support'})

In [None]:
## Compute the per-itemset WER gap between the two models (base_ft slot minus base slot).
## `diff` is the column name later passed to FP_Divergence as the metric.
diff = "d_difference"
merged[diff] = merged["wer_base_ft"] - merged["wer_base"]
# Same values as merged[diff], stored under a second column name
merged["difference"] = merged["wer_base_ft"] - merged["wer_base"]

In [None]:
## Create the gain df between the "base" and "base_ft" slots and compute divergence
# 't_value_base_ft' is listed only once: selecting it twice produced a frame with
# duplicated column labels.
base_baseft_gain_df = merged[['support', 'wer_base_ft', 'd_wer_base_ft', 't_value_base_ft',
       'support_count_base_ft', 'length_base_ft']
       + [diff, "difference", "wer_base", "t_value_base"]]
base_baseft_gain_df = base_baseft_gain_df.rename(columns={'length_base_ft':'length'})
# Turn the itemset index back into a regular `itemsets` column
base_baseft_gain_df = base_baseft_gain_df.reset_index()

# Divergence analysis that uses the WER gap column (`diff`) as its metric
fp_divergence_difference = FP_Divergence(base_baseft_gain_df, diff)
diff_nr = fp_divergence_difference.getDivergence(th_redundancy=0.0)

In [None]:
## Restrict `merged` to the itemsets returned with th_redundancy=0.0 and order them
## by the WER gap, largest (most positive) first.
sel = diff_nr.itemsets.values
compare_performance = merged.loc[sel].sort_values(diff, ascending = False)
# Columns shown in the comparison tables of the following cells
cols = ['d_difference', 'wer_base', 'wer_base_ft', 'support', 't_value_base', 't_value_base_ft']

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
# NOTE(review): this silences every warning for the rest of the session
warnings.filterwarnings("ignore")

## plot the distribution of the gain in performance
# NOTE(review): despite its name, `list_diff_pos` holds the gaps <= 0 and
# `list_diff_neg` the gaps >= 0; a gap of exactly 0 lands in both lists.
list_diff_pos = list(compare_performance[compare_performance['d_difference'] <= 0.0].d_difference)
# Scale by 100 for display
list_diff_pos = [i * 100 for i in list_diff_pos]
fig, ax = plt.subplots(figsize=(6, 4))
# NOTE(review): `palette` is passed without `hue`; presumably it has no effect — confirm
sns.histplot(list_diff_pos, bins=10, kde=False, ax=ax, color='C01', palette="colorblind")
list_diff_neg = list(compare_performance[compare_performance['d_difference'] >= 0.0].d_difference)
list_diff_neg = [i * 100 for i in list_diff_neg]
# Second histogram drawn on the same axes
sns.histplot(list_diff_neg, bins=5, kde=False, ax=ax, color='#83C4FA', palette="colorblind")

ax.set_xlabel("Gap in performance", fontsize=28)
ax.set_ylabel(r"# Subgroups", fontsize=28)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.tight_layout()
# plt.savefig("distribution_gain_whisper_base_base_ft.pdf")

## Stats

In [None]:
## Share of itemsets (in %) whose WER gap is zero, positive or negative
diff_nr_0 = fp_divergence_difference.getDivergence(th_redundancy=None)

# Count the itemsets once per sign class of the gap column, plus the total
n_itemsets_all = diff_nr_0.shape[0]
n_itemsets_equal = diff_nr_0.loc[diff_nr_0[diff] == 0].shape[0]
n_itemsets_greater = diff_nr_0.loc[diff_nr_0[diff] > 0].shape[0]
n_itemsets_lower = diff_nr_0.loc[diff_nr_0[diff] < 0].shape[0]

# Same WER for the two models
print("Equal")
print(round(100 * (n_itemsets_equal / n_itemsets_all), 10))

# Positive gap: higher WER (lower performance) for the model in the "base_ft" slot
print("Greater")
pct_greater = round(100 * (n_itemsets_greater / n_itemsets_all), 10)
print(pct_greater)
# Flag read by the "Gain > 0" cells: True when at least some itemsets have a positive gap
greater = pct_greater != 0.0

# Negative gap: lower WER (higher performance) for the model in the "base_ft" slot
print("Lower")
print(round(100 * (n_itemsets_lower / n_itemsets_all), 10))

## Gain > 0

In [None]:
## Retrieve the data cohorts where the "base_ft" slot has a higher WER than the "base" slot.
## Only done when the stats cell found at least some itemsets with a positive gap.
if greater:
    # The body is now indented under the `if` (it used to sit at column 0, which is
    # a syntax error), and the WER column of this section is `wer_base_ft`
    # (`wer_large` is not in `cols` here and raised KeyError).
    pr = compare_performance[cols].head(20).reset_index()
    pr["support"] = pr["support"].round(2)
    pr["wer_base_ft"] = (pr["wer_base_ft"]*100).round(2)
    pr["wer_base"] = (pr["wer_base"]*100).round(2)
    pr["d_difference"] = (pr["d_difference"]*100).round(2)

    ## Abbreviate itemset names for better visualization
    pr_l = pr.head(2).copy()
    pr_l['itemsets'] = pr_l['itemsets'].apply(lambda x: sortItemset(x, abbreviations))
    # A bare expression is not auto-displayed inside an `if` block, so display explicitly
    display(pr_l)

In [None]:
## Compute Shapley Values for a given itemset (the second row of `pr`)
# The row at position 1 is read below, so at least two rows are required;
# the previous `len(pr) > 0` guard raised IndexError on a one-row table.
if greater and len(pr) > 1:
    itemset_1 = pr.iloc[1].itemsets
    itemset_shap = fp_divergence_difference.computeShapleyValue(itemset_1)
    # Scale by 100 for plotting
    itemset_shap = {k:v*100 for k,v in itemset_shap.items()}
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations), 
                    sizeFig=(2,2), labelsize=16, titlesize=16)

## Gain < 0 

In [None]:
## Last 20 cohorts of compare_performance in reverse order (smallest WER gap first),
## i.e. where the "base_ft" slot has the lowest WER relative to the "base" slot
pr = compare_performance[cols].iloc[::-1].head(20).reset_index().assign(
    support=lambda d: d["support"].round(2),
    # WERs and their gap scaled by 100
    wer_base_ft=lambda d: (d["wer_base_ft"] * 100).round(2),
    wer_base=lambda d: (d["wer_base"] * 100).round(2),
    d_difference=lambda d: (d["d_difference"] * 100).round(2),
)

## Show the first two rows with sorted, abbreviated itemset names
pr_l = pr.head(2).assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
pr_l

In [None]:
## Shapley values (scaled by 100) of the first itemset in `pr`
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_difference.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

## Gain = 0 

In [None]:
## Retrieve the data cohorts whose WER gap between the two models is zero after rounding
pr = merged.loc[ fp_divergence_difference.getDivergence(th_redundancy=0.0).itemsets.values][cols].reset_index()
# Round the existing `support` column in place, as the Gain < 0 cell does,
# instead of adding a stray `support_large` column next to the unrounded one.
pr["support"] = pr["support"].round(2)
# This section's WER column is `wer_base_ft`; `wer_large` is not in `cols`
# here and raised KeyError.
pr["wer_base_ft"] = (pr["wer_base_ft"]*100).round(2)
pr["wer_base"] = (pr["wer_base"]*100).round(2)
pr["d_difference"] = (pr["d_difference"]*100).round(2)
# Equality is tested on the gap after scaling by 100 and rounding to two decimals
pr = pr.loc[abs(pr["d_difference"])==0]
# A single sort on the key is enough (the sort used to be applied twice)
pr = pr.sort_values("wer_base_ft")

## Abbreviate itemset names for better visualization
pr_l = pr.head(2).copy()
pr_l['itemsets'] = pr_l['itemsets'].apply(lambda x: sortItemset(x, abbreviations))
pr_l

In [None]:
## Shapley values (scaled by 100) of the first itemset in `pr`
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_difference.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

## Global Shapley value

In [None]:
## Compute the top-K global Shapley values of the WER gap between the two compared models
global_item_divergence_wb_wbft = fp_divergence_difference.computeGlobalShapleyValue()

K = 15
# Rank the items by absolute global Shapley value ONCE (the ranking used to be
# rebuilt for every key of the comprehension) and keep the K largest.
topK_items_wb_wbft = set(
    sorted(global_item_divergence_wb_wbft,
           key=lambda x: abs(global_item_divergence_wb_wbft[x]))[::-1][:K]
)
# Keep the original dict order and the unscaled values
topK_global_wb_wbft = {k:v for k,v in global_item_divergence_wb_wbft.items()
                        if k in topK_items_wb_wbft}

In [None]:
## Plot the top-K global Shapley values (saveFig=False is passed to the plotting helper)
sizeFig = (3.2, 4)
labelsize = 16
titlesize = 16

# Abbreviate the item names, then scale the values by 100
topK_global_wb_wbft_abbr = {
    k: v * 100
    for k, v in abbreviateDict(topK_global_wb_wbft, abbreviations).items()
}
name_fig = "global_shapley_gain_wb_wbft.pdf"
plotShapleyValue(
    shapley_values=topK_global_wb_wbft_abbr,
    sizeFig=sizeFig,
    labelsize=labelsize,
    titlesize=titlesize,
    title=r"$\tilde{\Delta}^g_{gain} WhisperB - WhisperB FT$",
    nameFig=name_fig,
    saveFig=False,
)

# Divergence difference Whisper Medium EN vs Whisper Medium Multilingual

In [None]:
## Load the full divergence table (th_redundancy=None) stored under the Whisper Medium key.
## NOTE(review): the variable names (wbase_m, *_base_m) still say "base" although
## the configs of this section are the Whisper Medium ones.
config = 'openai_whisper-medium'
fp_divergence_i = fp_divergence_dict[config]
FPdiv_wl = fp_divergence_i.getDivergence(th_redundancy=None).copy()
# Index by itemset so the two tables can be joined on it
wbase_m = FPdiv_wl.set_index("itemsets")

## Load the full divergence table stored under the Whisper Medium EN key
config = 'openai_whisper-medium_en'
fp_divergence_i = fp_divergence_dict[config]
FPdiv_wb = fp_divergence_i.getDivergence(th_redundancy=None).copy()
wbase = FPdiv_wb.set_index("itemsets")

## Join on the itemset index; shared column names get the _base / _base_m suffixes.
## DataFrame.join defaults to a left join, so the rows follow wbase.
merged = wbase.join(wbase_m, lsuffix='_base', rsuffix='_base_m')
# Expose the "base_m" side's support under the plain name `support`
merged = merged.rename(columns={'support_base_m': 'support'})

In [None]:
## Compute the per-itemset WER gap between the two models (base_m slot minus base slot).
## `diff` is the column name later passed to FP_Divergence as the metric.
diff = "d_difference"
merged[diff] = merged["wer_base_m"] - merged["wer_base"]
# Same values as merged[diff], stored under a second column name
merged["difference"] = merged["wer_base_m"] - merged["wer_base"]

In [None]:
## Create "Gain Base-BaseM Whisper" df and compute divergence
# 't_value_base_m' is listed only once: selecting it twice produced a frame with
# duplicated column labels.
base_basem_gain_df = merged[['support', 'wer_base_m', 'd_wer_base_m', 't_value_base_m',
       'support_count_base_m', 'length_base_m']
       + [diff, "difference", "wer_base", "t_value_base"]]
base_basem_gain_df = base_basem_gain_df.rename(columns={'length_base_m':'length'})
# Turn the itemset index back into a regular `itemsets` column
base_basem_gain_df = base_basem_gain_df.reset_index()

# Divergence analysis that uses the WER gap column (`diff`) as its metric
fp_divergence_difference = FP_Divergence(base_basem_gain_df, diff)
diff_nr = fp_divergence_difference.getDivergence(th_redundancy=0.0)

In [None]:
## Restrict `merged` to the itemsets returned with th_redundancy=0.0 and order them
## by the WER gap, largest (most positive) first.
sel = diff_nr.itemsets.values
compare_performance = merged.loc[sel].sort_values(diff, ascending = False)
# Columns shown in the comparison tables of the following cells
cols = ['d_difference', 'wer_base', 'wer_base_m', 'support', 't_value_base', 't_value_base_m']

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
# NOTE(review): this silences every warning for the rest of the session
warnings.filterwarnings("ignore")

## plot the distribution of the gain in performance
# NOTE(review): despite its name, `list_diff_pos` holds the gaps <= 0 and
# `list_diff_neg` the gaps >= 0; a gap of exactly 0 lands in both lists.
list_diff_pos = list(compare_performance[compare_performance['d_difference'] <= 0.0].d_difference)
# Scale by 100 for display
list_diff_pos = [i * 100 for i in list_diff_pos]
fig, ax = plt.subplots(figsize=(6, 4))
# NOTE(review): `palette` is passed without `hue`; presumably it has no effect — confirm
sns.histplot(list_diff_pos, bins=2, kde=False, ax=ax, color='C01', palette="colorblind")
list_diff_neg = list(compare_performance[compare_performance['d_difference'] >= 0.0].d_difference)
list_diff_neg = [i * 100 for i in list_diff_neg]
# Second histogram drawn on the same axes
sns.histplot(list_diff_neg, bins=9, kde=False, ax=ax, color='#83C4FA', palette="colorblind")

ax.set_xlabel("Gap in performance", fontsize=28)
ax.set_ylabel(r"# Subgroups", fontsize=28)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.tight_layout()
# NOTE(review): unlike the other two distribution cells, this one writes the PDF
# to the working directory on every run — confirm that is intended.
plt.savefig("distribution_gain_whisper_medium_medium_m.pdf")

## Stats

In [None]:
## Share of itemsets (in %) whose WER gap is zero, positive or negative
diff_nr_0 = fp_divergence_difference.getDivergence(th_redundancy=None)

# Count the itemsets once per sign class of the gap column, plus the total
n_itemsets_all = diff_nr_0.shape[0]
n_itemsets_equal = diff_nr_0.loc[diff_nr_0[diff] == 0].shape[0]
n_itemsets_greater = diff_nr_0.loc[diff_nr_0[diff] > 0].shape[0]
n_itemsets_lower = diff_nr_0.loc[diff_nr_0[diff] < 0].shape[0]

# Same WER for the two models
print("Equal")
print(round(100 * (n_itemsets_equal / n_itemsets_all), 10))

# Positive gap: higher WER (lower performance) for the model in the "base_m" slot
print("Greater")
pct_greater = round(100 * (n_itemsets_greater / n_itemsets_all), 10)
print(pct_greater)
# Flag for the "Gain > 0" cells: True when at least some itemsets have a positive gap
greater = pct_greater != 0.0

# Negative gap: lower WER (higher performance) for the model in the "base_m" slot
print("Lower")
print(round(100 * (n_itemsets_lower / n_itemsets_all), 10))

## Gain > 0

In [None]:
## First 20 cohorts of compare_performance (sorted by WER gap, largest first),
## i.e. where the "base_m" slot has the highest WER relative to the "base" slot
pr = compare_performance[cols].head(20).reset_index().assign(
    support=lambda d: d["support"].round(2),
    # WERs and their gap scaled by 100
    wer_base_m=lambda d: (d["wer_base_m"] * 100).round(2),
    wer_base=lambda d: (d["wer_base"] * 100).round(2),
    d_difference=lambda d: (d["d_difference"] * 100).round(2),
)

## Show the first two rows with sorted, abbreviated itemset names
pr_l = pr.head(2).assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
pr_l

In [None]:
## Compute Shapley Values for a given itemset (the second row of `pr`)
# The row at position 1 is read below, so at least two rows are required;
# the previous `len(pr) > 0` guard raised IndexError on a one-row table.
# `greater` is checked as in the other "Gain > 0" sections, so nothing is
# plotted when no itemset has a positive gap.
if greater and len(pr) > 1:
    itemset_1 = pr.iloc[1].itemsets
    itemset_shap = fp_divergence_difference.computeShapleyValue(itemset_1)
    # Scale by 100 for plotting
    itemset_shap = {k:v*100 for k,v in itemset_shap.items()}
    plotShapleyValue(shapley_values=abbreviateDict(itemset_shap, abbreviations), 
                    sizeFig=(2,2), labelsize=16, titlesize=16)

## Gain < 0 

In [None]:
## Last 20 cohorts of compare_performance in reverse order (smallest WER gap first),
## i.e. where the "base_m" slot has the lowest WER relative to the "base" slot
pr = compare_performance[cols].iloc[::-1].head(20).reset_index().assign(
    support=lambda d: d["support"].round(2),
    # WERs and their gap scaled by 100
    wer_base_m=lambda d: (d["wer_base_m"] * 100).round(2),
    wer_base=lambda d: (d["wer_base"] * 100).round(2),
    d_difference=lambda d: (d["d_difference"] * 100).round(2),
)

## Show the first two rows with sorted, abbreviated itemset names
pr_l = pr.head(2).assign(
    itemsets=lambda d: d["itemsets"].apply(lambda x: sortItemset(x, abbreviations))
)
pr_l

In [None]:
## Shapley values (scaled by 100) of the first itemset in `pr`
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_difference.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

## Gain = 0 

In [None]:
## Retrieve the data cohorts whose WER gap between the two models is zero after rounding
pr = merged.loc[ fp_divergence_difference.getDivergence(th_redundancy=0.0).itemsets.values][cols].reset_index()
# Round the existing `support` column in place, as the Gain > 0 / Gain < 0 cells do,
# instead of adding a stray `support_large` column next to the unrounded one.
pr["support"] = pr["support"].round(2)
pr["wer_base_m"] = (pr["wer_base_m"]*100).round(2)
pr["wer_base"] = (pr["wer_base"]*100).round(2)
pr["d_difference"] = (pr["d_difference"]*100).round(2)
# Equality is tested on the gap after scaling by 100 and rounding to two decimals
pr = pr.loc[abs(pr["d_difference"])==0]
# A single sort on the key is enough (the sort used to be applied twice)
pr = pr.sort_values("wer_base_m")

## Abbreviate itemset names for better visualization
pr_l = pr.head(2).copy()
pr_l['itemsets'] = pr_l['itemsets'].apply(lambda x: sortItemset(x, abbreviations))
pr_l

In [None]:
## Shapley values (scaled by 100) of the first itemset in `pr`
if len(pr) > 0:
    itemset_1 = pr.iloc[0]["itemsets"]
    itemset_shap = {
        k: v * 100
        for k, v in fp_divergence_difference.computeShapleyValue(itemset_1).items()
    }
    plotShapleyValue(
        shapley_values=abbreviateDict(itemset_shap, abbreviations),
        sizeFig=(2, 2),
        labelsize=16,
        titlesize=16,
    )

## Global Shapley value

In [None]:
## Compute the top-K global Shapley values of the WER gap between the two compared models
## (this section loads the Whisper Medium EN and Whisper Medium configs)
global_item_divergence_wb_wbm = fp_divergence_difference.computeGlobalShapleyValue()

K = 15
# Rank the items by absolute global Shapley value ONCE (the ranking used to be
# rebuilt for every key of the comprehension) and keep the K largest.
topK_items_wb_wbm = set(
    sorted(global_item_divergence_wb_wbm,
           key=lambda x: abs(global_item_divergence_wb_wbm[x]))[::-1][:K]
)
# Keep the original dict order and the unscaled values
topK_global_wb_wbm = {k:v for k,v in global_item_divergence_wb_wbm.items()
                        if k in topK_items_wb_wbm}

In [None]:
## Plot the top-K global Shapley values (saveFig=False is passed to the plotting helper)
sizeFig = (3.2,4)
labelsize = 16
titlesize = 16

topK_global_wb_wbm_abbr = abbreviateDict(topK_global_wb_wbm, abbreviations)
# Scale the ABBREVIATED dict: iterating the raw `topK_global_wb_wbm` here threw
# away the result of abbreviateDict, so the plot showed un-abbreviated labels.
topK_global_wb_wbm_abbr = {k:v*100 for k,v in topK_global_wb_wbm_abbr.items()}
# Section-specific file name, so that enabling saveFig cannot overwrite the
# figure of the previous comparison ("global_shapley_gain_wb_wbft.pdf").
name_fig = "global_shapley_gain_wb_wbm.pdf"
plotShapleyValue(shapley_values=topK_global_wb_wbm_abbr, \
                sizeFig=sizeFig, labelsize=labelsize, titlesize=titlesize, \
                title=r"$\tilde{\Delta}^g_{gain} WhisperM_{EN} - WhisperM_{Multi}$",
                nameFig=name_fig, saveFig=False)