> title : 110_etri_lifelog_dm_cnn-image (이미지 파생변수)  <br>
 -  코드 실행 전 PATH 변경하세요.
  - PATH  =  '/content/drive/MyDrive/data/ch2025_data_items/share/submissions/input'


### 🔨 PATH 설정

In [1]:
PATH  =  '/content/drive/MyDrive/data/ch2025_data_items/share/submissions/input' ### <---- 코드 실행 전 PATH 변경하세요.

In [2]:
# 데이터는 구글드라이브에 저장되어 있어서 구글드라이브 마운트를 합니다.
# 데이터 저장 PATH를 변경하시면 아래 구글드라이브 마운트를 주석처리하시면 됩니다.
from google.colab import drive, files
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### 📦 라이브러리

In [3]:
# 라이브러리 설치
! pip install haversine >/dev/null
! pip install optuna >/dev/null
! pip install category_encoders >/dev/null
! pip install timm >/dev/null

In [4]:
# 기본 내장 라이브러리
import ast
import glob
import os
import random
import re
import sys
from collections import Counter
from datetime import datetime, time, timedelta
from functools import reduce
from math import asin, cos, radians, sin, sqrt

# 경고 무시
import warnings

# 경로 처리
from pathlib import Path

# 데이터 처리
import numpy as np
import pandas as pd

# 시각화
import matplotlib.pyplot as plt
import seaborn as sns

# 거리 계산
from haversine import haversine  # pip install haversine

# 진행률 표시
from tqdm.auto import tqdm

# 차원 축소
from sklearn.decomposition import PCA

# 전처리 및 인코딩
from sklearn.preprocessing import LabelEncoder, StandardScaler
from category_encoders import TargetEncoder

# 모델링 - 사이킷런
from sklearn.metrics import f1_score, roc_auc_score, roc_curve
from sklearn.model_selection import (
    KFold,
    StratifiedKFold,
    cross_val_score,
    train_test_split,
)

# 모델링 - LightGBM
import lightgbm as lgb
from lightgbm import LGBMClassifier, early_stopping, log_evaluation

# 모델링 - PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

# 모델링 - 이미지 분류용
import timm

# Silence all Python warnings for the rest of the notebook.
warnings.filterwarnings('ignore')


# pandas display options: show (almost) everything, floats with 4 decimals.
def _format_4dp(value):
    return '%0.4f' % value


for _option, _setting in (
    ('display.max_columns', 999),
    ('display.max_rows', 999),
    ('display.max_colwidth', None),
    ('display.float_format', _format_4dp),
):
    pd.set_option(_option, _setting)

# Fix the seeds of the Python / NumPy RNGs and the hash seed variable.
SD = 42
random.seed(SD)
np.random.seed(SD)
os.environ['PYTHONHASHSEED'] = str(SD)

In [5]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy, and PyTorch (CPU and all CUDA devices),
    sets ``PYTHONHASHSEED``, and puts cuDNN into deterministic mode.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one; this is a no-op
    # when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different kernels from run
    # to run, which defeats the deterministic flag above.
    torch.backends.cudnn.benchmark = False


# NOTE(review): this re-seeds with 1, overriding the SD = 42 seeding done
# earlier in the notebook — confirm which seed is intended.
seed_everything(1)

### 📦 데이터 읽기

In [6]:
# 1) Sensor item tables, one parquet file per item, read from
#    {PATH}/ETRI_lifelog_dataset/ch2025_data_items/.
mACStatus = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mACStatus.parquet')
mActivity = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mActivity.parquet')
mAmbience = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mAmbience.parquet')
mBle = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mBle.parquet')
mGps = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mGps.parquet')
mLight = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mLight.parquet')
mScreenStatus = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mScreenStatus.parquet')
mUsageStats = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mUsageStats.parquet')
mWifi = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_mWifi.parquet')
wHr = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_wHr.parquet')
wLight = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_wLight.parquet')
wPedo = pd.read_parquet(f'{PATH}/ETRI_lifelog_dataset/ch2025_data_items/ch2025_wPedo.parquet')

# 2) Label table (train) and the submission template (used as the test keys).
train = pd.read_csv(f'{PATH}/ETRI_lifelog_dataset/ch2025_metrics_train.csv')
test = pd.read_csv(f'{PATH}/ETRI_lifelog_dataset/ch2025_submission_sample.csv')

