In [157]:
import pandas as pd
import numpy as np
import os

In [None]:
def data_preprocess(subject_id):
    """
    데이터 로드 및 안드로이드 데이터와 센서 데이터의 싱크 맞추기 위한 함수
    """
    android_csv_columns = [
        'Time(ms)', 'fps', 'BPM', 'Confidence', 'Stress', 'rRSP', 'SPO2', 'VLF', 
        'LF', 'HF', 'TP', 'LF/HF', 'NNI Mean', 'NNI20', 'SDNN', 'RMSSD', 'SBP',
        'DBP', 'Age', 'Gender'
    ]

    # Time(ms)는 그대로, 나머지는 android_ 접두사 붙이기
    renamed_columns = [
        col if col == 'Time(ms)' else f'android_{col}'
        for col in android_csv_columns
    ]

    df_android = pd.read_csv(f"./smu_data/android_data/mobile_subject{subject_id:02d}_android.csv", names=renamed_columns)
    df_sensor = pd.read_csv(f"./smu_data/sensor_data_concat/subject{subject_id:02d}_sensors.csv")

    # 센서 데이터 나노초 → 밀리초로 변환 후 데이터형식 맞추기
    df_sensor['Time(ms)'] = (df_sensor['Time(ns)'] / 1_000_000).astype('int64')
    df_android['Time(ms)'] = df_android['Time(ms)'].astype('int64')

    # 정렬
    df_sensor = df_sensor.sort_values('Time(ms)')
    df_android = df_android.sort_values('Time(ms)')

    # 1초 이내 가장 가까운 시간 기준으로 병합
    merged_df = pd.merge_asof(
        df_sensor,
        df_android,
        on='Time(ms)',
        direction='nearest',
        tolerance=1000 # 1000ms = 1초
    )

    # merged_df['Sensor Datetime'] = pd.to_datetime(merged_df['Time(ns)'], unit='ns')
    # merged_df['Android Datetime'] = pd.to_datetime(merged_df['Time(ms)'], unit='ms')

    # merge 데이터에서 안드로이드 데이터만 추출
    matched_android_df = merged_df[renamed_columns].copy()
    matched_android_df = matched_android_df.dropna(axis=0)
    matched_android_df = matched_android_df.reset_index(drop=True)
    matched_android_df.columns = android_csv_columns

    # merge 데이터에서 센서 데이터 추출
    matched_sensor_df = merged_df[df_sensor.columns]
    matched_sensor_df = matched_sensor_df.loc[matched_android_df.index]

    matched_sensor_df = matched_sensor_df.reset_index(drop=True)
    matched_sensor_df['SBP'] = df_sensor['SBP'][:2]
    matched_sensor_df['DBP'] = df_sensor['DBP'][:2]
    matched_sensor_df['rRSP'] = df_sensor['rRSP'][:10]

    return matched_sensor_df, matched_android_df

def bpm_error(df_sensor, df_app, cutoff=0):
    """
    BPM 오차 계산 함수 - PTE6
    - cutoff: 각 데이터프레임의 앞 부분 행을 자르기 위한 변수
    """
    # app의 BPM이 0인 데이터는 제외
    select_app_df = df_app[df_app['BPM'] > 0]['BPM']
    select_senson_df = df_sensor['BPM'].loc[df_app[df_app['BPM'] > 0]['BPM'].index]

    # 앞 몇개 데이터 날리기
    select_app_df = select_app_df[cutoff:]
    select_senson_df = select_senson_df[cutoff:]

    abs_error = np.abs(select_senson_df - select_app_df)
    tolerance = 6
    masked_error = np.where(abs_error <= tolerance, 0, abs_error)
    pte6_mae = np.mean(masked_error)

    return pte6_mae

def sbp_error(df_sensor, df_app):
    """
    SBP 오차 계산 함수
    - sensor 데이터 -> 평균
    - app 데이터 -> 중앙값
    """
    # app의 SBP가 0 이하인 데이터는 제외
    select_app_df = df_app[df_app['SBP'] > 0]['SBP']

    app_sbp_median = select_app_df.median()
    sensor_sbp_mean = df_sensor['SBP'].mean()

    sbp_error = np.abs(sensor_sbp_mean - app_sbp_median)

    return sbp_error

