# Outliers and Missing Data (Processing)

## 주요 패키지

In [None]:
import os
from datetime import datetime
from glob import glob
from typing import Dict, List, Tuple

import geopandas as gpd
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import missingno as msno
import numpy as np
import pandas as pd
import scipy.stats as stats
import seaborn as sns
from matplotlib.ticker import FuncFormatter
from metr.components import Metadata, TrafficData
from metr.components.metr_imc.interpolation import (
    Interpolator,
    LinearInterpolator,
    MonthlyMeanFillInterpolator,
    ShiftFillInterpolator,
    SplineLinearInterpolator,
    TimeMeanFillInterpolator,
)
from metr.components.metr_imc.outlier import (
    HourlyInSensorZscoreOutlierProcessor,
    InSensorZscoreOutlierProcessor,
    MADOutlierProcessor,
    OutlierProcessor,
    TrimmedMeanOutlierProcessor,
    WinsorizedOutlierProcessor,
)
from sklearn.metrics import mean_absolute_error, root_mean_squared_error
from songdo_rnn.preprocessing.missing import interpolate
from songdo_rnn.preprocessing.outlier import remove_outliers
from tqdm import tqdm

plt.rcParams["font.family"] = "AppleGothic"
plt.rcParams["axes.unicode_minus"] = False  # 음수 부호 깨짐 방지

## 공통 함수

In [None]:
def get_data_list(data_dir: str) -> List[TrafficData]:
    data_path_list = glob(os.path.join(data_dir, "*.h5"))
    return [TrafficData.import_from_hdf(path) for path in data_path_list]

In [None]:
# 두 데이터 비교
def visualize_df_comparison(
    df1: pd.Series,
    df2: pd.Series,
    title: str = "",
    df1_attr: Dict[str, str] = {"label": "Source", "color": "red"},
    df2_attr: Dict[str, str] = {"label": "Target", "color": "blue"},
):
    plt.figure(figsize=(12, 6))
    df1.plot(**df1_attr)
    df2.plot(**df2_attr)
    plt.title(title)
    plt.legend()
    plt.show()

In [None]:
# 두 데이터 비교
def vis_df_comparison_line_scatter(
    df1: pd.Series,
    df2: pd.Series,
    title: str = "",
    df1_attr: Dict[str, str] = {"label": "Source", "color": "blue"},
    df2_attr: Dict[str, str] = {"label": "Target", "color": "red", "s": 20},
):
    fig, ax = plt.subplots(figsize=(12, 5))
    sns.lineplot(data=df1, ax=ax, **df1_attr)
    sns.scatterplot(data=df2, ax=ax, **df2_attr)
    plt.title(title)
    plt.legend()
    plt.show()

In [None]:
def vis_df_comparison_scatter(
    df1: pd.Series,
    df2: pd.Series,
    title: str = "",
    df1_attr: Dict[str, str] = {"label": "Source", "color": "red", "s": 10},
    df2_attr: Dict[str, str] = {"label": "Target", "color": "blue", "s": 10},
):
    fig, ax = plt.subplots(figsize=(12, 6))
    sns.scatterplot(data=df1, ax=ax, **df1_attr)
    sns.scatterplot(data=df2, ax=ax, **df2_attr)
    plt.title(title)
    plt.legend()
    plt.show()

## 데이터 로딩

In [None]:
OUTLIER_DATA_DIR = "./output/outlier_processed"
OUTLIER_STEST_DATA_DIR = os.path.join(OUTLIER_DATA_DIR, "stest")
OUTLIER_PTEST_DATA_DIR = os.path.join(OUTLIER_DATA_DIR, "ptest")
INTERPOLATED_DATA_DIR = "./output/interpolated"
INTERPOLATED_STEST_DATA_DIR = os.path.join(INTERPOLATED_DATA_DIR, "stest")
INTERPOLATED_PTEST_DATA_DIR = os.path.join(INTERPOLATED_DATA_DIR, "ptest")
PREDICTION_OUTPUT_DIR = "./output/prediction"

