In [130]:
import pandas as pd
import numpy as np
from sklearn.metrics import f1_score
from scipy import stats

# 데이터 로드 및 전처리
# data = pd.read_csv(f'/home/data/CCTA/250410/external_test_set.csv').fillna('None')

# pred = pd.read_excel('output/250410/five_institution_llama3.xlsx').iloc[:, 3:].astype(str) 
# pred = pd.read_excel('output/250410/five_institution_llama3.1.xlsx').iloc[:, 3:].astype(str) 
# pred = pd.read_excel('output/250410/five_institution_mistral.xlsx').iloc[:, 3:].astype(str) 


##############

data = pd.read_csv(f'/home/data/CCTA/sample_processed_v6.3(외부병원).csv').fillna('None')

pred = pd.read_excel('/home/workspace/report_labeler/output/llama3-8B_250409_entire_external.xlsx').iloc[:, 3:].astype(str) # 
# pred = pd.read_excel('/home/workspace/report_labeler/output/llama3.1-8B_entire_external.xlsx').iloc[:, 3:].astype(str) # 
# pred = pd.read_excel('/home/workspace/report_labeler/output/llama3.1-8B_250409_entire_external.xlsx').iloc[:, 3:].astype(str) # 
# pred = pd.read_excel('/home/workspace/report_labeler/output/250410/five_institution_llama3.xlsx').iloc[:, 3:].astype(str) # 

In [131]:
labels = data[['CAD-RADS', 'Plaque Burden', 'E', 'I', 'N', 'G', 'HRP', 'S', 'CAC_available']].astype(str)
pred.columns = pred.columns.str.replace('Plaque_Burden', 'Plaque Burden')
pred = pred[labels.iloc[:, :-1].columns]
pred.loc[pred['CAD-RADS'] == 'N', 'CAD-RADS'] = '0'
pred = pred.replace('nan', 'None')

In [132]:
# 부트스트랩 함수
def bootstrap_sample(df_pred, df_true, seed=None):
    np.random.seed(seed)
    indices = np.random.choice(len(df_pred), len(df_pred), replace=True)
    return df_pred.iloc[indices].reset_index(drop=True), df_true.iloc[indices].reset_index(drop=True)

# F1 점수 계산 함수 (조건별로 적용)
def get_all_f1(true_df, pred_df, average):
    f1_scores = []
    for column in true_df.iloc[:,:-1].columns:
        true_values = true_df[column]
        pred_values = pred_df[column]

        if column == 'Plaque Burden':
            mask = (true_df['S'] == '0') & (true_df['CAC_available'] == '1')
            true_values = true_values[mask]
            pred_values = pred_values[mask]
        elif column == 'CAD-RADS':
            mask = (true_df['N'] == '0')
            true_values = true_values[mask]
            pred_values = pred_values[mask]

        f1 = f1_score(true_values, pred_values, average=average)
        f1_scores.append(f1)

    return f1_scores

# 평균 및 신뢰구간 계산
def calculate_mean_std(scores):
    return np.mean(scores, axis=0), np.std(scores, axis=0)

def calculate_confidence_interval(scores, confidence=0.95):
    mean, std = calculate_mean_std(scores)
    se = std / np.sqrt(len(scores))
    margin = se * stats.t.ppf((1 + confidence) / 2., len(scores) - 1)
    return mean - margin, mean + margin

