# Data Preprocessing

S1-S4 Column Prediction model "MultiOutputClassifier" 전용 전처리 코드입니다.

In [1]:
import datetime
import glob
import os

import pandas as pd
from tqdm import tqdm

코드 실행 전에 다음 파일들이 `./data` 경로에 포함되어야 합니다.

```
.
├── [4.0K]  answer_sample.csv
├── [9.5K]  README_2020.txt
├── [4.0K]  test
│   ├── [627M]  ch2024_test__m_acc_part_5.parquet.gzip
│   ├── [1.1G]  ch2024_test__m_acc_part_6.parquet.gzip
│   ├── [1.0G]  ch2024_test__m_acc_part_7.parquet.gzip
│   ├── [935M]  ch2024_test__m_acc_part_8.parquet.gzip
│   ├── [912K]  ch2024_test__m_activity.parquet.gzip
│   ├── [3.2M]  ch2024_test__m_ambience.parquet.gzip
│   ├── [ 12M]  ch2024_test__m_gps.parquet.gzip
│   ├── [ 93K]  ch2024_test__m_light.parquet.gzip
│   ├── [203K]  ch2024_test__m_usage_stats.parquet.gzip
│   ├── [935K]  ch2024_test__w_heart_rate.parquet.gzip
│   ├── [106K]  ch2024_test__w_light.parquet.gzip
│   └── [909K]  ch2024_test__w_pedo.parquet.gzip
├── [4.0K]  train
│   ├── [4.0K]  user01
│   ├── [4.0K]  user02
│   ├── [4.0K]  user03
│   ├── [4.0K]  user04
│   ├── [4.0K]  user05
│   ├── [4.0K]  user06
│   ├── [4.0K]  user07
│   ├── [4.0K]  user08
│   ├── [4.0K]  user09
│   ├── [4.0K]  user10
│   ├── [4.0K]  user11
│   ├── [4.0K]  user12
│   ├── [4.0K]  user21
│   ├── [4.0K]  user22
│   ├── [4.0K]  user23
│   ├── [4.0K]  user24
│   ├── [4.0K]  user25
│   ├── [4.0K]  user26
│   ├── [4.0K]  user27
│   ├── [4.0K]  user28
│   ├── [4.0K]  user29
│   ├── [4.0K]  user30
│   ├── [1.1K]  user_info_2020.csv
│   ├── [ 73K]  user_sleep_2020.csv
│   └── [ 89K]  user_survey_2020.csv
├── [ 18K]  train_label.csv
├── [4.0K]  val
│   ├── [1.3G]  ch2024_val__m_acc_part_1.parquet.gzip
│   ├── [563M]  ch2024_val__m_acc_part_2.parquet.gzip
│   ├── [662M]  ch2024_val__m_acc_part_3.parquet.gzip
│   ├── [838M]  ch2024_val__m_acc_part_4.parquet.gzip
│   ├── [853K]  ch2024_val__m_activity.parquet.gzip
│   ├── [3.7M]  ch2024_val__m_ambience.parquet.gzip
│   ├── [ 15M]  ch2024_val__m_gps.parquet.gzip
│   ├── [ 88K]  ch2024_val__m_light.parquet.gzip
│   ├── [192K]  ch2024_val__m_usage_stats.parquet.gzip
│   ├── [925K]  ch2024_val__w_heart_rate.parquet.gzip
│   ├── [101K]  ch2024_val__w_light.parquet.gzip
│   └── [901K]  ch2024_val__w_pedo.parquet.gzip
└── [2.9K]  val_label.csv
```

In [2]:
# 경로 설정
data_dir = "../data"  # 데이터 저장 경로
preprocessed_dir_moc = "../data_preprocessed_moc"  # 전처리 완료된 데이터를 저장할 경로

## 1. Train