RAW_DATA_PATH = "../datasets/metr-imc/metr-imc.h5"
BASE_DATA_PATH = "./output/outlier_processed/base.h5"

os.makedirs(OUTLIER_DATA_DIR, exist_ok=True)
os.makedirs(OUTLIER_STEST_DATA_DIR, exist_ok=True)
os.makedirs(OUTLIER_PTEST_DATA_DIR, exist_ok=True)
os.makedirs(INTERPOLATED_DATA_DIR, exist_ok=True)
os.makedirs(INTERPOLATED_STEST_DATA_DIR, exist_ok=True)
os.makedirs(INTERPOLATED_PTEST_DATA_DIR, exist_ok=True)
os.makedirs(PREDICTION_OUTPUT_DIR, exist_ok=True)

raw_data = TrafficData.import_from_hdf(RAW_DATA_PATH)
base_data = TrafficData.import_from_hdf(BASE_DATA_PATH)
raw_df = raw_data.data
base_df = base_data.data
base_df

## Training and Test Data Split



In [None]:
test_start = pd.Timestamp("2024-08-01")
test_end = pd.Timestamp("2024-08-01")

ratio = 0.25
for i in range(8, 12):
    test_start = pd.Timestamp(f"2024-{i:02d}-01")
    test_end = pd.Timestamp(f"2024-{i + 1:02d}-01")

    test_cand = raw_df.loc[test_start:test_end]
    test_cand_no_na = test_cand.dropna(axis=1)
    print(test_start, "-", test_end, ":", test_cand_no_na.shape[1], "/", test_cand.shape[1])

    train_data = raw_df.loc[(raw_df.index < test_start) | (raw_df.index >= test_end)]
    na_ratio = train_data.isna().mean()
    sensors_less_than = (na_ratio <= ratio).sum()
    total_sensors = train_data.shape[1]
    print(f"테스트 기간 외 결측치 비율 {ratio} 이하 센서: {sensors_less_than} / {total_sensors} ({sensors_less_than/total_sensors:.2%})")

# 적당히 길면서 결측치가 많지 않은 2024-10-01 ~ 2024-10-30 선택

In [None]:
training_df: pd.DataFrame = raw_df.loc[:"2024-09-30"]
test_df: pd.DataFrame = raw_df.loc["2024-10-01":"2024-10-31"]

print("Training Length:", training_df.index.min(), "-", training_df.index.max())
print("Test Length:", test_df.index.min(), "-", test_df.index.max())

테스트 데이터에는 결측치가 있으면 안 되므로 현 연구에 사용되는 데이터는 결측치가 없는 데이터로 선택.

In [None]:
test_data_no_na = test_df.dropna(axis=1)

test_df = test_data_no_na
training_df = training_df[test_df.columns]

print("Test Data Shape: From", test_df.shape, "to", test_data_no_na.shape)
print("Training Data Shape: From", training_df.shape, "to", training_df.shape)

## 테스트 데이터 임의 결측치 생성

이상치는 z-score 3이상을 이상치로 판단하고 Training 데이터에서 그 분포를 추출한다. 그리고 Training 데이터 비율과 동일하게 Test 데이터에 이상치와 결측치를 추가한다.

### 이상치 분포 확인


In [None]:
# 테스트 데이터에 NaN이 없는지 다시 확인
test_df.isna().sum().sum()

In [None]:
# 이상치 데이터의 범위
t_all = training_df.values.flatten()
t_all = t_all[~np.isnan(t_all)]
t_mean = t_all.mean()
t_std = t_all.std()
t_z = (t_all - t_mean) / t_std
t_outlier_indices = np.where(np.abs(t_z) > 3)[0]
t_outliers = t_all[t_outlier_indices]

# 이상치 데이터의 범위와 통계 출력
if len(t_outliers) > 0:
    min_outlier = t_outliers.min()
    max_outlier = t_outliers.max()
    print(f"이상치 데이터의 개수: {len(t_outliers)}")
    print(f"이상치 데이터의 범위: {min_outlier:.2f}에서 {max_outlier:.2f}")
    print(f"전체 데이터의 평균: {t_mean:.2f}, 표준편차: {t_std:.2f}")
    print(
        f"전체 데이터의 정상 범위: {t_mean - 3 * t_std:.2f}에서 {t_mean + 3 * t_std:.2f}"
    )
    print(f"이상치 비율: {len(t_outliers) / len(t_all) * 100:.2f}%")