In [133]:
def calculate_metric(labels, pred):
    # 부트스트랩으로 F1 점수 계산
    weighted_f1_scores, macro_f1_scores = [], []
    for i in range(10):
        bs_pred, bs_labels = bootstrap_sample(pred, labels, seed=i)
        weighted_f1_scores.append(get_all_f1(bs_labels, bs_pred, 'weighted'))
        macro_f1_scores.append(get_all_f1(bs_labels, bs_pred, 'macro'))

    # 결과 계산
    mean_macro, std_macro = calculate_mean_std(macro_f1_scores)
    ci_macro_lower, ci_macro_upper = calculate_confidence_interval(macro_f1_scores)

    mean_weighted, std_weighted = calculate_mean_std(weighted_f1_scores)
    ci_weighted_lower, ci_weighted_upper = calculate_confidence_interval(weighted_f1_scores)

    # 6개 컬럼만 선택하기 (E, I, N, G, HRP, S)
    target_columns = ['E', 'I', 'N', 'G', 'HRP', 'S']
    column_indices = [list(labels.iloc[:, :-1].columns).index(col) for col in target_columns if col in labels.iloc[:, :-1].columns]

    # 선택한 컬럼에 대한 평균 계산
    if column_indices:
        # Macro F1 평균
        selected_macro_means = mean_macro[column_indices]
        overall_macro_mean = np.mean(selected_macro_means)
        
        # Macro F1 신뢰구간 계산
        # 부트스트랩 샘플별로 6개 컬럼의 평균을 계산
        macro_means_per_bootstrap = []
        for bs_sample in macro_f1_scores:
            selected_scores = [bs_sample[idx] for idx in column_indices]
            macro_means_per_bootstrap.append(np.mean(selected_scores))
        
        # 평균의 신뢰구간 계산
        macro_mean_std = np.std(macro_means_per_bootstrap)
        macro_mean_se = macro_mean_std / np.sqrt(len(macro_f1_scores))
        macro_mean_margin = macro_mean_se * stats.t.ppf((1 + 0.95) / 2., len(macro_f1_scores) - 1)
        macro_mean_ci_lower = overall_macro_mean - macro_mean_margin
        macro_mean_ci_upper = overall_macro_mean + macro_mean_margin
        
        # Weighted F1 평균
        selected_weighted_means = mean_weighted[column_indices]
        overall_weighted_mean = np.mean(selected_weighted_means)
        
        # Weighted F1 신뢰구간 계산
        weighted_means_per_bootstrap = []
        for bs_sample in weighted_f1_scores:
            selected_scores = [bs_sample[idx] for idx in column_indices]
            weighted_means_per_bootstrap.append(np.mean(selected_scores))
        
        weighted_mean_std = np.std(weighted_means_per_bootstrap)
        weighted_mean_se = weighted_mean_std / np.sqrt(len(weighted_f1_scores))
        weighted_mean_margin = weighted_mean_se * stats.t.ppf((1 + 0.95) / 2., len(weighted_f1_scores) - 1)
        weighted_mean_ci_lower = overall_weighted_mean - weighted_mean_margin
        weighted_mean_ci_upper = overall_weighted_mean + weighted_mean_margin

    # 최종 결과 출력
    result_df = pd.DataFrame({
        'Metric': labels.iloc[:, :-1].columns,
        'Macro F1 Mean': np.round(mean_macro, 2),
        'Macro F1 CI Lower': np.round(ci_macro_lower, 2),
        'Macro F1 CI Upper': np.round(ci_macro_upper, 2),
        'Weighted F1 Mean': np.round(mean_weighted, 2),
        'Weighted F1 CI Lower': np.round(ci_weighted_lower, 2),
        'Weighted F1 CI Upper': np.round(ci_weighted_upper, 2)
    })

    # 6개 컬럼 평균 행 추가
    if column_indices:
        # 새로운 행 생성
        average_row = {
            'Metric': '6개 컬럼 평균 (E,I,N,G,HRP,S)',
            'Macro F1 Mean': np.round(overall_macro_mean, 2),
            'Macro F1 CI Lower': np.round(macro_mean_ci_lower, 2),
            'Macro F1 CI Upper': np.round(macro_mean_ci_upper, 2),
            'Weighted F1 Mean': np.round(overall_weighted_mean, 2),
            'Weighted F1 CI Lower': np.round(weighted_mean_ci_lower, 2),
            'Weighted F1 CI Upper': np.round(weighted_mean_ci_upper, 2)
        }
        
        # 평균 행 추가
        result_df = pd.concat([result_df, pd.DataFrame([average_row])], ignore_index=True)

    return result_df

In [134]:
calculate_metric(labels, pred)