In [7]:
# Derive the calendar date (first 10 characters of the stringified timestamp,
# i.e. 'YYYY-MM-DD') for every sensor table used below.
for _frame in (mACStatus, mActivity, mLight, mScreenStatus, wHr, wLight, wPedo):
    _frame['lifelog_date'] = _frame['timestamp'].astype(str).str[:10]

### 📌 이미지 생성
- sleeptime만 추출 (00시부터 06시까지)
- 참고 : https://github.com/seongjiko/Pixleep/tree/master

In [8]:
def filter_by_group_size(df, group_cols=None, min_ratio=0.5):
    """Drop groups that have too few rows compared with the average group.

    Rows are grouped by ``group_cols``; a group is kept only when its row
    count is strictly greater than ``min_ratio`` times the mean row count
    over all groups.

    Args:
        df: Input DataFrame containing every column in ``group_cols``.
        group_cols: Columns that identify a group. Defaults to
            ``['subject_id', 'lifelog_date']``.
        min_ratio: Fraction of the mean group size a group must exceed to be
            kept (default 0.5, i.e. more than half of the mean).

    Returns:
        A new DataFrame holding only the rows of the retained groups.
    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if group_cols is None:
        group_cols = ['subject_id', 'lifelog_date']
    else:
        group_cols = list(group_cols)

    # Row count per group.
    group_counts = df.groupby(group_cols).size().reset_index(name='count')
    # Keep groups whose size exceeds min_ratio * mean size.
    threshold = group_counts['count'].mean() * min_ratio
    valid_groups = group_counts.loc[group_counts['count'] > threshold, group_cols]
    # Inner join against the surviving group keys filters the original rows.
    return df.merge(valid_groups, on=group_cols, how='inner')

def make_timestamps_unique(df, timestamp_col='timestamp'):
    """Return a timestamp-sorted copy of ``df`` whose timestamps are unique.

    Rows sharing a timestamp are offset by 0, 1, 2, ... nanoseconds in the
    order they appear after sorting, so the column can be used as an index
    without duplicates.
    """
    ordered = df.sort_values(by=[timestamp_col])
    # 0 for the first occurrence of a timestamp, 1 for the second, and so on.
    duplicate_rank = ordered.groupby(timestamp_col).cumcount()
    nanosecond_shift = pd.to_timedelta(duplicate_rank, unit='ns')
    ordered[timestamp_col] = ordered[timestamp_col] + nanosecond_shift
    return ordered

def average_list_columns(df, list_columns, pk_cols=None):
    """Collapse list-like cells of the given columns to their mean.

    Each cell is converted as follows:
      * list / ``np.ndarray`` / ``pd.Series`` -> its mean (NaN when empty),
      * int / float / NumPy scalar / None     -> returned unchanged,
      * anything else                          -> NaN.

    Args:
        df: DataFrame to process. The listed columns are overwritten in place.
        list_columns: Names of the columns to convert.
        pk_cols: Unused; accepted only for backward compatibility.

    Returns:
        The same ``df`` object, with the listed columns replaced.
    """

    # Defined once instead of being re-created for every column.
    def safe_mean(x):
        if isinstance(x, (list, np.ndarray, pd.Series)):
            # Guard empty containers explicitly: np.mean([]) would emit a
            # RuntimeWarning before returning NaN.
            return np.mean(x) if np.size(x) > 0 else np.nan
        if isinstance(x, (int, float, np.integer, np.floating, type(None))):
            return x
        return np.nan

    for col in list_columns:
        df[col] = df[col].apply(safe_mean)

    return df

def center_list_values(df, list_columns):
    """Mean-center every non-empty list cell of the given columns.

    For a non-empty Python list, each element has the list mean subtracted
    and is rounded to 3 decimals. Empty lists and non-list cells (NaN,
    scalars, arrays, ...) are left untouched. ``df`` is modified in place
    and also returned.
    """

    def _centered(cell):
        # Anything that is not a non-empty list passes through unchanged.
        if not isinstance(cell, list) or len(cell) == 0:
            return cell
        average = np.mean(cell)
        return [np.round(item - average, 3) for item in cell]

    for name in list_columns:
        df[name] = df[name].apply(_centered)
    return df

def sleeptime_cutter(data):  # experiment: is the sleep-time window more informative?
    """Keep only 00:00-05:59 rows and attribute them to the previous day.

    Works on a copy of ``data``. Rows whose ``timestamp`` hour is in [0, 6)
    are kept; both ``timestamp`` and ``lifelog_date`` are then moved back by
    one day, so the early-morning window of day D+1 is labelled as day D.
    ``lifelog_date`` is returned as a 'YYYY-MM-DD' string.
    """
    one_day = pd.Timedelta(days=1)

    frame = data.copy()
    frame['timestamp'] = pd.to_datetime(frame['timestamp'])
    frame['lifelog_date'] = pd.to_datetime(frame['lifelog_date'])

    # Sleep window only: hours 0 through 5 inclusive.
    in_sleep_window = frame['timestamp'].dt.hour.between(0, 5)
    frame = frame[in_sleep_window]

    # Shift back one day (the rows physically belong to the D+1 early morning).
    frame['timestamp'] = frame['timestamp'] - one_day
    shifted_dates = frame['lifelog_date'] - one_day

    # Store the date back as a string.
    frame['lifelog_date'] = shifted_dates.dt.date.astype(str)

    return frame

def merge_data_for_group(user, date):
    """Build a per-second sensor table for one subject and one lifelog date.

    For each source table (heart rate, activity, step count, phone light,
    watch light) the function:
      1. drops (subject, date) groups with too few rows
         (``filter_by_group_size``),
      2. keeps only the 00:00-05:59 rows and shifts them back one day
         (``sleeptime_cutter``), so the early morning of D+1 is attributed
         to ``date`` D,
      3. selects the rows of ``user`` / ``date``,
      4. makes timestamps unique and resamples to 1-second resolution using
         the nearest observation.
    The sources are then left-joined onto a full 86,400-second index for
    ``date``, gaps are filled with time-weighted interpolation, and the raw
    activity code is collapsed into an intensity level.

    Args:
        user: ``subject_id`` value to select.
        date: Lifelog date as a ``'YYYY-MM-DD'`` string.

    Returns:
        DataFrame indexed by ``timestamp`` with columns ``heart_rate``,
        ``m_activity``, ``step``, ``m_light``, ``w_light``, ``subject_id``
        and ``lifelog_date``.
    """
    key_cols = ['subject_id', 'timestamp', 'lifelog_date']

    # (source table, columns to keep or None for all, list-valued columns),
    # listed in join order.
    sources = [
        (wHr, None, ['heart_rate']),
        (mActivity, None, []),
        (wPedo, key_cols + ['step'], []),
        (mLight, key_cols + ['m_light'], []),
        (wLight, key_cols + ['w_light'], []),
    ]

    # One row per second of the requested day.
    start_time = datetime.strptime(date, '%Y-%m-%d')
    end_time = start_time + timedelta(days=1)
    all_timestamps = pd.date_range(start=start_time, end=end_time, freq='s', inclusive='left')
    merged_data = pd.DataFrame(index=all_timestamps)
    merged_data.index.name = 'timestamp'

    for frame, keep_cols, list_cols in sources:
        group = frame if keep_cols is None else frame[keep_cols]

        # Treat (subject, date) groups with too few rows as outliers. This
        # must see every subject because the threshold is a global mean.
        # merge() returns a new frame, so no defensive copy is needed.
        group = filter_by_group_size(group)

        # Narrow to the subject before the datetime conversion so that
        # sleeptime_cutter does not reprocess every subject on each call.
        group = group.loc[group['subject_id'] == user, :]

        # Keep only the sleep window, attributed to the previous day.
        group = sleeptime_cutter(group)
        group = group.loc[group['lifelog_date'] == date, :]

        # Collapse list-valued cells to their mean.
        if list_cols:
            group = average_list_columns(group, list_cols)

        # Unique timestamps are required before resampling on the index.
        group = make_timestamps_unique(group)
        resampled = (
            group.set_index('timestamp')
            .drop(columns=['subject_id', 'lifelog_date'])
            .resample('s')
            .nearest()
        )

        if not resampled.empty:
            merged_data = merged_data.join(resampled, how='left')

    # Keep exactly the expected columns; missing sources become all-NaN.
    merged_data = merged_data.reindex(columns=['heart_rate', 'm_activity', 'step', 'm_light', 'w_light'])

    # Time-weighted interpolation of the gaps.
    merged_data = merged_data.interpolate(method='time')

    # Collapse the raw activity code into an intensity level:
    #   0 <- 3 (STILL), 4 (UNKNOWN)
    #   1 <- 0 (IN_VEHICLE), 1 (ON_BICYCLE), 2 (ON_FOOT), 5 (TILTING), 7 (WALKING)
    #   2 <- 8 (RUNNING)
    # Codes not listed here map to NaN.
    activity_mapping = {
        3: 0,
        4: 0,
        0: 1,
        1: 1,
        2: 1,
        5: 1,
        7: 1,
        8: 2,
    }
    merged_data['m_activity'] = merged_data['m_activity'].map(activity_mapping)

    # Attach the group keys.
    merged_data['subject_id'] = user
    merged_data['lifelog_date'] = date

    return merged_data

def plot_time_series(data, user, date, channel_name):
    """Render the five sensor series of one day as a stacked image and save it.

    Draws ``heart_rate``, ``m_activity``, ``step``, ``m_light`` and
    ``w_light`` (whichever are present in ``data``) as white lines on five
    black subplots sharing a fixed 00:00:00-23:59:59 x-axis, then writes
    ``{PATH}/{channel_name}/user{user}_{date}_{channel_name}.png``.

    Args:
        data: DataFrame indexed by timestamp (as produced by
            ``merge_data_for_group``).
        user: Subject id, used in the file name.
        date: Day to plot as a ``'YYYY-MM-DD'`` string.
        channel_name: Output sub-directory and file-name suffix.
    """
    # Fix the x-axis to the whole day, one point per second.
    total_seconds = 86400
    time_range = pd.date_range(start=datetime.strptime(date, '%Y-%m-%d'), periods=total_seconds, freq='s')

    # Align the data to that per-second index.
    data = data.reindex(time_range)

    fig, axes = plt.subplots(5, 1, figsize=(5, 5), sharex=True, facecolor='black')
    try:
        fig.patch.set_facecolor('black')

        for ax in axes:
            ax.set_facecolor('black')
            # Hide all four spines so only the signal itself is drawn.
            for side in ('top', 'right', 'left', 'bottom'):
                ax.spines[side].set_visible(False)
            ax.set_xlim([time_range[0], time_range[-1]])

        # One subplot per channel, in a fixed order.
        channels = ('heart_rate', 'm_activity', 'step', 'm_light', 'w_light')
        for ax, column in zip(axes, channels):
            if column in data.columns:
                ax.plot(data.index, data[column], color='white')

        fig.tight_layout()
        fname = f'{PATH}/{channel_name}/user{user}_{date}_{channel_name}.png'
        # Make sure the target directory exists before saving.
        os.makedirs(os.path.dirname(fname), exist_ok=True)
        fig.savefig(fname)
    finally:
        # Without this every call leaves a figure alive in pyplot's registry,
        # and memory grows across the hundreds of images generated.
        plt.close(fig)

In [None]:
%%time

channel_name = 'ch5_sleeptime'

# Combine the (subject_id, lifelog_date) keys of train and test.
a1 = train[['subject_id', 'lifelog_date']].copy()
a2 = test[['subject_id', 'lifelog_date']].copy()
val_df = pd.concat([a1, a2]).reset_index(drop=True)
print('# train:',len(train))
print('# test:',len(test))
print('# 전체 데이터:',len(val_df))

# Expected image file name for every sample.
val_df['filename'] = [
    f"user{subject}_{day}_{channel_name}.png"
    for subject, day in zip(val_df['subject_id'], val_df['lifelog_date'])
]

# Images that already exist. Create the directory first so that the very
# first run does not fail in os.listdir.
image_dir = f'{PATH}/{channel_name}'
os.makedirs(image_dir, exist_ok=True)
image_files = [f for f in os.listdir(image_dir) if f.endswith(f'_{channel_name}.png')]

# Only the samples that still need an image.
val_df = val_df.loc[~val_df['filename'].isin(image_files),:].reset_index(drop=True)
print('# 남은 샘플수:',len(val_df))

# ====================================
# Single-sample smoke test
# ====================================
# rules = (
#   (val_df['subject_id']=='id01') & (val_df['lifelog_date'].isin(['2024-07-01']))
# )
# val_df = val_df.loc[rules,:].copy().head(1)

# Generate one image per remaining (subject, date) pair.
bar = tqdm(list(zip(val_df['subject_id'], val_df['lifelog_date'])))
for user, date in bar:
    bar.set_description(f'user: {user}, date: {date}')
    merged_data = merge_data_for_group(user, date)
    plot_time_series(merged_data, user, date, channel_name)

# train: 450
# test: 250
# 전체 데이터: 700
# 남은 샘플수: 520


  0%|          | 0/520 [00:00<?, ?it/s]

In [None]:
from PIL import Image
import torchvision.transforms as transforms

# 정사각형 패딩 후, 299x299로 리사이즈 (Xception 기준)
def preprocess_image(img):
    """Pad a PIL image to a square and convert it to a 299x299 tensor.

    The image is centred on a mid-gray (127, 127, 127) square canvas whose
    side equals the longer edge, resized to 299x299, converted to a tensor
    and normalized with mean 0.5 / std 0.5 per channel (mapping [0, 1] to
    [-1, 1]).
    """
    width, height = img.size
    side = max(width, height)

    # Centre the original on a gray square canvas.
    canvas = Image.new("RGB", (side, side), (127, 127, 127))
    top_left = ((side - width) // 2, (side - height) // 2)
    canvas.paste(img, top_left)

    # Resize, convert to tensor, then rescale [0, 1] -> [-1, 1].
    resize = transforms.Resize((299, 299))
    to_tensor = transforms.ToTensor()
    normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    pipeline = transforms.Compose([resize, to_tensor, normalize])
    return pipeline(canvas)

In [None]:
channel_name = 'ch5_sleeptime'

# Directory that holds the generated images for this channel.
dataset_path = f'{PATH}/{channel_name}'

# Target size passed to transforms.Resize when computing image statistics.
image_size = 500

def find_img_mean_std(dataset_path, image_size):
    """Compute the per-channel mean and std over all PNG images in a folder.

    Images are loaded as RGB, resized, and scaled to [0, 1] by ``ToTensor``.
    The statistics are accumulated image by image, so memory use does not
    grow with the number of images.

    Args:
        dataset_path: Directory containing the ``.png`` files.
        image_size: Target size. An int is treated as a square
            ``(image_size, image_size)`` so every image has the same shape;
            a ``(height, width)`` pair is used as given.

    Returns:
        ``(mean, std)`` — two lists of three floats (R, G, B). ``std`` is the
        unbiased (N-1) estimate, matching ``torch.Tensor.std``'s default.

    Raises:
        ValueError: If no image could be loaded from ``dataset_path``.
    """
    import os

    import torch
    from PIL import Image
    from torchvision import transforms

    # Resize to a fixed (H, W) so that non-square inputs cannot produce
    # differently shaped tensors.
    if isinstance(image_size, int):
        target_size = (image_size, image_size)
    else:
        target_size = tuple(image_size)

    # Preprocessing pipeline (no Normalize: raw [0, 1] statistics are wanted).
    transform = transforms.Compose([
        transforms.Resize(target_size),
        transforms.ToTensor(),
    ])

    image_files = sorted(
        os.path.join(dataset_path, f)
        for f in os.listdir(dataset_path)
        if f.endswith('.png')
    )

    # Running sums in float64 to keep the variance computation accurate.
    channel_sum = torch.zeros(3, dtype=torch.float64)
    channel_sq_sum = torch.zeros(3, dtype=torch.float64)
    pixels_per_channel = 0

    for img_file in image_files:
        try:
            with Image.open(img_file) as img:
                tensor_img = transform(img.convert('RGB')).double()  # [C, H, W]
        except Exception as e:
            # Best effort: report unreadable files and carry on.
            print(f"이미지 로드 실패: {img_file} - {e}")
            continue
        channel_sum += tensor_img.sum(dim=[1, 2])
        channel_sq_sum += (tensor_img ** 2).sum(dim=[1, 2])
        pixels_per_channel += tensor_img.shape[1] * tensor_img.shape[2]

    if pixels_per_channel == 0:
        raise ValueError(f'No readable .png images found in {dataset_path!r}')

    mean = channel_sum / pixels_per_channel
    # Var = (sum(x^2) - N * mean^2) / (N - 1); clamp guards against tiny
    # negative values caused by rounding.
    variance = (channel_sq_sum - pixels_per_channel * mean ** 2) / max(pixels_per_channel - 1, 1)
    std = variance.clamp(min=0).sqrt()

    print("평균(mean):", mean.tolist())
    print("표준편차(std):", std.tolist())

    return mean.tolist(), std.tolist()

img_mean, img_std = find_img_mean_std(dataset_path,image_size)

In [None]:
def extract_cnn_features(
    image_root_dir,
    img_mean, img_std,
    batch_size=32,
    image_size=(500, 500),
    model_name='resnet50'
):
    """Extract pooled CNN features for every PNG image under a directory.

    A pretrained ``timm`` model is created without its classification head
    (``num_classes=0``), so its forward pass returns the pooled feature
    vector. Images are resized, converted to tensors and normalized with the
    given statistics before being fed through the model in eval mode.

    Args:
        image_root_dir: Directory searched recursively for ``.png`` files.
        img_mean: Per-channel mean passed to ``transforms.Normalize``.
        img_std: Per-channel std passed to ``transforms.Normalize``.
        batch_size: DataLoader batch size.
        image_size: Target size passed to ``transforms.Resize``.
        model_name: ``timm`` model identifier.

    Returns:
        DataFrame whose first column ``image_path`` holds each image's path
        relative to the current working directory, followed by one
        integer-named column per feature dimension. Rows are ordered by
        sorted image path.

    Raises:
        ValueError: If no ``.png`` file is found under ``image_root_dir``.
    """
    # Allowed image extensions.
    valid_exts = {'.png'}

    def collect_image_paths(root_dir):
        # Sorted so the row order does not depend on filesystem listing order.
        return sorted(
            os.path.join(root, fname)
            for root, _, files in os.walk(root_dir)
            for fname in files
            if os.path.splitext(fname)[1].lower() in valid_exts
        )

    class ImageDataset(Dataset):
        """Yields (transformed image, relative path) pairs."""

        def __init__(self, image_paths, transform=None):
            self.image_paths = image_paths
            self.transform = transform

        def __len__(self):
            return len(self.image_paths)

        def __getitem__(self, idx):
            path = self.image_paths[idx]
            # Close the file handle as soon as the pixels are decoded.
            with Image.open(path) as img:
                image = img.convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image, os.path.relpath(path)

    image_paths = collect_image_paths(image_root_dir)
    if not image_paths:
        raise ValueError(f'No .png images found under {image_root_dir!r}')

    transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=img_mean, std=img_std)
    ])
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # num_classes=0 removes the classifier, so the model outputs pooled
    # features. This is timm's supported way to obtain a feature extractor
    # and does not depend on the order or completeness of model.children().
    feature_extractor = timm.create_model(model_name, pretrained=True, num_classes=0)
    feature_extractor = feature_extractor.to(device)
    feature_extractor.eval()

    dataset = ImageDataset(image_paths, transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    # Inference only: no gradients are needed.
    all_features = []
    all_names = []
    with torch.no_grad():
        for imgs, names in tqdm(dataloader):
            feats = feature_extractor(imgs.to(device))
            # Flatten to (batch, features) in case the model returns a map.
            all_features.append(feats.flatten(start_dim=1).cpu())
            all_names.extend(names)

    features_tensor = torch.cat(all_features, dim=0)
    df = pd.DataFrame(features_tensor.numpy())
    df.insert(0, 'image_path', all_names)

    return df

In [None]:
%%time

model_name = 'resnet50'

# Build the image-derived features: one row per image under
# {PATH}/{channel_name}, normalized with the statistics computed above.
img_features = extract_cnn_features(
    image_root_dir=f'{PATH}/{channel_name}',
    img_mean=img_mean, img_std=img_std,
    batch_size = 32,
    image_size = (500, 500),
    model_name = model_name
)

# Sanity check of the resulting table.
print('# img_features.shape:',img_features.shape)
img_features.head()

### 📌 PCA 결과 저장

In [None]:
# Feature matrix without the path column; identical for every PCA size, so it
# is built once outside the loop.
feature_matrix = img_features.set_index(['image_path'])

for n_features in [5, 10]:

    print(f"# n_features: {n_features}")
    # random_state pins the randomized SVD solver PCA may select, so the
    # saved components are reproducible between runs.
    pca = PCA(n_components=n_features, random_state=SD)

    X_pca = pd.DataFrame(
        pca.fit_transform(feature_matrix),
        columns=[str(i) for i in range(n_features)],
    )
    # Assign positionally (.values) so the paths stay matched to their rows
    # even if img_features does not carry a default 0..N-1 index.
    X_pca['image_path'] = img_features['image_path'].values

    # Save one CSV per PCA size.
    fname = f"{PATH}/img_features_{channel_name}_{model_name}_{n_features}.csv"
    X_pca.to_csv(fname, index=False)
    print(f">> Features saved to: {fname}")

    # Quick look at the first row.
    display(X_pca.head(1))