else:
    print("z-score가 3 이상인 이상치 데이터가 없습니다.")

In [None]:
# 전체 결측치 비율
missing_ratio = training_df.isna().sum().sum() / training_df.size
print(f"전체 결측치 비율: {missing_ratio:.4f} ({missing_ratio*100:.2f}%)")

# 열별 결측치 비율
column_missing = training_df.isna().mean()

# 결측치가 가장 많은 열 확인
print("\n결측치가 가장 많은 열 Top 5:")
print(column_missing.sort_values(ascending=False).head())

# 행별 결측치 비율 분포 확인
row_missing = training_df.isna().mean(axis=1)
print(f"\n행별 결측치 비율 평균: {row_missing.mean():.4f}")
print(f"행별 결측치 비율 최댓값: {row_missing.max():.4f}")

# 결측치 값 출력
t_outliers.sort()
t_outliers

### 이상치 및 결측치 삽입

In [None]:
corrupted_test_df = test_df.copy()
outlier_ratio = len(t_outliers) / len(t_all)
missing_ratio = training_df.isna().sum().sum() / training_df.size
total_points = corrupted_test_df.size

In [None]:
print("원래 결측치:", corrupted_test_df.isna().sum().sum())
print(corrupted_test_df.shape)
n_missing_in_sensor = test_df.shape[0] * missing_ratio  # 각 센서마다 결측치 개수
corrupted_test_df = test_df.copy()

failed_dict = {}
for col_idx, col in enumerate(test_df.columns):
    n_missing = int(n_missing_in_sensor)
    
    # 다양한 길이의 결측치 블록 생성
    missing_lengths = []
    while sum(missing_lengths) < n_missing:
        remaining = n_missing - sum(missing_lengths)
        length = np.random.randint(1, min(remaining + 1, n_missing + 1))
        missing_lengths.append(length)
    missing_lengths[-1] -= sum(missing_lengths) - n_missing
    
    # 현재 컬럼에서 사용 가능한 모든 행 인덱스
    available_rows = set(range(test_df.shape[0]))
    
    # 각 결측치 블록 삽입
    for length in missing_lengths:
        max_attempts = 100  # 최대 시도 횟수 설정
        attempt = 0
        block_created = False
        
        while attempt < max_attempts and not block_created:
            if len(available_rows) == 0 or length > len(available_rows):
                print(f"센서 {col}에 충분한 공간이 없습니다. 필요: {length}, 가용: {len(available_rows)}")
                break
            
            # 시작점 후보들 찾기
            potential_starts = []
            for row in available_rows:
                # 현재 위치부터 length만큼의 연속된 인덱스가 모두 available_rows에 있는지 확인
                if all((row + i) in available_rows for i in range(length)):
                    potential_starts.append(row)
            
            if not potential_starts:
                # 연속된 블록을 찾지 못했다면 더 짧은 블록으로 시도
                if length > 1:
                    length -= 1
                    failed_dict[col] = (length, attempt)
                    continue
                else:
                    print(f"센서 {col}에서 연속된 블록을 생성할 수 없습니다.")
                    break
            
            # 후보 중에서 무작위로 시작점 선택
            start_row = np.random.choice(potential_starts)
            for i in range(length):
                row = start_row + i
                corrupted_test_df.iloc[row, col_idx] = np.nan
                available_rows.remove(row)
            
            block_created = True
            
        if not block_created:
            print(f"센서 {col}에서 {length} 길이의 블록 생성 실패")

actual_missing = corrupted_test_df.isna().sum().sum()
expected_missing = int(n_missing_in_sensor * len(test_df.columns))
print(f"생성된 결측치 개수: {actual_missing}, 예상 결측치 개수: {expected_missing}")
print(failed_dict)

