In [None]:
import numpy as np
from sklearn import metrics
import matplotlib.pyplot as plt
from sklearn.metrics import auc
import pandas as pd
from tqdm import tqdm
import os
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support

In [None]:
# Treat the top fraction of rows (most suspicious first) as anomalous and look
# for the cut whose precision, i.e. share of label-1 rows, is the highest.
def find_best_percent(result, granularity_all=1000):
    """
    Find the top-fraction cut with the highest share of anomalous rows.

    For n = 1..99 the first ``round(len(result) * n / granularity_all)`` rows
    (at least one row) are taken and the fraction of rows labelled 1 among the
    rows labelled 0 or 1 is measured. The first n reaching the highest fraction
    wins.

    :param result: DataFrame with a 'label' column, already sorted so that the
        rows to be cut off come first
    :param granularity_all: denominator that turns n into a fraction of rows
    :return: tuple ``(best_n, max_percent, granularity_all)``; ``best_n`` stays 1
        and ``max_percent`` stays 0 when no cut contains a label-1 row
    """
    max_percent = 0
    best_n = 1
    print("threshold tuning start:")
    for n in tqdm(range(1, 100)):
        head_n = n / granularity_all
        data_length = max(round(len(result) * head_n), 1)
        # Positional slice: a label-based ``.loc[:k]`` only means "the first
        # k + 1 rows" when the frame carries a default RangeIndex.
        count_dist = count_entries(result.iloc[:data_length], 'label')
        # count_entries always seeds the '0' and '1' keys, so both lookups are safe.
        percent = count_dist['1'] / max(1, (count_dist['0'] + count_dist['1']))
        if percent > max_percent:
            max_percent = percent
            best_n = n
    print("top %d / %s is the highest, %s" % (granularity_all, best_n, max_percent))
    # Distribution of the last (largest) cut that was examined.
    print("Count dist : " ,count_dist)
    return best_n, max_percent, granularity_all

def count_entries(df, col_name):
    """
    Tally the values of one column, keyed by their integer value as a string.

    The keys '0' and '1' are always present (starting at zero); any other value
    gets its own key the first time it is encountered.

    :param df: DataFrame (or any mapping of column name to iterable)
    :param col_name: name of the column to tally
    :return: dict mapping ``str(int(value))`` to its number of occurrences
    """
    tally = {'0': 0, '1': 0}
    for value in df[col_name]:
        key = str(int(value))
        tally[key] = tally.get(key, 0) + 1
    return tally

def find_best_result(threshold_n, result, dataframe_std):
    """
    Sweep head_n and keep the car-level result with the highest AUROC.

    head_n runs over ``range(10, 1000, 5)``; ties are resolved in favour of the
    later (larger) head_n because the comparison is ``>=``.

    :param threshold_n: threshold handed to charge_to_car
    :param result: sorted snippet-level result handed to charge_to_car
    :param dataframe_std: ground-truth labels handed to evaluation
    :return: tuple ``(best_result, best_h, best_recall, best_precision, best_f1,
        best_auroc)``; ``(None, 0, 0, 0, 0, 0)`` if no candidate was accepted
    """
    # (car-level frame, head_n, recall, precision, f1, auroc) of the best candidate so far
    best = (None, 0, 0, 0, 0, 0)
    for h in tqdm(range(10, 1000, 5)):
        candidate = charge_to_car(threshold_n, result, head_n=h)
        f1, recall, _false_rate, precision, _accuracy, auroc = evaluation(dataframe_std, candidate)
        if auroc >= best[-1]:
            best = (candidate, h, recall, precision, f1, auroc)
    return best