Unnamed: 0,Metric,Macro F1 Mean,Macro F1 CI Lower,Macro F1 CI Upper,Weighted F1 Mean,Weighted F1 CI Lower,Weighted F1 CI Upper
0,CAD-RADS,0.78,0.74,0.82,0.85,0.82,0.87
1,Plaque Burden,0.9,0.84,0.96,1.0,1.0,1.0
2,E,0.77,0.68,0.87,0.99,0.98,0.99
3,I,1.0,1.0,1.0,1.0,1.0,1.0
4,N,0.49,0.49,0.49,0.97,0.96,0.98
5,G,1.0,1.0,1.0,1.0,1.0,1.0
6,HRP,0.93,0.9,0.96,0.99,0.98,0.99
7,S,1.0,1.0,1.0,1.0,1.0,1.0
8,"6개 컬럼 평균 (E,I,N,G,HRP,S)",0.87,0.85,0.88,0.99,0.99,0.99


In [135]:
for institution in set(data['institution']):
    label_institution = labels[data['institution'] == institution]
    pred_institution = pred[data['institution'] == institution]
    print(f'####### Institution {institution}######################')
    print(calculate_metric(label_institution, pred_institution))
    print('')
    

KeyError: 'institution'

Unnamed: 0,CAD-RADS,Plaque Burden,E,I,N,G,HRP,S
0,2,P1,0,0,0,0,0,0
1,3,Not measurable,0,0,0,1,0,1
2,2,P1,0,0,0,0,0,0
3,0,,0,0,0,0,0,0
4,3,P3,0,0,0,0,1,0
5,2,P3,0,0,0,0,0,0
6,3,P2,0,0,0,0,0,0
7,2,P3,0,0,0,0,0,0
8,2,Not measurable,0,0,0,0,0,1
9,2,P2,0,0,0,0,0,0


In [29]:
# 부트스트랩으로 F1 점수 계산
weighted_f1_scores, macro_f1_scores = [], []
for i in range(10):
    bs_pred, bs_labels = bootstrap_sample(pred, labels, seed=i)
    weighted_f1_scores.append(get_all_f1(bs_labels, bs_pred, 'weighted'))
    macro_f1_scores.append(get_all_f1(bs_labels, bs_pred, 'macro'))

# 결과 계산
mean_macro, std_macro = calculate_mean_std(macro_f1_scores)
ci_macro_lower, ci_macro_upper = calculate_confidence_interval(macro_f1_scores)

mean_weighted, std_weighted = calculate_mean_std(weighted_f1_scores)
ci_weighted_lower, ci_weighted_upper = calculate_confidence_interval(weighted_f1_scores)

# 6개 컬럼만 선택하기 (E, I, N, G, HRP, S)
target_columns = ['E', 'I', 'N', 'G', 'HRP', 'S']
column_indices = [list(labels.iloc[:, :-1].columns).index(col) for col in target_columns if col in labels.iloc[:, :-1].columns]

# 선택한 컬럼에 대한 평균 계산
if column_indices:
    # Macro F1 평균
    selected_macro_means = mean_macro[column_indices]
    overall_macro_mean = np.mean(selected_macro_means)
    
    # Macro F1 신뢰구간 계산
    # 부트스트랩 샘플별로 6개 컬럼의 평균을 계산
    macro_means_per_bootstrap = []
    for bs_sample in macro_f1_scores:
        selected_scores = [bs_sample[idx] for idx in column_indices]
        macro_means_per_bootstrap.append(np.mean(selected_scores))
    
    # 평균의 신뢰구간 계산
    macro_mean_std = np.std(macro_means_per_bootstrap)
    macro_mean_se = macro_mean_std / np.sqrt(len(macro_f1_scores))
    macro_mean_margin = macro_mean_se * stats.t.ppf((1 + 0.95) / 2., len(macro_f1_scores) - 1)
    macro_mean_ci_lower = overall_macro_mean - macro_mean_margin
    macro_mean_ci_upper = overall_macro_mean + macro_mean_margin
    
    # Weighted F1 평균
    selected_weighted_means = mean_weighted[column_indices]
    overall_weighted_mean = np.mean(selected_weighted_means)
    
    # Weighted F1 신뢰구간 계산
    weighted_means_per_bootstrap = []
    for bs_sample in weighted_f1_scores:
        selected_scores = [bs_sample[idx] for idx in column_indices]
        weighted_means_per_bootstrap.append(np.mean(selected_scores))
    
    weighted_mean_std = np.std(weighted_means_per_bootstrap)
    weighted_mean_se = weighted_mean_std / np.sqrt(len(weighted_f1_scores))
    weighted_mean_margin = weighted_mean_se * stats.t.ppf((1 + 0.95) / 2., len(weighted_f1_scores) - 1)
    weighted_mean_ci_lower = overall_weighted_mean - weighted_mean_margin
    weighted_mean_ci_upper = overall_weighted_mean + weighted_mean_margin