def dbp_error(df_sensor, df_app):
    """
    DBP 오차 계산 함수
    - sensor 데이터 -> 평균
    - app 데이터 -> 중앙값
    """
    # app의 DBP가 0 이하인 데이터는 제외
    select_app_df = df_app[df_app['DBP'] > 0]['DBP']

    app_dbp_median = select_app_df.median()
    sensor_dbp_mean = df_sensor['DBP'].mean()

    dbp_error = np.abs(sensor_dbp_mean - app_dbp_median)

    return dbp_error

def rrsp_error(df_sensor, df_app):
    """
    rRSP 오차 계산 함수
    - sensor 데이터 -> 중앙값
    - app 데이터 -> 중앙값
    """
    # app의 rRSP가 0 이하인 데이터는 제외
    select_app_df = df_app[df_app['rRSP'] > 0]['rRSP']

    app_rrsp_median = select_app_df.median()
    sensor_rrsp_median = df_sensor['rRSP'].median()

    rsp_error = np.abs(sensor_rrsp_median - app_rrsp_median)

    return rsp_error

def spo2_error(df_sensor, df_app):
    """ 
    SPO2 오차 계산 함수
    - sensor 데이터 -> 평균
    - mobile 데이터 -> 평균
    """
    app_spo2_mean = df_app['SPO2'].mean()
    sensor_spo2_mean = df_sensor['SPO2'].mean()
    
    spo2_error = np.abs(sensor_spo2_mean - app_spo2_mean)

    return spo2_error

def calculate_errors(subject_id):
    """ 
    각 지표의 오차 계산 후 리스트로 정리
    """
    # 데이터 로드 및 전처리
    df_sensor, df_android = data_preprocess(subject_id)

    # 각 지표 오차 계산
    spo2_err = spo2_error(df_sensor, df_android)
    rrsp_err = rrsp_error(df_sensor, df_android)
    dbp_err = dbp_error(df_sensor, df_android)
    sbp_err = sbp_error(df_sensor, df_android)
    bpm_err = bpm_error(df_sensor, df_android)

    mae_values = {
        "spo2_mae": spo2_err,
        "rrsp_mae": rrsp_err,
        "dbp_mae": dbp_err,
        "sbp_mae": sbp_err,
        "bpm_mae": bpm_err
    }

    return mae_values

def calculate_total_mae(errors_list):
    """
    각 지표의 총 MAE 계산
    """
    return np.mean(errors_list)

In [159]:
# 각 subject에 대해 오차 계산 및 MAE 저장
spo2_maes = []
rrsp_maes = []
dbp_maes = []
sbp_maes = []
bpm_maes = []

for subject_id in range(1, 20):  # subject00 제외(센서 데이터 부족 이슈)
    errors = calculate_errors(subject_id)
    spo2_maes.append(np.abs(errors["spo2_mae"]))
    rrsp_maes.append(np.abs(errors["rrsp_mae"]))
    dbp_maes.append(np.abs(errors["dbp_mae"]))
    sbp_maes.append(np.abs(errors["sbp_mae"]))
    bpm_maes.append(np.abs(errors["bpm_mae"]))

# 결과 출력
print(f"Total MAE for SPO2: {calculate_total_mae(spo2_maes)}")
print(f"Total MAE for RRSP: {calculate_total_mae(rrsp_maes)}")
print(f"Total MAE for DBP: {calculate_total_mae(dbp_maes)}")
print(f"Total MAE for SBP: {calculate_total_mae(sbp_maes)}")
print(f"Total MAE for BPM: {calculate_total_mae(bpm_maes)}")


Total MAE for SPO2: 2.4167302128259527
Total MAE for RRSP: 5.190712276257966
Total MAE for DBP: 12.770825342105262
Total MAE for SBP: 20.36311842105263
Total MAE for BPM: 6.720715704168235