In [None]:
n_outliers = int(total_points * outlier_ratio)  # 비율에 따른 이상치 개수

#--------------------------------------------------------------------------

# 결측치(NaN) 위치 찾기
nan_mask = corrupted_test_df.isna().to_numpy()
nan_indices = np.where(nan_mask.flatten())[0]  # 1차원 배열에서 NaN 위치 인덱스

# 모든 가능한 인덱스에서 결측치 위치를 제외하고 유효한 인덱스만 선택
all_indices = np.arange(total_points)
valid_indices = np.setdiff1d(all_indices, nan_indices)

# 유효한 인덱스 중에서 이상치 개수만큼 무작위 선택
if len(valid_indices) < n_outliers:
    print(f"경고: 충분한 유효 위치가 없습니다. 필요: {n_outliers}, 가용: {len(valid_indices)}")
    n_outliers = len(valid_indices)

#--------------------------------------------------------------------------

random_indices = np.random.choice(total_points, n_outliers, replace=False)
rows = random_indices // corrupted_test_df.shape[1]
cols = random_indices % corrupted_test_df.shape[1]

# 선택된 인덱스에 이상치 데이터 삽입
for i in range(n_outliers):
    # t_outliers에서 랜덤하게 하나 선택
    outlier_value = np.random.choice(t_outliers)
    r, c = rows[i], cols[i]
    corrupted_test_df.iloc[r, c] = outlier_value

In [None]:
print(f"원본 test_df 크기: {test_df.shape}")
print(f"손상된 test_df 크기: {corrupted_test_df.shape}")
print(f"추가된 이상치 개수: {n_outliers} (비율: {outlier_ratio:.4f})")
print(f"추가된 결측치 개수: {n_missing} (비율: {missing_ratio:.4f})")
print(f"손상된 데이터의 실제 결측치 비율: {corrupted_test_df.isna().sum().sum() / corrupted_test_df.size:.4f}")

## 최종 데이터 생성

### 결측치 및 이상치 대체를 위한 데이터

In [None]:
srep_df = pd.concat([training_df, corrupted_test_df], axis=0, copy=True)
print(srep_df.shape)
print(srep_df.index.is_monotonic_increasing)
srep_df

### 예측 모델 테스트를 위한 데이터

In [None]:
training_df

### 테스트 데이터

In [None]:
test_df

In [None]:
test_df.to_hdf(os.path.join(PREDICTION_OUTPUT_DIR, "test.h5"), key="data")
print("테스트 데이터 저장 완료 여부:")
os.path.exists(os.path.join(PREDICTION_OUTPUT_DIR, "test.h5"))

## Outlier 처리

In [None]:
outlier_processors: List[OutlierProcessor] = [
    HourlyInSensorZscoreOutlierProcessor(),
    InSensorZscoreOutlierProcessor(),
    WinsorizedOutlierProcessor(),
    TrimmedMeanOutlierProcessor(),
    MADOutlierProcessor(),
]

base_name = "base_alt"
outlier_processors[0].name = "hzscore"
outlier_processors[1].name = "zscore"
outlier_processors[2].name = "winsor"
outlier_processors[3].name = "trimm"
outlier_processors[4].name = "mad"

### 단순 보정 성능 측정용 데이터

In [None]:
srep_df.to_hdf(os.path.join(OUTLIER_STEST_DATA_DIR, f"{base_name}.h5"), key="data")

srep_outlier_result_paths = remove_outliers(
    data=srep_df,
    outlier_processors=outlier_processors,
    output_dir=OUTLIER_STEST_DATA_DIR,
)
srep_outlier_result_paths

In [None]:
srep_outlier_results = get_data_list(OUTLIER_STEST_DATA_DIR)
srep_outlier_dfs = [tdata.data for tdata in srep_outlier_results]
len(srep_outlier_dfs)

### 예측 모델용 데이터

In [None]:
training_df.to_hdf(os.path.join(OUTLIER_PTEST_DATA_DIR, f"{base_name}.h5"), key="data")