# 최종 결과 출력
result_df = pd.DataFrame({
    'Metric': labels.iloc[:, :-1].columns,
    'Macro F1 Mean': np.round(mean_macro, 2),
    'Macro F1 CI Lower': np.round(ci_macro_lower, 2),
    'Macro F1 CI Upper': np.round(ci_macro_upper, 2),
    'Weighted F1 Mean': np.round(mean_weighted, 2),
    'Weighted F1 CI Lower': np.round(ci_weighted_lower, 2),
    'Weighted F1 CI Upper': np.round(ci_weighted_upper, 2)
})

# 6개 컬럼 평균 행 추가
if column_indices:
    # 새로운 행 생성
    average_row = {
        'Metric': '6개 컬럼 평균 (E,I,N,G,HRP,S)',
        'Macro F1 Mean': np.round(overall_macro_mean, 2),
        'Macro F1 CI Lower': np.round(macro_mean_ci_lower, 2),
        'Macro F1 CI Upper': np.round(macro_mean_ci_upper, 2),
        'Weighted F1 Mean': np.round(overall_weighted_mean, 2),
        'Weighted F1 CI Lower': np.round(weighted_mean_ci_lower, 2),
        'Weighted F1 CI Upper': np.round(weighted_mean_ci_upper, 2)
    }
    
    # 평균 행 추가
    result_df = pd.concat([result_df, pd.DataFrame([average_row])], ignore_index=True)

result_df

Unnamed: 0,Metric,Macro F1 Mean,Macro F1 CI Lower,Macro F1 CI Upper,Weighted F1 Mean,Weighted F1 CI Lower,Weighted F1 CI Upper
0,CAD-RADS,0.86,0.81,0.9,0.92,0.9,0.93
1,Plaque Burden,1.0,1.0,1.0,1.0,1.0,1.0
2,E,0.8,0.67,0.92,0.96,0.94,0.98
3,I,1.0,1.0,1.0,1.0,1.0,1.0
4,N,0.59,0.45,0.74,0.99,0.98,0.99
5,G,1.0,1.0,1.0,1.0,1.0,1.0
6,HRP,0.64,0.47,0.81,0.98,0.98,0.99
7,S,1.0,1.0,1.0,1.0,1.0,1.0
8,"6개 컬럼 평균 (E,I,N,G,HRP,S)",0.84,0.79,0.89,0.99,0.98,0.99


In [31]:
# 부트스트랩으로 F1 점수 계산
weighted_f1_scores, macro_f1_scores = [], []
for i in range(10):
    bs_pred, bs_labels = bootstrap_sample(pred, labels, seed=i)
    weighted_f1_scores.append(get_all_f1(bs_labels, bs_pred, 'weighted'))
    macro_f1_scores.append(get_all_f1(bs_labels, bs_pred, 'macro'))

# 결과 계산
mean_macro, std_macro = calculate_mean_std(macro_f1_scores)
ci_macro_lower, ci_macro_upper = calculate_confidence_interval(macro_f1_scores)

mean_weighted, std_weighted = calculate_mean_std(weighted_f1_scores)
ci_weighted_lower, ci_weighted_upper = calculate_confidence_interval(weighted_f1_scores)

# 6개 컬럼만 선택하기 (E, I, N, G, HRP, S)
target_columns = ['E', 'I', 'N', 'G', 'HRP', 'S']
column_indices = [list(labels.iloc[:, :-1].columns).index(col) for col in target_columns if col in labels.iloc[:, :-1].columns]