def charge_to_car(threshold_n, rec_result, head_n=92, gran=1000):
    """
    Aggregate snippet-level errors into one score and one 0/1 prediction per car.

    For every car the first ``head_n / gran`` fraction of its rows (at least one
    row) is averaged; the car is predicted anomalous (1) when that mean is
    strictly greater than ``threshold_n``.

    :param threshold_n: error threshold above which a car is flagged
    :param rec_result: sorted result; a DataFrame with a 'car' column whose LAST
        column holds the error values. The leading rows of each car are the
        ones averaged, so the frame is expected to be sorted with the largest
        errors first.
    :param head_n: numerator of the fraction of rows averaged per car
    :param gran: granularity, i.e. the denominator that ``head_n`` is divided by
    :return: DataFrame with columns ['car', 'predict', 'error', 'threshold_n'],
        one row per car (empty when ``rec_result`` has no rows)
    """
    result = []
    for car_id, car_rows in rec_result.groupby('car'):
        errors = car_rows.values[:, -1].astype(float)
        # Always average at least one row, even for very short cars.
        idx = max(round(head_n / gran * len(errors)), 1)
        error = np.mean(errors[:idx])
        result.append([car_id, int(error > threshold_n), error, threshold_n])

        # Alternative decision rule (disabled): flag the car as soon as any of
        # its top rows exceeds the threshold.
        #   top_errors = errors[:idx]
        #   snip_pred = (top_errors > threshold_n).astype(int)
        #   ratio = snip_pred.mean()  # share of anomalous snippets in the head_n window
        #   result.append([car_id, int(ratio > 0), error, threshold_n])
    return pd.DataFrame(result, columns=['car', 'predict', 'error', 'threshold_n'])

def evaluation(dataframe_std, dataframe):
    """
    Compute car-level classification statistics and the AUROC.

    The AUROC is computed from the 'error' column against labels derived from
    the module-level ``ind_car_num_list`` (label 0) and ``ood_car_num_list``
    (label 1); a car listed in both counts as label 0. The remaining statistics
    come from the confusion matrix of 'label' versus 'predict' after merging the
    two frames on 'car'.

    :param dataframe_std: ground truth with 'car' and 'label' columns
    :param dataframe: per-car result with 'car', 'predict' and 'error' columns
    :return: tuple ``(f1, recall, false_rate, precision, accuracy, auroc)``
    :raises ValueError: if a car is in neither of the two car lists
    """
    # Sets make the membership tests cheap; the lists themselves are not modified.
    ind_cars = set(ind_car_num_list)
    ood_cars = set(ood_car_num_list)

    # Exactly one label per car, so the labels stay aligned with the scores.
    _label = []
    for each_car in dataframe['car']:
        car_id = int(each_car)
        if car_id in ind_cars:
            _label.append(0)
        elif car_id in ood_cars:
            _label.append(1)
        else:
            raise ValueError(
                f"car {car_id} is in neither ind_car_num_list nor ood_car_num_list"
            )

    fpr, tpr, thresholds = metrics.roc_curve(_label, list(dataframe['error']), pos_label=1)
    auroc = auc(fpr, tpr)

    data = pd.merge(dataframe_std, dataframe, on='car')
    # labels=[0, 1] keeps the matrix 2x2 even when only one class is present.
    cm = confusion_matrix(
        data['label'].astype(int), data['predict'].astype(int), labels=[0, 1]
    )
    tn, fp, fn, tp = cm.ravel()

    # Every ratio falls back to 0 when its denominator is 0.
    precision = tp / (tp + fp) if tp + fp != 0 else 0
    recall = tp / (tp + fn) if tp + fn != 0 else 0
    false_rate = fp / (tn + fp) if tn + fp != 0 else 0
    accuracy = (tp + tn) / (tp + tn + fp + fn) if tp + tn + fp + fn != 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall != 0 else 0
    return f1, recall, false_rate, precision, accuracy, auroc

# Load Data


In [None]:
# Brand selection. `brand[brand_num]` drives both the input paths below and the
# title of the ROC plot, so the two can no longer disagree.
brand = ["1","2","3"]
brand_num = 2  # index into `brand`; 2 selects "brand3", the data set these paths pointed at before