pred_outlier_result_paths = remove_outliers(
    data=training_df,
    outlier_processors=outlier_processors,
    output_dir=OUTLIER_PTEST_DATA_DIR,
)
pred_outlier_result_paths

In [None]:
pred_outlier_results = get_data_list(OUTLIER_PTEST_DATA_DIR)
pred_outlier_dfs = [tdata.data for tdata in pred_outlier_results]
len(pred_outlier_dfs)

### Outlier 처리 결과 확인 및 시각화

In [None]:
idx = -1
outlier_type = 0

In [None]:
target_df = srep_outlier_dfs[outlier_type]
while idx < target_df.shape[1] - 1:
    idx += 1
    target = target_df.iloc[:, idx]
    source = raw_df[target.name]
    source = source.loc[target.index.min():target.index.max()]
    

    if source.isna().sum() == target.isna().sum():
        # 결측치 처리가 되지 않은 경우는 제외
        continue

    visualize_df_comparison(source, target, title=f"Source vs Target: {source.name}")
    break


In [None]:
ratio = 0.75
target_df_idx = 2

too_many_missing_sensors = set()

df = None
target_df = None
for idx, data in enumerate(pred_outlier_results):
    df = data.data
    na_ratio = df.isna().mean()
    sensors_less_than_list = na_ratio <= ratio
    too_many_na_sensors = sensors_less_than_list[~sensors_less_than_list].index

    if idx == target_df_idx:
        target_df = df[too_many_na_sensors]

    sensors_less_than = sensors_less_than_list.sum()
    total_sensors = df.shape[1]
    print(
        f"{data.path} 결측치 비율 {ratio} 이하 센서: {sensors_less_than} / {total_sensors} ({sensors_less_than/total_sensors:.2%})"
    )
    print(", ".join(too_many_na_sensors[:5]), "...", f"{len(too_many_na_sensors)}개")

    too_many_missing_sensors.update(too_many_na_sensors)

len(too_many_missing_sensors)

#### 제대로 보간되지 않을 것으로 예상되는 데이터

Outlier 제거된 데이터에서 결측치가 너무 많이 관찰되는 센서 시각화

In [None]:
idx_a = -1

In [None]:
while idx_a < target_df.shape[1] - 1:
    idx_a += 1
    target = target_df.iloc[:, idx_a]
    source = raw_df[target.name]
    source = source.loc[target.index.min() : target.index.max()]

    if source.isna().sum() == target.isna().sum():
        # 결측치 처리가 되지 않은 경우는 제외
        continue

    print(idx_a)
    visualize_df_comparison(source, target, title=f"Source vs Target: {source.name}")
    break
else:
    print("모든 센서에 대해 비교 완료")

## 결측치 처리

In [None]:
interpolators: List[Interpolator] = [
    LinearInterpolator(),
    SplineLinearInterpolator(),
    TimeMeanFillInterpolator(),
    MonthlyMeanFillInterpolator(),
    ShiftFillInterpolator(periods=7 * 24),
]

interpolators[0].name = "linear"
interpolators[1].name = "spline"
interpolators[2].name = "time_mean"
interpolators[3].name = "monthly_mean"
interpolators[4].name = "week_shift"

In [None]:
def show_nans(data: pd.Series, idx: int, resample: str = "3d"):
    target_s_na = pd.Series(index=data.index)
    target_s_na[data.isna()] = 0
    # print(target_s_na.loc[target_s_na == 0])

    target_gs = pd.Series(index=data.index)
    target_resampled = data.resample("3d").mean()
    target_gs.loc[target_resampled.index] = target_resampled

    vis_df_comparison_line_scatter(target_gs, target_s_na)

### 단순 보정 성능 측정용 데이터

In [None]:
srep_result_paths = interpolate(srep_outlier_results, interpolators, INTERPOLATED_STEST_DATA_DIR)
len(srep_result_paths)

In [None]:
srep_results = get_data_list(INTERPOLATED_STEST_DATA_DIR)
srep_dfs = [tdata.data for tdata in srep_results]
len(srep_dfs)

#### 보간되지 못한 데이터 확인