In [None]:
# user time series concat
for user_folder in filter(os.path.isdir, sorted(glob.glob(os.path.join(data_dir, "train", "user*")))):
    user_id = os.path.basename(user_folder)
    
    for session_folder in tqdm(sorted(glob.glob(os.path.join(user_folder, "*")))):
        session_id = os.path.basename(session_folder)
        
        file_name = f"{user_id}_{datetime.datetime.fromtimestamp(int(session_id), datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y%m%d')}.csv"
        if os.path.join(preprocessed_dir_moc, "train", file_name) in glob.glob(os.path.join(preprocessed_dir_moc, "train", "*")):
            print(f"Skipping {file_name} because it already exists")
            continue
        session_df = None
        for sensor_folder in filter(os.path.isdir, sorted(glob.glob(os.path.join(session_folder, "*")))):
            sensor_type = os.path.basename(sensor_folder)

            df = None

            for timestamp_csv in sorted(glob.glob(os.path.join(sensor_folder, "*.csv"))):
                tmp = pd.read_csv(timestamp_csv)
                # timestamp 열에 파일 이름 숫자를 더하기
                tmp["timestamp"] = tmp["timestamp"].apply(lambda x: x + float(os.path.basename(timestamp_csv).split(".")[0]))
                tmp["time"] = tmp["timestamp"].apply(lambda x: datetime.datetime.fromtimestamp(x, datetime.timezone(datetime.timedelta(hours=9))))
                if df is None:
                    df = tmp
                else:
                    df = pd.concat([df, tmp])
            if df is None:
                continue
            columns_order = ['time'] + [col for col in df.columns if col != 'time' and col != 'timestamp']
            df = df[columns_order]
            df.columns = [sensor_type + "_" + col if col != 'time' else col for col in df.columns]
            df.set_index("time", inplace=True)

            grouped = df.resample('1min')

            mean_df = grouped.mean()
            var_df = grouped.var()

            result_df = mean_df.join(var_df, lsuffix='_mean', rsuffix='_var')
        
            if session_df is None:
                session_df = result_df
            else:
                session_df = pd.concat([session_df, result_df], axis=1)
        # save file
        os.makedirs(os.path.join(preprocessed_dir_moc, "train", "before_label"), exist_ok=True)

        new_filename = f"{user_id}_{result_df.index[0].strftime('%Y%m%d')}.csv"
        session_df.to_csv(os.path.join(preprocessed_dir_moc, "train", "before_label", new_filename))
    print(f"{user_id} preprocess done")

In [8]:
def simplify_column_name(column_name):
    parts = column_name.split('_')
    if len(parts) >= 3 and parts[1] == parts[2]:
        return '_'.join([parts[0], parts[1], parts[3]])
    return column_name