brand_dir = f"brand{brand[brand_num]}"
label_dir = f"./AUROCscore/labels/{brand_dir}"
score_dir = f"./AUROCscore/scores/{brand_dir}/tranad_1_epochs5_lr1e-3"

# NOTE(security): allow_pickle=True unpickles arbitrary objects, which can run
# code embedded in the file. Only load .npy files from a trusted source.
# Each file stores a single pickled object, unwrapped with .item().
train_label_obj = np.load(f"{label_dir}/train_labels.npy", allow_pickle=True).item()
test_label_obj = np.load(f"{label_dir}/labels.npy", allow_pickle=True).item()
train_scores = np.load(f"{score_dir}/train_scores.npy", allow_pickle=True).item()
test_scores = np.load(f"{score_dir}/test_scores.npy", allow_pickle=True).item()

In [None]:
# Number of scores stored under each key of the train score dict.
train_list = [len(scores) for scores in train_scores.values()]
print(train_list)

# Same for the test score dict, followed by the raw test labels.
test_list = [len(scores) for scores in test_scores.values()]
print(test_list)
print(test_label_obj)

[372, 86, 367, 463, 646, 531, 310, 717, 196, 542, 488, 340, 733, 1236, 1034, 147, 265, 236, 222, 111, 49, 357, 738, 308, 531, 81, 142, 302, 438, 349, 160, 730, 127, 163, 171, 260, 91, 267, 95, 143, 123, 277, 221, 325, 80, 227, 601, 109, 114, 104, 289, 370, 108, 167, 200, 453, 100, 268, 160, 318, 589, 319, 260, 95, 138, 56, 164, 46, 24, 29, 53, 78, 12]
[208, 126, 103, 170, 304, 138, 425, 167, 82, 860, 91, 355, 864, 980, 250, 270, 202, 159, 509, 266, 208, 57, 137, 129, 937, 503, 77]
{203: 0, 204: 1, 205: 1, 206: 1, 209: 1, 210: 1, 212: 1, 213: 0, 215: 1, 217: 0, 219: 1, 225: 1, 226: 0, 235: 0, 236: 1, 237: 1, 239: 1, 240: 1, 241: 1, 243: 1, 247: 1, 249: 0}


In [None]:
# Disabled experiment, kept as a bare string so that none of it runs: it would
# drop every car with fewer than 100 scores from copies of train_scores and
# test_scores (and from the matching label dicts), print how many cars remain,
# and rebind the score dicts to the filtered copies.
""" train_temp = train_scores.copy()
test_temp = test_scores.copy()
for cid, data in train_scores.items():
    if len(data)<100 : 
        del train_temp[cid]
        del train_label_obj[cid]

for cid, data in test_scores.items():
    if len(data) <100 :
        del test_temp[cid]
        del test_label_obj[cid]

print(len(train_temp.keys()))
print(len(test_temp.keys()))

train_scores = train_temp
test_scores = test_temp """

' train_temp = train_scores.copy()\ntest_temp = test_scores.copy()\nfor cid, data in train_scores.items():\n    if len(data)<100 : \n        del train_temp[cid]\n        del train_label_obj[cid]\n\nfor cid, data in test_scores.items():\n    if len(data) <100 :\n        del test_temp[cid]\n        del test_label_obj[cid]\n\nprint(len(train_temp.keys()))\nprint(len(test_temp.keys()))\n\ntrain_scores = train_temp\ntest_scores = test_temp '

In [None]:
# Merge the test scores into train_scores so all scores live in a single dict.
train_scores.update(test_scores)

# Every key of the train labels counts as in-distribution. Test cars are
# out-of-distribution when their label equals 1 and in-distribution otherwise.
ood_car_num_list = [car for car, flag in test_label_obj.items() if flag == 1]
ind_car_num_list = list(train_label_obj.keys()) + [
    car for car, flag in test_label_obj.items() if not flag == 1
]

all_car_num_list = set(ind_car_num_list + ood_car_num_list)
print(len(all_car_num_list))