In [None]:
for data in srep_results:
    # 각 열의 결측치 개수를 먼저 계산
    na_counts_per_col = data.data.isna().sum()
    cols_with_missings = (na_counts_per_col > 0).sum()
    total_cols = data.data.shape[1]
    missing_ratio = cols_with_missings / total_cols * 100
    
    print(f"{data.path}: 결측치가 있는 열 {cols_with_missings}/{total_cols} ({missing_ratio:.2f}%)")

In [None]:
idx = -1
srep_type = 1

In [None]:
print(srep_results[srep_type].path)
while idx < srep_dfs[srep_type].shape[1] - 1:
    idx += 1
    target_s = srep_dfs[srep_type].iloc[:, idx]
    
    if target_s.isna().sum() == 0:
        continue

    show_nans(target_s, idx)
    break


### 예측 모델용 데이터

In [None]:
pred_result_paths = interpolate(pred_outlier_results, interpolators, INTERPOLATED_PTEST_DATA_DIR)
len(pred_result_paths)

In [None]:
pred_results = get_data_list(INTERPOLATED_PTEST_DATA_DIR)
pred_dfs = [tdata.data for tdata in pred_results]
len(srep_dfs)

#### 보간되지 못한 데이터 확인

In [None]:
for data in pred_results:
    # 각 열의 결측치 개수를 먼저 계산
    na_counts_per_col = data.data.isna().sum()
    cols_with_missings = (na_counts_per_col > 0).sum()
    total_cols = data.data.shape[1]
    missing_ratio = cols_with_missings / total_cols * 100
    
    print(f"{data.path}: 결측치가 있는 열 {cols_with_missings}/{total_cols} ({missing_ratio:.2f}%)")

In [None]:
idx = -1
pred_type = 4

In [None]:
print(srep_results[pred_type].path)
while idx < pred_dfs[pred_type].shape[1] - 1:
    idx += 1
    target_s = pred_dfs[pred_type].iloc[:, idx]

    if target_s.isna().sum() == 0:
        continue

    # x축은 target_s.index, y축은 NaN이 있는 경우 0, 아닌 경우 NaN
    target_s_na = pd.Series(index=target_s.index)
    target_s_na[target_s.isna()] = 0
    # print(target_s_na.loc[target_s_na == 0])

    target_gs = pd.Series(index=target_s.index)
    target_resampled = target_s.resample("3d").mean()
    target_gs.loc[target_resampled.index] = target_resampled

    vis_df_comparison_line_scatter(target_gs, target_s_na)
    break

## 보간 성능 검증

In [None]:
srep_targets = {data.path: data.data.loc["2024-10-01":"2024-11-01", :] for data in srep_results}
srep_targets[list(srep_targets.keys())[0]]


In [None]:
test_df

In [None]:
def calculate_metrics(y_true: pd.DataFrame, y_pred: pd.DataFrame, skip_na: bool = True):
    """
    두 데이터프레임 간의 MAE와 RMSE를 계산
    
    Parameters:
    -----------
    y_true : pd.DataFrame - 참값 데이터프레임 (test_df)
    y_pred : pd.DataFrame - 예측값 데이터프레임
    skip_na : bool - True면 NaN 값은 계산에서 제외
    
    Returns:
    --------
    dict - 전체 및 센서별 메트릭 결과
    """
    # Shape 확인
    if y_true.shape != y_pred.shape:
        raise ValueError(f"데이터프레임 shape이 일치하지 않습니다: {y_true.shape} vs {y_pred.shape}")
    
    # 전체 데이터에 대한 메트릭
    if skip_na:
        # NaN이 있는 위치는 계산에서 제외
        mask = ~(y_true.isna() | y_pred.isna())
        true_values = y_true.values[mask.values]
        pred_values = y_pred.values[mask.values]
    else:
        true_values = y_true.values.flatten()
        pred_values = y_pred.values.flatten()
    
    global_mae = mean_absolute_error(true_values, pred_values)
    global_rmse = root_mean_squared_error(true_values, pred_values)
    
    # 센서별 메트릭 계산
    per_sensor_metrics = {}
    for col in y_true.columns:
        if skip_na:
            mask = ~(y_true[col].isna() | y_pred[col].isna())
            col_true = y_true.loc[mask, col]
            col_pred = y_pred.loc[mask, col]
        else:
            col_true = y_true[col].dropna()
            col_pred = y_pred[col].dropna()
            
        if len(col_true) > 0:
            mae = mean_absolute_error(col_true, col_pred)
            rmse = root_mean_squared_error(col_true, col_pred)
            per_sensor_metrics[col] = {'mae': mae, 'rmse': rmse, 'count': len(col_true)}
    
    return {
        'global': {'mae': global_mae, 'rmse': global_rmse, 'count': len(true_values)},
        'per_sensor': per_sensor_metrics
    }