In [None]:
for user_folder in filter(os.path.isdir, sorted(glob.glob(os.path.join(data_dir, "train", "user*")))):
    user_id = os.path.basename(user_folder)
    
    for session_folder in tqdm(sorted(glob.glob(os.path.join(user_folder, "*")))):
        session_id = os.path.basename(session_folder)
        session_date = datetime.datetime.fromtimestamp(int(session_id), datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y%m%d')

        train_df = pd.read_csv(os.path.join(preprocessed_dir_moc, "train", "before_label", f"{user_id}_{session_date}.csv"), index_col=0)
        train_df.index = pd.to_datetime(train_df.index)

        train_action_df = pd.read_csv(os.path.join(session_folder, f"{session_id}_label.csv"))
        train_action_df.index = train_action_df["ts"].apply(lambda x: datetime.datetime.fromtimestamp(x, datetime.timezone(datetime.timedelta(hours=9))))
        train_action_df.index = pd.to_datetime(train_action_df.index)
        
 
        merged = pd.merge(train_df, train_action_df, how='outer', left_index=True, right_index=True)

        os.makedirs(os.path.join(preprocessed_dir_moc, "train", "after_label"), exist_ok=True)
        merged.to_csv(os.path.join(preprocessed_dir_moc, "train", "after_label", f"{user_id}_{session_date}.csv"))

In [15]:
for files in sorted(glob.glob(os.path.join(preprocessed_dir_moc, "train", "after_label", "user*"))):
    df = pd.read_csv(files, index_col=0)
    tst = ["s_" + col[2:].lower() if col.startswith("e4")
           else "m_" + col[1:].lower() if col.startswith("m")
           else col for col in df.columns]
    tst = [simplify_column_name(col) for col in tst]
    df.rename(columns=dict(zip(df.columns, tst)), errors='ignore', inplace=True)
    df.to_csv(files)

In [38]:
def process_train_data(user_id, train_directory):
    train_csv = [f for f in os.listdir(train_directory) if f.startswith(f'user{user_id:02d}') and f.endswith('.csv')]
    train_user = pd.DataFrame()
    
    for file in train_csv:
        train_df = pd.read_csv(os.path.join(train_directory, file))
        train_user = train_df if train_df.empty else pd.concat([train_user, train_df], ignore_index=True)
    # print(train_user.columns)
    train_user['timestamp'] = pd.to_datetime(train_user["time"])
    train_user['date'] = train_user['timestamp'].dt.date
    
    daily_summary = train_user.groupby('date').agg({
        'm_acc_x_mean': 'mean',
        'm_acc_x_var': 'var',
        'm_acc_y_mean': 'mean',
        'm_acc_y_var': 'var',
        'm_acc_z_mean': 'mean',
        'm_acc_z_var': 'var',
        'activity': 'mean',
        's_hr_mean': 'mean',
        'm_gps_lat_mean': 'mean',
        'm_gps_lon_mean': 'mean'
    }).reset_index()

    daily_summary['user_id'] = int(user_id)

    return daily_summary

In [81]:
# user01부터 user30까지 반복하면서 각 사용자의 데이터를 처리
all_users_summary = pd.DataFrame()

for user_id in tqdm(range(1, 31)):
    try:
        user_summary = process_train_data(user_id, os.path.join(preprocessed_dir_moc, "train", "after_label"))
        all_users_summary = pd.concat([all_users_summary, user_summary], ignore_index=True)
    except Exception as e:
        print(e)

 40%|████      | 12/30 [00:05<00:09,  1.92it/s]

'time'
'time'
'time'
'time'
'time'
'time'
'time'
'time'


100%|██████████| 30/30 [00:09<00:00,  3.12it/s]


In [82]:
train_label = pd.read_csv(os.path.join(data_dir, 'train_label.csv'), index_col=0)
train_label["user_id"] = train_label["subject_id"].apply(lambda x: int(x.split("r")[1]))

train_label["date"] = pd.to_datetime(train_label["date"]).dt.date
all_users_summary["date"] = pd.to_datetime(all_users_summary["date"]).dt.date

# train_label과 all_users_summary 병합
merged = train_label.merge(all_users_summary, on=['date', 'user_id'], how='right')

# 필요없는 컬럼 제거
all_users_summary = merged.drop(['Q1', 'Q2', 'Q3', 'S1', 'S2', 'S3', 'S4'], axis=1)
all_users_summary.dropna(subset=['subject_id'], inplace=True)
all_users_summary.drop(['subject_id'], axis=1, inplace=True)

merged.drop(['user_id', 'm_acc_x_mean', 'm_acc_x_var', 'm_acc_y_mean',
       'm_acc_y_var', 'm_acc_z_mean', 'm_acc_z_var', 'activity', 's_hr_mean',
       'm_gps_lat_mean', 'm_gps_lon_mean'], axis=1, inplace=True)


# CSV 파일로 저장
all_users_summary.to_csv(os.path.join(preprocessed_dir_moc, 'train_final', 'train_user.csv'), index=False)
merged.to_csv(os.path.join(preprocessed_dir_moc, 'train_final', 'train_label.csv'), index=False)

## 2. Validation

In [3]:
val_dir = os.path.join(data_dir, "val")

In [4]:
def simplify_column_name_part(column_name):
    parts = column_name.split('_')
    if len(parts) >= 5 and column_name.startswith("m_acc"):
        return '_'.join([parts[0], parts[1], parts[4]])
    return column_name

def split_sensor_by_user(val_dict, sensor_type, agg_dict=None, interpolate=False, is_data_num=False, preprocess=True, rename_col=None, user_split=True):
    if user_split:
        grouped = val_dict[sensor_type].groupby('subject_id')
    else:
        grouped = [(sensor_type[-1], val_dict[sensor_type])]
    sensor_type_rename = "s_" + sensor_type[2:] if sensor_type.startswith("w_") else sensor_type
    for name, group in grouped:
        if sensor_type_rename == "m_activity":
            group["m_activity"] = group["m_activity"].apply(lambda x : int(x))
        if sensor_type_rename == "m_ambience":
            # 각 array 중 array[i][1]이 0.1 이상인 것의 array[i][0]을 return
            group["ambience_labels"] = group["ambience_labels"].apply(lambda x : [i for i in x if float(i[1]) >= 0.3])
        if sensor_type_rename == "m_usage_stats":
            group['m_usage_stats'] = group['m_usage_stats'].apply(lambda x: [[i["app_name"], i["total_time"]] for i in x])

        del group['subject_id']
        group.set_index('timestamp', inplace=True)
        if len(group.columns) > 1:
            group.columns = [sensor_type_rename + "_" + col if col != 'timestamp' else col for col in group.columns]
        else:
            group.columns = [sensor_type_rename]

        if preprocess:
            group = group[~group.index.duplicated(keep='first')]
            if interpolate:
                # resampled = group.resample('1min').interpolate("linear") if is_data_num else group.resample('1min').bfill()
                resampled = group.resample('1min').first()

            else:
                agg_functions = {col: ['mean', 'var'] for col in group.columns} if agg_dict is None else agg_dict
                resampled = group.resample('1min').agg(agg_functions)
                resampled.columns = rename_col if rename_col is not None else [f'{simplify_column_name_part(i[0])}_{i[1]}' for i in resampled.columns]
                
        else:
            resampled = group
        if user_split:
            val_dict[f'{sensor_type_rename}_part_{name}'] = resampled
        else:
            val_dict[sensor_type_rename] = resampled
    if user_split:
        del grouped, resampled, val_dict[sensor_type]

In [5]:
val_files = glob.glob(os.path.join(val_dir, "*"))

val_dict = {}

for val in tqdm(val_files):
    extracted_text = (os.path.basename(val).split('__'))[1].split('.')[0]
    val_dict[extracted_text] = pd.read_parquet(val)

# 전처리 수행
## 1초 이하 단위로 기록되어 있는 것들
split_sensor_by_user(val_dict, "m_activity", {'m_activity': [lambda x: x.mode()[0] if not x.empty else None, "var"]}, rename_col=["m_activity_mode", "m_activity_var"])
split_sensor_by_user(val_dict, "w_light", rename_col=["s_light_mean", "s_light_var"])
split_sensor_by_user(val_dict, "w_pedo", None, preprocess=False)
for k in val_dict.keys():
    if k.startswith("m_acc"):
        split_sensor_by_user(val_dict, k, user_split=False)

## 1초보다 큰 단위로 기록되어 있는 것들
split_sensor_by_user(val_dict, "w_heart_rate", None, interpolate=True, is_data_num=True)
split_sensor_by_user(val_dict, "m_light", None, interpolate=True, is_data_num=True)
split_sensor_by_user(val_dict, "m_usage_stats", None, interpolate=True, is_data_num=False)
split_sensor_by_user(val_dict, "m_ambience", None, interpolate=True, is_data_num=False)
split_sensor_by_user(val_dict, "m_gps", None, interpolate=True, is_data_num=True)

100%|██████████| 12/12 [00:16<00:00,  1.41s/it]


In [6]:
merged_parts = {}

for i in range(1, 5):
    part_data = [df for key, df in val_dict.items() if key.endswith(f"part_{i}")]
    for idx, part in enumerate(part_data):
        if idx == 0:
            result_df = part
            continue
        result_df = pd.merge(result_df, part, how='outer', left_index=True, right_index=True)
    merged_parts[f"part_{i}"] = result_df

In [7]:
for part in merged_parts.keys():
    daily_groups = merged_parts[part].resample('1D')

    for date, group in daily_groups:
        user_number = "{:02}".format(int(part.split("_")[-1]))
        file_name = f'user_{user_number}_{date.strftime("%Y%m%d")}.csv'
        os.makedirs(os.path.join(preprocessed_dir_moc, "val"), exist_ok=True)
        group.to_csv(os.path.join(preprocessed_dir_moc, "val", file_name))

In [8]:
for user in range(1, 5):
    user_df = pd.DataFrame()
    for user_data in glob.glob(os.path.join(preprocessed_dir_moc, "val", f"user_{user:02d}_*")):
        user_partial = pd.read_csv(user_data)
        user_df = user_partial if user_df.empty else pd.concat([user_df, user_partial])
    os.makedirs(os.path.join(preprocessed_dir_moc, "val_after"), exist_ok=True)
    
    user_df.to_csv(os.path.join(preprocessed_dir_moc, "val_after", f'val_{user:02d}.csv'), index=False)
    user_df = pd.read_csv(os.path.join(preprocessed_dir_moc, "val_after", f'val_{user:02d}.csv'))

    user_df['timestamp'] = pd.to_datetime(user_df['timestamp'])

    user_df['date'] = user_df['timestamp'].dt.date

    daily_summary = user_df.groupby('date').agg({
        'm_acc_x_mean': 'mean',
        'm_acc_x_var': 'mean',
        'm_acc_y_mean': 'mean',
        'm_acc_y_var': 'mean',
        'm_acc_z_mean': 'mean',
        'm_acc_z_var': 'mean',
        'm_activity_mode': 'mean',
        'm_activity_var': 'mean',
        's_light_mean': 'mean',
        's_pedo_step_frequency': 'sum',
        's_pedo_walking_steps': 'sum',
        's_heart_rate': 'mean',
        'm_light': 'mean',
        'm_gps_altitude': 'mean',
        'm_gps_latitude': 'mean',
        'm_gps_longitude': 'mean',
        'm_gps_speed': 'mean'
    }).reset_index()

    os.makedirs(os.path.join(preprocessed_dir_moc, "val_final"), exist_ok=True)
    daily_summary.to_csv(os.path.join(preprocessed_dir_moc, "val_final", f'val_{user:02d}.csv'), index=False)

In [9]:
# Load the data file
val_label_df = pd.read_csv(os.path.join(data_dir, "val_label.csv"))

# Group by 'subject_id' and save each group to a separate CSV file without the 'subject_id' column
for subject_id, group_df in val_label_df.groupby('subject_id'):
    group_df.drop(columns=['subject_id']).to_csv(os.path.join(preprocessed_dir_moc, "val_final", f'val_{subject_id:02d}_label.csv'), index=False)

for user in range(1, 5):
    val_user = pd.read_csv(os.path.join(preprocessed_dir_moc, "val_final", f'val_{user:02d}.csv'))
    val_label = pd.read_csv(os.path.join(preprocessed_dir_moc, "val_final", f'val_{user:02d}_label.csv'))

    merged_df = pd.merge(val_user, val_label, on='date')

    merged_df.to_csv(os.path.join(preprocessed_dir_moc, "val_final", f'val_{user:02d}_user.csv'), index=False)

## 3. Test

In [10]:
test_dir = os.path.join(data_dir, "test")

In [11]:
test_files = glob.glob(os.path.join(test_dir, "*"))

test_dict = {}

for test in tqdm(test_files):
    extracted_text = (os.path.basename(test).split('__'))[1].split('.')[0]
    test_dict[extracted_text] = pd.read_parquet(test)

# 데이터 전처리
## 1초 이하 단위로 기록되어 있는 것들
split_sensor_by_user(test_dict, "m_activity", {'m_activity': [lambda x: x.mode()[0] if not x.empty else None, "var"]}, rename_col=["m_activity_mode", "m_activity_var"])
split_sensor_by_user(test_dict, "w_light")
split_sensor_by_user(test_dict, "w_pedo", None, preprocess=False)
for k in test_dict.keys():
    if k.startswith("m_acc"):
        split_sensor_by_user(test_dict, k, user_split=False)

## 1초보다 큰 단위로 기록되어 있는 것들
split_sensor_by_user(test_dict, "w_heart_rate", None, interpolate=True, is_data_num=True)
split_sensor_by_user(test_dict, "m_light", None, interpolate=True, is_data_num=True)
split_sensor_by_user(test_dict, "m_usage_stats", None, interpolate=True, is_data_num=False)
split_sensor_by_user(test_dict, "m_ambience", None, interpolate=True, is_data_num=False)
split_sensor_by_user(test_dict, "m_gps", None, interpolate=True, is_data_num=True)

100%|██████████| 12/12 [00:18<00:00,  1.54s/it]


In [12]:
merged_parts = {}

for i in range(5, 9):
    part_data = [df for key, df in test_dict.items() if key.endswith(f"part_{i}")]
    result_df = part_data[0]

    for part in part_data[1:]:
        result_df = pd.merge(result_df, part, how='outer', left_index=True, right_index=True)
    merged_parts[f"part_{i}"] = result_df

In [13]:
for part in merged_parts.keys():
    daily_groups = merged_parts[part].resample('1D')

    for date, group in daily_groups:
        user_number = "{:02}".format(int(part.split("_")[-1]))
        file_name = f'user_{user_number}_{date.strftime("%Y%m%d")}.csv'
        os.makedirs(os.path.join(preprocessed_dir_moc, "test"), exist_ok=True)
        group.to_csv(os.path.join(preprocessed_dir_moc, "test", file_name))

In [14]:
for user in range(5, 9):
    user_df = pd.DataFrame()
    for user_data in glob.glob(os.path.join(preprocessed_dir_moc, "test", f"user_{user:02d}_*")):
        user_partial = pd.read_csv(user_data)
        user_df = user_partial if user_df.empty else pd.concat([user_df, user_partial])
    os.makedirs(os.path.join(preprocessed_dir_moc, "test_after"), exist_ok=True)
    
    user_df.to_csv(os.path.join(preprocessed_dir_moc, "test_after", f'test_{user:02d}.csv'), index=False)
    user_df = pd.read_csv(os.path.join(preprocessed_dir_moc, "test_after", f'test_{user:02d}.csv'))

    user_df['timestamp'] = pd.to_datetime(user_df['timestamp'])

    user_df['date'] = user_df['timestamp'].dt.date

    daily_summary = user_df.groupby('date').agg({
        'm_acc_x_mean': 'mean',
        'm_acc_x_var': 'mean',
        'm_acc_y_mean': 'mean',
        'm_acc_y_var': 'mean',
        'm_acc_z_mean': 'mean',
        'm_acc_z_var': 'mean',
        'm_activity_mode': 'mean',
        'm_activity_var': 'mean',
        's_light_mean': 'mean',
        's_pedo_step_frequency': 'sum',
        's_pedo_walking_steps': 'sum',
        's_heart_rate': 'mean',
        'm_light': 'mean',
        'm_gps_altitude': 'mean',
        'm_gps_latitude': 'mean',
        'm_gps_longitude': 'mean',
        'm_gps_speed': 'mean'
    }).reset_index()

    os.makedirs(os.path.join(preprocessed_dir_moc, "test_final"), exist_ok=True)
    daily_summary.to_csv(os.path.join(preprocessed_dir_moc, "test_final", f'test_{user:02d}_user.csv'), index=False)

최종 결과물에서 다음 파일을 활용하였습니다.

`../data_preprocessed_moc/train_final/train_label.csv`
`../data_preprocessed_moc/train_final/train_user.csv`

`../data_preprocessed_moc/val_final/val_01_user.csv`
`../data_preprocessed_moc/val_final/val_02_user.csv`
`../data_preprocessed_moc/val_final/val_03_user.csv`
`../data_preprocessed_moc/val_final/val_04_user.csv`

`../data_preprocessed_moc/test_final/test_05_user.csv`
`../data_preprocessed_moc/test_final/test_06_user.csv`
`../data_preprocessed_moc/test_final/test_07_user.csv`
`../data_preprocessed_moc/test_final/test_08_user.csv`