# 선택한 컬럼에 대한 평균 계산
if column_indices:
    # Macro F1 평균
    selected_macro_means = mean_macro[column_indices]
    overall_macro_mean = np.mean(selected_macro_means)
    
    # Macro F1 신뢰구간 계산
    # 부트스트랩 샘플별로 6개 컬럼의 평균을 계산
    macro_means_per_bootstrap = []
    for bs_sample in macro_f1_scores:
        selected_scores = [bs_sample[idx] for idx in column_indices]
        macro_means_per_bootstrap.append(np.mean(selected_scores))
    
    # 평균의 신뢰구간 계산
    macro_mean_std = np.std(macro_means_per_bootstrap)
    macro_mean_se = macro_mean_std / np.sqrt(len(macro_f1_scores))
    macro_mean_margin = macro_mean_se * stats.t.ppf((1 + 0.95) / 2., len(macro_f1_scores) - 1)
    macro_mean_ci_lower = overall_macro_mean - macro_mean_margin
    macro_mean_ci_upper = overall_macro_mean + macro_mean_margin
    
    # Weighted F1 평균
    selected_weighted_means = mean_weighted[column_indices]
    overall_weighted_mean = np.mean(selected_weighted_means)
    
    # Weighted F1 신뢰구간 계산
    weighted_means_per_bootstrap = []
    for bs_sample in weighted_f1_scores:
        selected_scores = [bs_sample[idx] for idx in column_indices]
        weighted_means_per_bootstrap.append(np.mean(selected_scores))
    
    weighted_mean_std = np.std(weighted_means_per_bootstrap)
    weighted_mean_se = weighted_mean_std / np.sqrt(len(weighted_f1_scores))
    weighted_mean_margin = weighted_mean_se * stats.t.ppf((1 + 0.95) / 2., len(weighted_f1_scores) - 1)
    weighted_mean_ci_lower = overall_weighted_mean - weighted_mean_margin
    weighted_mean_ci_upper = overall_weighted_mean + weighted_mean_margin

# 최종 결과 출력
result_df = pd.DataFrame({
    'Metric': labels.iloc[:, :-1].columns,
    'Macro F1 Mean': np.round(mean_macro, 2),
    'Macro F1 CI Lower': np.round(ci_macro_lower, 2),
    'Macro F1 CI Upper': np.round(ci_macro_upper, 2),
    'Weighted F1 Mean': np.round(mean_weighted, 2),
    'Weighted F1 CI Lower': np.round(ci_weighted_lower, 2),
    'Weighted F1 CI Upper': np.round(ci_weighted_upper, 2)
})

# 6개 컬럼 평균 행 추가
if column_indices:
    # 새로운 행 생성
    average_row = {
        'Metric': '6개 컬럼 평균 (E,I,N,G,HRP,S)',
        'Macro F1 Mean': np.round(overall_macro_mean, 2),
        'Macro F1 CI Lower': np.round(macro_mean_ci_lower, 2),
        'Macro F1 CI Upper': np.round(macro_mean_ci_upper, 2),
        'Weighted F1 Mean': np.round(overall_weighted_mean, 2),
        'Weighted F1 CI Lower': np.round(weighted_mean_ci_lower, 2),
        'Weighted F1 CI Upper': np.round(weighted_mean_ci_upper, 2)
    }
    
    # 평균 행 추가
    result_df = pd.concat([result_df, pd.DataFrame([average_row])], ignore_index=True)

result_df

Unnamed: 0,Metric,Macro F1 Mean,Macro F1 CI Lower,Macro F1 CI Upper,Weighted F1 Mean,Weighted F1 CI Lower,Weighted F1 CI Upper
0,CAD-RADS,0.8,0.79,0.81,0.91,0.89,0.93
1,Plaque Burden,1.0,1.0,1.0,1.0,1.0,1.0
2,E,0.8,0.67,0.92,0.96,0.94,0.98
3,I,1.0,1.0,1.0,1.0,1.0,1.0
4,N,0.59,0.45,0.74,0.99,0.98,0.99
5,G,1.0,1.0,1.0,1.0,1.0,1.0
6,HRP,1.0,1.0,1.0,1.0,1.0,1.0
7,S,1.0,1.0,1.0,1.0,1.0,1.0
8,"6개 컬럼 평균 (E,I,N,G,HRP,S)",0.9,0.87,0.93,0.99,0.99,1.0