def visualize_metrics_comparison(metrics_results, title="모델 성능 비교"):
    """
    여러 모델의 메트릭을 시각화
    
    Parameters:
    -----------
    metrics_results : dict - 모델 이름을 키로, calculate_metrics()의 결과를 값으로 하는 딕셔너리
    title : str - 그래프 제목
    """
    
    # 모델별 전역 메트릭 추출
    model_names = list(metrics_results.keys())
    mae_values = [metrics['global']['mae'] for metrics in metrics_results.values()]
    rmse_values = [metrics['global']['rmse'] for metrics in metrics_results.values()]
    
    # 시각화
    fig, axs = plt.subplots(1, 2, figsize=(16, 6), dpi=120)
    
    # MAE 그래프
    sns.barplot(x=model_names, y=mae_values, ax=axs[0], palette="viridis")
    axs[0].set_title('모델별 MAE 비교', fontsize=14)
    axs[0].set_ylabel('MAE', fontsize=12)
    axs[0].set_ylim(bottom=0)
    
    # RMSE 그래프
    sns.barplot(x=model_names, y=rmse_values, ax=axs[1], palette="viridis")
    axs[1].set_title('모델별 RMSE 비교', fontsize=14)
    axs[1].set_ylabel('RMSE', fontsize=12)
    axs[1].set_ylim(bottom=0)
    
    # 전체 그래프 타이틀
    plt.suptitle(title, fontsize=16, y=1.05)
    plt.tight_layout()
    
    # 수치 표시
    for i, ax in enumerate([axs[0], axs[1]]):
        values = mae_values if i == 0 else rmse_values
        for j, v in enumerate(values):
            ax.text(j, v + v*0.01, f'{v:.4f}', ha='center', fontsize=10)
    
    return fig

# 사용 예시
# 여러 모델의 결과를 test_df와 비교
def compare_models_with_test(test_df, prediction_dfs):
    """
    여러 모델의 예측값을 test_df와 비교하여 메트릭 계산 및 시각화
    
    Parameters:
    -----------
    test_df : 참값 데이터프레임
    prediction_dfs : dict - 모델 이름을 키로, 예측 데이터프레임을 값으로 하는 딕셔너리
    """
    metrics_results = {}
    
    # 각 모델별 메트릭 계산
    for model_name, pred_df in prediction_dfs.items():
        metrics = calculate_metrics(test_df, pred_df)
        metrics_results[model_name] = metrics
        
        # 개별 모델 결과 출력
        print(f"\n{model_name}:")
        print(f"  MAE: {metrics['global']['mae']:.4f}")
        print(f"  RMSE: {metrics['global']['rmse']:.4f}")
        print(f"  유효 샘플 수: {metrics['global']['count']}")
    
    # 모델 비교 시각화
    visualize_metrics_comparison(metrics_results)
    
    # 센서별 성능 데이터프레임 생성
    sensor_metrics = {}
    for model_name, metrics in metrics_results.items():
        for sensor, sensor_metric in metrics['per_sensor'].items():
            if sensor not in sensor_metrics:
                sensor_metrics[sensor] = {}
            sensor_metrics[sensor][f"{model_name}_MAE"] = sensor_metric['mae'] 
            sensor_metrics[sensor][f"{model_name}_RMSE"] = sensor_metric['rmse']
    
    return pd.DataFrame(sensor_metrics).T, metrics_results