49


In [None]:
# One row per individual score: [car, label, rec_error]. Cars that appear in
# neither car list are skipped; the in-distribution list is checked first.
rows = []
for car, scores in train_scores.items():
    if car in ind_car_num_list:
        car_label = 0
    elif car in ood_car_num_list:
        car_label = 1
    else:
        continue
    rows.extend([car, car_label, float(score)] for score in scores)

all_snippet_df = pd.DataFrame(rows, columns=['car', 'label', 'rec_error'])
print(len(ind_car_num_list))
print(len(ood_car_num_list))

33
16


In [None]:
# Car-level ground truth. all_car_num_list is the union of the two car lists,
# so a car that is not in-distribution is necessarily out-of-distribution.
labels = [
    [car, 0 if car in ind_car_num_list else 1]
    for car in all_car_num_list
]
dataframe = pd.DataFrame(labels, columns=['car', 'label'])

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.metrics import auc, precision_recall_fscore_support

# Common FPR grid on which every fold's ROC curve is interpolated for averaging.
mean_fpr = np.linspace(0, 1, 100)
tprs = []
AUC_fivefold_list = []
plt.figure(figsize=(9,6))
for i in range(5):

    fold_num = i
    # Test fold: the i-th fifth of the in-distribution cars plus every OOD car
    # outside the i-th fifth. All remaining cars are used for tuning.
    test_car_list = ind_car_num_list[
        int(fold_num * len(ind_car_num_list) / 5) : int((fold_num + 1) * len(ind_car_num_list) / 5)
    ] + ood_car_num_list[: int(fold_num * len(ood_car_num_list) / 5)] \
      + ood_car_num_list[int((fold_num + 1) * len(ood_car_num_list) / 5) :]
    test_car_list = set(test_car_list)
    train_car_list = all_car_num_list - test_car_list

    print(f"Fold Num{fold_num} Test car List : {test_car_list}")
    print(f"Fold Num{fold_num} Train car List : {train_car_list}")
    # ------------------------------
    # Train part: tune threshold_n and best_h
    # ------------------------------
    train_result = all_snippet_df[all_snippet_df['car'].isin(train_car_list)].copy()
    test_result  = all_snippet_df[all_snippet_df['car'].isin(test_car_list)].copy()

    # An empty split cannot be tuned or scored. Fail here with a clear message
    # instead of letting the ROC computation crash on empty arrays later on.
    if train_result.empty or test_result.empty:
        empty_split = 'train' if train_result.empty else 'test'
        raise ValueError(
            f"Fold {fold_num}: no snippet scores found for the {empty_split} cars; "
            "check that the car ids in the score files match those in the label files"
        )

    train_res_csv = train_result[['label','car','rec_error']].to_numpy()
    test_res_csv  = test_result[['label','car','rec_error']].to_numpy()

    rec_sorted_index = np.argsort(-train_res_csv[:, 2].astype(float))  # indices, largest error first
    res = [train_res_csv[j][[1, 0, 2]] for j in rec_sorted_index]      # [car, label, rec_error]
    result = pd.DataFrame(res, columns=['car', 'label', 'rec_error'])

    best_n, max_percent, granularity = find_best_percent(result, granularity_all=1000)
    head_n = best_n / granularity
    # Number of leading rows inside the best cut; at least one row.
    data_length = max(1, round(len(result) * head_n))
    # threshold_n: the error value at the cut where precision is highest.
    threshold_n = float(result['rec_error'].values[data_length - 1])

    print("threshold_n", threshold_n)
    print("start tuning, flag is", 'rec_error')
    # The 4th element returned by find_best_result is the precision.
    best_result, best_h, best_re, best_precision, best_f1, best_auroc = find_best_result(
        threshold_n, result, dataframe
    )
    if best_result is None:
        # No head_n was accepted, e.g. because the AUROC was NaN every time
        # (presumably a train fold that contains a single class).
        raise ValueError(f"Fold {fold_num}: no head_n produced a valid AUROC on the train cars")
    if dataframe.shape[0] != best_result.shape[0]:
        print('dataframe_std is ', dataframe.shape[0], '&&   dataframe is ', best_result.shape[0])

    print("F1 Scores through Train data")
    print("best 1000 / %d:" % best_h)
    print("re:", best_re)
    print("precision:", best_precision)
    print("F1:", best_f1)

    # ------------------------------
    # Test part: charge_to_car -> car-level score / prediction
    # ------------------------------
    rec_sorted_index = np.argsort(-test_res_csv[:, 2].astype(float))
    res = [test_res_csv[j][[1, 0, 2]] for j in rec_sorted_index]
    result = pd.DataFrame(res, columns=['car', 'label', 'rec_error'])
    result['car'] = result['car'].astype("int").astype("str")

    test_result_car = charge_to_car(threshold_n, result, head_n=best_h)
    # columns: ['car', 'predict', 'error', 'threshold_n']

    _score = list(test_result_car['error'])
    # Exactly one label per car so that y_true stays aligned with _score;
    # the in-distribution list takes precedence, as in the label frame.
    y_true = []
    for each_car in test_result_car['car']:
        if int(each_car) in ind_car_num_list:
            y_true.append(0)
        elif int(each_car) in ood_car_num_list:
            y_true.append(1)
        else:
            raise ValueError(f"Fold {fold_num}: car {each_car} has no known label")
    y_pred = list(test_result_car['predict'])  # 0/1 prediction produced by charge_to_car

    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, pos_label=1, average='binary'
    )

    print("F1 Score through Test data")
    print("Test Precision:", precision)
    print("Test Recall:", recall)
    print("Test F1:", f1)

    print('len(_score)', len(_score))
    fpr, tpr, thresholds = metrics.roc_curve(y_true, _score, pos_label=1)
    auc_fold = auc(fpr, tpr)
    print('AUC', auc_fold)
    # np.save(f"/results/true_score_fold{i}.npy",y_true)
    # np.save(f"/results/pred_score_fold{i}.npy",_score)
    # Draw this fold's ROC curve.
    plt.plot(fpr, tpr, alpha=0.3, label=f"Fold {i} AUC = {auc_fold:.3f}")

    # Interpolate onto the common grid for the mean ROC.
    tpr_interp = np.interp(mean_fpr, fpr, tpr)
    tpr_interp[0] = 0.0
    tprs.append(tpr_interp)
    AUC_fivefold_list.append(auc_fold)