# 사용 예시:
# prediction_dfs = {
#     "모델1": model1_df,
#     "모델2": model2_df,
#     "모델3": model3_df
# }
# sensor_metrics_df, all_metrics = compare_models_with_test(test_df, prediction_dfs)


In [None]:
sensor_metrics_df, all_metrics = compare_models_with_test(test_df, srep_targets)

In [None]:
def calc_all_metrics(y_true: pd.DataFrame, y_pred: pd.DataFrame, skip_na: bool = True):
    # Shape 확인
    if y_true.shape != y_pred.shape:
        raise ValueError(f"데이터프레임 shape이 일치하지 않습니다: {y_true.shape} vs {y_pred.shape}")
    
    # 전체 데이터에 대한 메트릭
    if skip_na:
        # NaN이 있는 위치는 계산에서 제외
        mask = ~(y_true.isna() | y_pred.isna())
        true_values = y_true.values[mask.values]
        pred_values = y_pred.values[mask.values]
    else:
        true_values = y_true.values.flatten()
        pred_values = y_pred.values.flatten()
    
    mae = mean_absolute_error(true_values, pred_values)
    rmse = root_mean_squared_error(true_values, pred_values)

    return mae, rmse

def calc_each_metrics(y_true: pd.DataFrame, y_pred: pd.DataFrame, skip_na: bool = True):
    # Shape 확인
    if y_true.shape != y_pred.shape:
        raise ValueError(f"데이터프레임 shape이 일치하지 않습니다: {y_true.shape} vs {y_pred.shape}")
    
    # 센서별 메트릭 계산
    per_sensor_metrics = {}
    for col in y_true.columns:
        if skip_na:
            mask = ~(y_true[col].isna() | y_pred[col].isna())
            col_true = y_true.loc[mask, col]
            col_pred = y_pred.loc[mask, col]
        else:
            col_true = y_true[col].dropna()
            col_pred = y_pred[col].dropna()
            
        if len(col_true) > 0:
            mae = mean_absolute_error(col_true, col_pred)
            rmse = root_mean_squared_error(col_true, col_pred)
            per_sensor_metrics[col] = {'mae': mae, 'rmse': rmse, 'count': len(col_true)}
    
    return per_sensor_metrics

In [None]:
mae_list = []
rmse_list = []

for model_name, pred_df in tqdm(srep_targets.items()):
    mae, rmse = calc_all_metrics(test_df, pred_df)
    mae_list.append((mae, model_name))
    rmse_list.append((rmse, model_name))

mae_list.sort(key=lambda x: x[0])
rmse_list.sort(key=lambda x: x[0])

In [None]:
sensor_mae_dict = {}
sensor_rmse_dict = {}

for model_name, pred_df in tqdm(srep_targets.items()):
    results = calc_each_metrics(test_df, pred_df)

    mae_result = {col: result['mae'] for col, result in results.items()}
    rmse_result = {col: result['rmse'] for col, result in results.items()}
    sensor_mae_dict[os.path.basename(model_name)] = mae_result
    sensor_rmse_dict[os.path.basename(model_name)] = rmse_result

In [None]:
pd.DataFrame(sensor_mae_dict).T.to_excel("./output/srep_sensor_mae.xlsx")
pd.DataFrame(sensor_rmse_dict).T.to_excel("./output/srep_sensor_rmse.xlsx")

In [None]:
mae_list

### 결과 분석

In [None]:
sensor_id = "1690003706"
strue = test_df[sensor_id]
strue.name = sensor_id + "(True)"
s1 = srep_targets["./output/interpolated/stest/hzscore-time_mean.h5"][sensor_id]
s2 = srep_targets["./output/interpolated/stest/mad-linear.h5"][sensor_id]

plt.figure(figsize=(12, 6))
s1.plot()
strue.plot()
plt.legend()
plt.show()

plt.figure(figsize=(12, 6))
s2.plot()
strue.plot()
plt.legend()
plt.show()