# ------------------------------
# 5-fold mean ROC + standard-deviation band
# ------------------------------
mean_tpr = np.mean(tprs, axis=0)
std_tpr  = np.std(tprs, axis=0)
mean_tpr[-1] = 1.0  # pin the end point of the mean curve

plt.plot(
    mean_fpr,
    mean_tpr,
    color="blue",
    lw=2,
    label=f"Mean ROC (AUC={np.mean(AUC_fivefold_list):.3f})"
)
plt.fill_between(
    mean_fpr,
    mean_tpr - std_tpr,
    mean_tpr + std_tpr,
    color="blue",
    alpha=0.2,
    label="± 1 std"
)
plt.plot([0, 1], [0, 1], "k--")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title(f"Brand{brand[brand_num]} ROC with Mean ± Std (5-Fold, charge_to_car)")
plt.legend()
plt.show()

print("Fold AUCs:", AUC_fivefold_list)
print("AUC mean ", np.mean(AUC_fivefold_list))


threshold tuning start:


100%|██████████| 99/99 [00:00<00:00, 18651.40it/s]


top 1000 / 1 is the highest, 0
Count dist :  {'0': 0, '1': 0}
threshold_n 1e-07
start tuning, flag is rec_error


  0%|          | 0/198 [00:00<?, ?it/s]


IndexError: cannot do a non-empty take from an empty axes.

<Figure size 900x600 with 0 Axes>