# 0. 설정 / 스키마
## 0.1 센서/이벤트 매핑(코어/세션/이동/메타)

In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
from typing import List, Dict, Optional, Iterable, Tuple

DATA_DIR = Path("../data/logs")
CSV_PATHS = sorted(DATA_DIR.glob("*.csv"))

print("N_USERS:", len(CSV_PATHS))
print("SAMPLE FILES:", [p.name for p in CSV_PATHS[:5]])

## 0.1 센서/이벤트 매핑
# CORE_SENSORS는 EDA에서 확인된, 실제 분석이 가능한 최소 집합입니다.
CORE_SENSORS = ["Screen", "UserAct"]      # "의지/행동 기반"
PASSIVE_SENSORS = ["Notif"]              # 외부자극
AUX_SENSORS = ["Unlock"]                 # 세션 시작 의지(있으면 쓰고, 없으면 0)

EVENT_MAP: Dict[str, str] = {
    # 1. 핵심 행동 (UserAct는 활동의 '양'을 측정)
    "USER_INTERACTION": "UserAct",
    
    # 2. 화면 상태 (Screen은 활동의 '기간'과 '세션'을 측정)
    "SCREEN_INTERACTIVE": "Screen",
    "SCREEN_NON_INTERACTIVE": "Screen",
    
    # 3. 알림 (외부 자극)
    "NOTIF_INTERRUPTION": "Notif",
    
    # 4. 잠금 해제 (Unlock은 세션의 '정교한 시작점' 및 '의지'를 측정)
    # EDA 데이터에는 없을 수 있으므로 "보조적"으로 활용
    "KEYGUARD_HIDDEN": "Unlock"
}

# 공개데이터 sensor_id(대문자)도 같이 매핑되게 추가
EVENT_MAP.update({
    "SCREEN": "Screen",
    "USERACT": "UserAct",
    "NOTIF": "Notif",
    "UNLOCK": "Unlock",        # 혹시 공개데이터에 있으면
    "KEYGUARD_HIDDEN": "Unlock",  # 안드로이드용도 유지
})

# 분석 그룹별 대상 정의
# Rhythm과 Gap은 '의도적 사용'을 판별하기 위해 Unlock을 포함
RHYTHM_SENSORS = ["Screen", "UserAct", "Unlock"]
GAP_SENSORS    = ["Screen", "UserAct", "Unlock"]

# App 로그를 대신할 세션 기준: 화면 켜짐/꺼짐을 세션의 경계로 사용
SESSION_EVENTS = ["SCREEN_INTERACTIVE", "SCREEN_NON_INTERACTIVE"]

MOBILITY_EVENTS = ["CELL_CHANGE", "WIFI_SSID"]
META_EVENT_TYPE = "HEARTBEAT"

# Step이 없을 경우 공개데이터의 Acc_Avg를 활동량으로 대체 사용
STEP_SENSOR_ALIASES = ["Step_Count", "Step", "Steps", "STEP_COUNT", "STEP", "Acc_Avg"]

print("✅ Schema constants ready (Practical & Android Aligned)")

N_USERS: 342
SAMPLE FILES: ['u072142.csv', 'u0cba7d.csv', 'u11abb4.csv', 'u11cef8.csv', 'u11cf22.csv']
✅ Schema constants ready (Practical & Android Aligned)


## 0.2 QC 임계값 분리(core/rhythm/gap/meta)

In [2]:
# core/rhythm/gap 최소 데이터량 기준
MIN_DAILY_EVENTS   = 50
MIN_RHYTHM_EVENTS  = 50
MIN_GAP_EVENTS     = 50

# 의미있는 공백 기준 -> 낮잠, 외출 등
GAP_THR_HOURS      = 2.0
# 매우 긴 공백 기준 -> 고입, 무기력 등
GAP_THR_6HOURS     = 6.0

# meta(heartbeat) 최소 기준
MIN_HEARTBEAT_PER_DAY = 1

# 네트워크 환경이 불안정
RETRY_WARN_THR     = 3
QUEUE_WARN_THR     = 20

# timezone 변화 감지(분) -> 해외여행
TZ_CHANGE_THR_MINUTES = 60

# "부분수집" 후보(코어가 너무 적은 날) -> 로그가 10개 미만
PARTIAL_CORE_MIN_EVENTS = 10

# [v3] Baseline 오염 방지용 Winsorize 범위
WINSORIZE_LOWER = 0.005
WINSORIZE_UPPER = 0.995

## 0.3 컬럼 표준화 헬퍼

In [3]:
EVENT_COL_CANDIDATES = [
    "event_name","sensor_id","event",
    "name","action","type_name","eventCode","event_id",
    "event_type"
]

TS_COL_CANDIDATES = [
    "ts_raw", "timestamp", "ts",
    "time", "event_time", "created_at", "occurred_at",
    "client_ts", "client_time"
]

HEARTBEAT_TS_CANDIDATES = [
    "timestamp", "ts_raw", "ts",
    "client_ts", "client_time",
    "time", "event_time", "created_at", "occurred_at"
]

BASE_USECOLS = [
    "uuid",

    # event 후보
    "sensor_id", "event", "event_name", "event_type",
    "name", "action", "type_name", "eventCode", "event_id",

    # ts 후보
    "ts_raw", "timestamp", "ts",
    "time", "event_time", "created_at", "occurred_at",
    "client_ts", "client_time",

    # meta / heartbeat
    "type", "queue_size", "retry_count", "tz_offset_minutes", "client_last_event_ts",

    # mobility
    "cell_lac", "coarse_gps", "wifi_ssid", "lat", "lon",

    # steps
    "value", "step_count",
]

# 1. 데이터 로드 및 환경 설정
## 1.1 공통 수치 유틸

In [4]:
# 활동의 '불균형도(엔트로피)'를 계산
def entropy_from_counts(counts: np.ndarray) -> float:
    s = float(np.nansum(counts))
    if s <= 0:
        return np.nan
    p = counts / s
    p = p[p > 0]
    return float(-(p * np.log(p)).sum())

# 여러 명의 데이터를 합칠 때 발생하는 '에러'를 방지
def safe_concat(frames: List[pd.DataFrame], cols: List[str]) -> pd.DataFrame:
    """빈 리스트면 빈 DF 반환. (컬럼 스키마 보장)"""
    if not frames:
        return pd.DataFrame(columns=cols)
    return pd.concat(frames, ignore_index=True)

## 1.2 컬럼 선택/표준화 유틸

In [5]:
# 파일의 헤더만 읽어옴
def read_header_cols(path: Path) -> List[str]:
    return pd.read_csv(path, nrows=0).columns.tolist()

# 여러 후보 이름 중 이 파일에 '진짜 있는 이름'을 하나 골라냄
def pick_first_existing_col(cols: List[str], candidates: List[str]) -> Optional[str]:
    s = set(cols)
    for c in candidates:
        if c in s:
            return c
    return None

# 제각각인 컬럼명들을 '표준 이름'으로 통일
def standardize_event_ts_cols(
    df: pd.DataFrame,
    event_candidates: List[str],
    ts_candidates: List[str],
    out_event_col: str = "event_std",
    out_ts_col: str = "ts_std",
) -> pd.DataFrame:
    """
    event 후보 컬럼 중 하나를 event_std로, ts 후보 컬럼 중 하나를 ts_std로 복사.
    - event_std: raw 이벤트명 (string)
    - ts_std   : raw timestamp (ms/sec/datetime/string 가능)
    """
    cols = df.columns.tolist()
    e_col = pick_first_existing_col(cols, event_candidates)
    t_col = pick_first_existing_col(cols, ts_candidates)

    if e_col is None:
        df[out_event_col] = pd.Series([pd.NA] * len(df), dtype="string")
    else:
        df[out_event_col] = df[e_col].astype("string")
        df[out_event_col] = df[out_event_col].where(df[out_event_col].notna(), pd.NA)
    
    df[out_event_col] = (
        df[out_event_col].astype("string")
        .str.strip()
        .str.upper()
    )

    if t_col is None:
        df[out_ts_col] = np.nan
    else:
        df[out_ts_col] = df[t_col]

    return df

# 숫자 형태의 시간을 우리가 읽기 편한 '날짜와 시간'으로 변환
def to_dt_date_hour_from_ms(
    df: pd.DataFrame,
    ts_col: str = "ts_std",
    dt_col: str = "dt",
) -> pd.DataFrame:
    """
    ts(ms/sec) or datetime/string -> dt/date/hour 파생.
    - numeric이면 자릿수 기반으로 sec vs ms 자동 판별
    - 아니면 일반 to_datetime
    """
    if ts_col not in df.columns:
        df[dt_col] = pd.NaT
        return df

    x = df[ts_col]

    if pd.api.types.is_numeric_dtype(x):
        xn = pd.to_numeric(x, errors="coerce")
        if xn.dropna().empty:
            df[dt_col] = pd.NaT
            return df

        med_len = xn.dropna().astype("int64").astype(str).str.len().median()
        if pd.notna(med_len) and med_len <= 10:
            dt = pd.to_datetime(xn, unit="s", errors="coerce")
        else:
            dt = pd.to_datetime(xn, unit="ms", errors="coerce")
    else:
        dt = pd.to_datetime(x, errors="coerce")

    df = df.assign(**{dt_col: dt}).dropna(subset=[dt_col])
    if df.empty:
        return df

    df["date"] = df[dt_col].dt.normalize()
    df["hour"] = df[dt_col].dt.hour
    return df

# 복잡한 원본 로그 이름을 우리가 정한 '쉬운 카테고리'로 바꿈 ('SCREEN_INTERACTIVE' -> 'Screen')
def map_event_to_cat(event_std: pd.Series, event_map: Dict[str, str]) -> pd.Series:
    e = event_std.astype("string").str.strip().str.upper()
    return e.map(event_map).astype("string")

## 1.3 chunk 로딩 (있는 컬럼만 읽기)

In [6]:
# 대용량 파일을 메모리 방지를 위해 30만 줄씩 끊어서 읽기
def read_in_chunks(
    path: Path,
    usecols_candidates: List[str],
    chunksize: int = 300_000,
) -> Iterable[pd.DataFrame]:
    
    cols = read_header_cols(path)
    usecols = [c for c in usecols_candidates if c in cols]
    if not usecols:
        return iter(())
    for chunk in pd.read_csv(path, usecols=usecols, chunksize=chunksize, low_memory=False):
        if chunk is None or chunk.empty:
            continue
        yield chunk

## 1.4 chunk 전처리: 표준화 + dt/date/hour 파생 + uuid 부여

In [7]:
# 이름 표준화, 시간 변환, 사용자ID 부여, 카테고리 매핑을 한 번에 수행
def preprocess_chunk(
    chunk: pd.DataFrame,
    uid: str,
    event_candidates: List[str],
    ts_candidates: List[str],
    event_map: Optional[Dict[str, str]] = None,
) -> pd.DataFrame:
    if event_map is None:
        event_map = EVENT_MAP

    if chunk is None or chunk.empty:
        return pd.DataFrame()

    # 1) 표준 컬럼(event_std, ts_std)
    chunk = standardize_event_ts_cols(
        chunk,
        event_candidates=event_candidates,
        ts_candidates=ts_candidates,
        out_event_col="event_std",
        out_ts_col="ts_std",
    )

    # 2) dt/date/hour
    chunk = to_dt_date_hour_from_ms(chunk, ts_col="ts_std", dt_col="dt")
    if chunk.empty:
        return chunk

    # 3) uuid 결정 (StringDtype 유지)
    if "uuid" in chunk.columns:
        chunk["uuid"] = chunk["uuid"].astype("string")
        chunk["uuid"] = chunk["uuid"].fillna(uid)
    else:
        chunk["uuid"] = pd.Series([uid] * len(chunk), dtype="string")

    # 4) ✅ event_cat 생성 (raw 이벤트 -> 카테고리)
    #    - core/rhythm/gap는 event_cat 기준으로 갈거라서 여기서 붙여두는게 제일 안정적
    chunk["event_cat"] = map_event_to_cat(chunk["event_std"], event_map)

    return chunk

## 1.5 이벤트 필터링 helper

In [8]:
# 원본 이벤트명(raw)이 리스트에 포함된 행만 추출
def filter_by_events(df: pd.DataFrame, events: List[str]) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame(columns=df.columns if df is not None else [])
    if "event_std" in df.columns:
        s = df["event_std"].astype("string")
        m = s.isin(events)
    elif "sensor_id" in df.columns:
        s = df["sensor_id"].astype("string")
        m = s.isin(events)
    else:
        return pd.DataFrame(columns=df.columns)
    return df[m].copy()

def filter_by_event_prefix(df: pd.DataFrame, prefixes: List[str]) -> pd.DataFrame:
    """event_std(raw)가 prefixes로 시작하는 행만."""
    if df is None or df.empty:
        return pd.DataFrame(columns=df.columns if df is not None else [])
    s = df["event_std"].astype("string")
    mask = pd.Series(False, index=df.index)
    for p in prefixes:
        mask |= s.str.startswith(p, na=False)
    return df[mask].copy()

# 변환된 카테고리(Screen, UserAct 등)가 리스트에 포함된 행만 추출
def filter_by_cats(df: pd.DataFrame, cats: List[str]) -> pd.DataFrame:
    """event_cat(category)가 cats에 포함되는 행만."""
    if df is None or df.empty:
        return pd.DataFrame(columns=df.columns if df is not None else [])
    if "event_cat" not in df.columns:
        return pd.DataFrame(columns=df.columns)
    s = df["event_cat"].astype("string")
    return df[s.isin(cats)].copy()

## 1.6 하루 단위 집계 편의 함수

In [9]:
def ensure_date_normalized(df: pd.DataFrame) -> pd.DataFrame:
    if df is None or df.empty:
        return df
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
    return df

# 날짜/시간별 로그 발생 횟수를 요약표로 작성
def group_counts(df: pd.DataFrame, keys: List[str], name: str = "cnt") -> pd.DataFrame:
    """groupby size()를 DataFrame으로 반환."""
    if df is None or df.empty:
        return pd.DataFrame(columns=keys + [name])
    out = df.groupby(keys).size().reset_index(name=name)
    return out

print("✅ FE_v3. Load/Utils ready (event_cat included)")

✅ FE_v3. Load/Utils ready (event_cat included)


# 2. 피처 엔지니어링

In [10]:
from typing import Any

def safe_quantile(x: np.ndarray, q: float) -> float:
    x = np.asarray(x, dtype=float)
    x = x[np.isfinite(x)]
    if x.size == 0:
        return np.nan
    return float(np.quantile(x, q))

## 2.1 활동 기반 피처 (Activity)

In [11]:
# 하루 동안 발생한 핵심 센서(Screen, UserAct 등)의 횟수를 집계
# 각 센서별로 '얼마나 많이' 움직였는지 활동의 총량을 측정
def build_daily_activity(
    csv_paths: List[Path],
    core_sensors: List[str] = CORE_SENSORS,
    chunksize: int = 300_000,
) -> pd.DataFrame:
    rows: List[pd.DataFrame] = []
    usecols = BASE_USECOLS

    for p in csv_paths:
        uid = p.stem
        # 핵심: 청크별 요약본을 담을 리스트
        chunk_summaries: List[pd.DataFrame] = []

        for chunk in read_in_chunks(p, usecols_candidates=usecols, chunksize=chunksize):
            ch = preprocess_chunk(chunk, uid=uid, 
                                  event_candidates=EVENT_COL_CANDIDATES, 
                                  ts_candidates=TS_COL_CANDIDATES)
            if ch.empty: continue

            # 1차 필터링
            ch = filter_by_cats(ch, core_sensors)
            if ch.empty: continue

            # [보완 1] 메모리 절약: 청크 안에서 즉시 그룹화해서 숫자로 변환
            # 행(row)을 들고 있지 말고 '개수'만 들고 있는다.
            summary = ch.groupby(["uuid", "date", "event_cat"]).size().reset_index(name="cnt")
            chunk_summaries.append(summary)

        if not chunk_summaries: continue

        # [보완 2] 합친 후 다시 요약 (청크 간 중복된 date/cat 합치기)
        df_u = pd.concat(chunk_summaries, ignore_index=True)
        daily = (
            df_u.groupby(["uuid", "date", "event_cat"])["cnt"]
                .sum()
                .unstack(fill_value=0)
                .reset_index()
        )

        # [보완 3] 컬럼 보장 (reindex 활용으로 더 깔끔하게)
        # core_sensors 중 로그에 없는 컬럼은 자동으로 0으로 채워짐
        target_cols = ["uuid", "date"] + core_sensors
        for c in core_sensors:
            if c not in daily.columns:
                daily[c] = 0
        
        rows.append(daily[target_cols])

    return safe_concat(rows, ["uuid", "date"] + core_sensors)


## 2.2 리듬 기반 피처 (Rhythm)

In [12]:
def build_daily_rhythm(
    csv_paths: List[Path],
    rhythm_sensors: List[str] = RHYTHM_SENSORS,
    chunksize: int = 300_000,
    min_rhythm_events: int = MIN_RHYTHM_EVENTS,
) -> pd.DataFrame:
    out_rows: List[Dict[str, Any]] = []
    
    # 시간대 정의를 내부 변수로 추출 (유지보수 용이)
    NIGHT_RANGE = (0, 6)
    DAY_RANGE = (6, 18)
    EVE_RANGE = (18, 24)

    for p in csv_paths:
        uid = p.stem
        hour_counts: Dict[pd.Timestamp, np.ndarray] = {}

        for chunk in read_in_chunks(p, usecols_candidates=BASE_USECOLS, chunksize=chunksize):
            ch = preprocess_chunk(chunk, uid=uid, 
                                  event_candidates=EVENT_COL_CANDIDATES, 
                                  ts_candidates=TS_COL_CANDIDATES)
            if ch.empty: continue

            ch = filter_by_cats(ch, rhythm_sensors)
            if ch.empty: continue

            # 효율적인 누적 (이 부분은 기존 로직이 좋습니다)
            grp = ch.groupby(["date", "hour"]).size().reset_index(name="cnt")
            for d, sub in grp.groupby("date"):
                arr = hour_counts.setdefault(d, np.zeros(24, dtype=np.int64))
                arr[sub["hour"].to_numpy()] += sub["cnt"].to_numpy()

        for d in sorted(hour_counts.keys()):
            counts24 = hour_counts[d]
            total = int(counts24.sum())
            
            # [보완] Coverage 체크
            if total < min_rhythm_events:
                out_rows.append({
                    "uuid": uid, "date": d, "rhythm_event_cnt": total,
                    "rhythm_low_coverage": True,
                    **{k: np.nan for k in ["night_ratio", "hour_entropy", 
                                          "day_ratio", "evening_ratio", "peak_hour", "peak_ratio"]}
                })
                continue

            # [보완] 피크 시간 계산 시 활동량 확인
            max_val = np.max(counts24)
            peak_h = int(np.argmax(counts24)) if max_val > 0 else np.nan
            peak_r = float(max_val) / total if total > 0 else np.nan

            # 엔트로피 계산
            h_ent = entropy_from_counts(counts24.astype(float))
            
            # 카운트 미리 계산
            night_cnt = int(counts24[NIGHT_RANGE[0]:NIGHT_RANGE[1]].sum())
            day_cnt   = int(counts24[DAY_RANGE[0]:DAY_RANGE[1]].sum())
            eve_cnt   = int(counts24[EVE_RANGE[0]:EVE_RANGE[1]].sum())

            out_rows.append({
                "uuid": uid, "date": d,
                "rhythm_event_cnt": total,
                # 1. 스무딩된 야간 비율 (주요 지표)
                "night_ratio": (night_cnt + 1) / (total + 24),
                "hour_entropy": h_ent if np.isfinite(h_ent) else np.nan,
                # 2. 단순 비율들 (night_ratio는 삭제하고 day/eve만 남김)
                "day_ratio": float(day_cnt / total),
                "evening_ratio": float(eve_cnt / total),
                "peak_hour": float(peak_h),
                "peak_ratio": float(peak_r),
                "rhythm_low_coverage": False,
            })
    if not out_rows:
        return pd.DataFrame(columns=[
            "uuid","date",
            "rhythm_event_cnt","rhythm_low_coverage",
            "night_ratio","hour_entropy",
            "day_ratio","evening_ratio",
            "peak_hour","peak_ratio",
        ])
    return pd.DataFrame(out_rows)

## 2.3 무활동 기반 피처 (Inactivity / Gap)


In [13]:
def build_daily_gap(
    csv_paths: List[Path],
    gap_sensors: List[str] = GAP_SENSORS,
    chunksize: int = 300_000,
    gap_thr_hours: float = GAP_THR_HOURS,
    gap_thr_6hours: float = GAP_THR_6HOURS,
    min_gap_events: int = MIN_GAP_EVENTS,
    include_overnight: bool = True,
) -> pd.DataFrame:
    out_rows: List[Dict[str, Any]] = []

    for p in csv_paths:
        uid = p.stem
        gaps_by_date = {}
        event_cnt_by_date = {}
        first_dt_by_date = {}
        last_dt_by_date = {}
        # [수정] 청크 간 연속성을 위해 날짜별 마지막 타임스탬프를 엄격히 관리
        last_ts_per_date = {} 

        for chunk in read_in_chunks(p, usecols_candidates=BASE_USECOLS, chunksize=chunksize):
            ch = preprocess_chunk(chunk, uid=uid, 
                                  event_candidates=EVENT_COL_CANDIDATES, 
                                  ts_candidates=TS_COL_CANDIDATES)
            if ch.empty: continue
            ch = filter_by_cats(ch, gap_sensors)
            if ch.empty: continue

            # 청크 내 시간 정렬
            ch = ch.sort_values("dt")
            
            for d, sub in ch.groupby("date"):
                d_idx = pd.Timestamp(d)
                event_cnt_by_date[d_idx] = event_cnt_by_date.get(d_idx, 0) + len(sub)
                
                s_first, s_last = sub["dt"].iloc[0], sub["dt"].iloc[-1]
                
                # 전역 처음/마지막 업데이트
                first_dt_by_date[d_idx] = min(first_dt_by_date.get(d_idx, s_first), s_first)
                last_dt_by_date[d_idx]  = max(last_dt_by_date.get(d_idx, s_last), s_last)

                # [보완] 청크 사이의 Gap 계산 (이전 청크의 마지막 시간 기준)
                if d_idx in last_ts_per_date:
                    cross = (s_first - last_ts_per_date[d_idx]).total_seconds() / 3600.0
                    if cross > 0:
                        gaps_by_date.setdefault(d_idx, []).append(cross)

                # 청크 내부 Gap 계산
                if len(sub) > 1:
                    diffs = sub["dt"].diff().dt.total_seconds().dropna().to_numpy() / 3600.0
                    gaps_by_date.setdefault(d_idx, []).extend(diffs[diffs > 0].tolist())

                # 다음 청크를 위해 현재 청크의 마지막 시간 저장
                last_ts_per_date[d_idx] = s_last

        # Overnight Gap (전일 -> 당일)
        overnight_gap_by_date = {}
        if include_overnight:
            sorted_days = sorted(first_dt_by_date.keys())
            for i in range(1, len(sorted_days)):
                prev_d, curr_d = sorted_days[i-1], sorted_days[i]
                if (curr_d - prev_d).days == 1:
                    og = (first_dt_by_date[curr_d] - last_dt_by_date[prev_d]).total_seconds() / 3600.0
                    if og >= 0: overnight_gap_by_date[curr_d] = og

        # 최종 집계
        for d in sorted(event_cnt_by_date.keys()):
            total = event_cnt_by_date[d]
            og_val = overnight_gap_by_date.get(d, np.nan)
            
            # 기본값 셋업
            res = {
                "uuid": uid, "date": d, "gap_event_cnt": total,
                "overnight_gap": og_val,
                "first_hour": float(first_dt_by_date[d].hour),
                "last_hour": float(last_dt_by_date[d].hour),
                "gap_low_coverage": False
            }

            if total < min_gap_events:
                res.update({"gap_low_coverage": True, "gap_max": np.nan, "gap_p95": np.nan, 
                            "gap_cnt_2h": np.nan, "gap_cnt_6h": np.nan, "gap_long_ratio": np.nan})
            else:
                combined_gaps = gaps_by_date.get(d, []).copy()
                if not np.isnan(og_val): combined_gaps.append(og_val)
                gaps = np.array(combined_gaps, dtype=float)

                if gaps.size == 0:
                    stats = {"gap_max": np.nan, "gap_p95": np.nan, "gap_cnt_2h": 0, "gap_cnt_6h": 0, "gap_long_ratio": 0.0}
                else:
                    l_sum = float(np.sum(gaps[gaps >= gap_thr_hours]))
                    stats = {
                        "gap_max": float(np.max(gaps)),
                        "gap_p95": float(safe_quantile(gaps, 0.95)),
                        "gap_cnt_2h": int(np.sum(gaps >= gap_thr_hours)),
                        "gap_cnt_6h": int(np.sum(gaps >= gap_thr_6hours)),
                        "gap_long_ratio": min(l_sum / 24.0, 1.0)
                    }
                res.update(stats)
            
            out_rows.append(res)
    if not out_rows:
        return pd.DataFrame(columns=[
            "uuid","date","gap_event_cnt","gap_low_coverage",
            "gap_max","gap_p95","gap_cnt_2h","gap_cnt_6h",
            "gap_long_ratio","overnight_gap","first_hour","last_hour"
        ])
    return pd.DataFrame(out_rows)

## 2.4 세션 기반 피처(Session)

In [14]:
# 앱 사용의 시작(RESUMED)과 끝(PAUSED) 사이의 지속 시간 계산
# 한 번 폰을 잡으면 얼마나 오래 쓰는지, 30분 이상의 '긴 세션'은 몇 번인지 집계
def build_daily_session(
    csv_paths: List[Path],
    session_events: List[str] = SESSION_EVENTS,
    chunksize: int = 300_000,
    long_session_sec: int = 30 * 60,
) -> pd.DataFrame:
    out_rows: List[Dict[str, Any]] = []

    START = "SCREEN_INTERACTIVE"
    END   = "SCREEN_NON_INTERACTIVE"

    for p in csv_paths:
        uid = p.stem
        seq_by_date: Dict[pd.Timestamp, List[Tuple[pd.Timestamp, str]]] = {}

        for chunk in read_in_chunks(p, usecols_candidates=BASE_USECOLS, chunksize=chunksize):
            ch = preprocess_chunk(chunk, uid=uid, 
                                  event_candidates=EVENT_COL_CANDIDATES, 
                                  ts_candidates=TS_COL_CANDIDATES)
            if ch.empty: continue
            ch = filter_by_events(ch, session_events)
            if ch.empty: continue

            # 정렬 및 데이터 수집
            ch = ch.sort_values(["date", "dt"])
            for d, sub in ch.groupby("date"):
                d_idx = pd.Timestamp(d)
                seq_by_date.setdefault(d_idx, []).extend(list(zip(sub["dt"], sub["event_std"])))

        for d in sorted(seq_by_date.keys()):
            # [보완] 시계열 순서 보장
            seq = sorted(seq_by_date[d], key=lambda x: x[0])
            
            durations: List[float] = []
            open_start: Optional[pd.Timestamp] = None
            
            for t, ev in seq:
                if ev == START:
                    # 이미 열려있다면 무시 (중복 방어)
                    if open_start is None:
                        open_start = t
                elif ev == END:
                    if open_start is not None:
                        dur = (t - open_start).total_seconds()
                        # [보완] 0초보다 크고, 사람이 현실적으로 할 수 없는 긴 세션(예: 6시간) 방어
                        if 0 < dur < (6 * 3600): 
                            durations.append(dur)
                        open_start = None

            # 집계 로직
            session_cnt = len(durations)
            if session_cnt == 0:
                out_rows.append({
                    "uuid": uid, "date": d,
                    "session_cnt": 0, "session_total_sec": 0.0,
                    "session_mean_sec": np.nan, "long_session_cnt": 0,
                })
                continue

            dur_arr = np.array(durations)
            out_rows.append({
                "uuid": uid, "date": d,
                "session_cnt": int(session_cnt),
                "session_total_sec": float(np.sum(dur_arr)),
                "session_mean_sec": float(np.mean(dur_arr)),
                "long_session_cnt": int(np.sum(dur_arr >= long_session_sec)),
            })
    if not out_rows:
        return pd.DataFrame(columns=[
            "uuid","date","session_cnt","session_total_sec","session_mean_sec","long_session_cnt"
        ])
    return pd.DataFrame(out_rows)

## 2.5 이동성 기반 피처(Mobility)

In [15]:
def build_daily_mobility(
    csv_paths: List[Path],
    mobility_events: List[str] = MOBILITY_EVENTS, 
    step_aliases: List[str] = STEP_SENSOR_ALIASES,
    chunksize: int = 300_000,
) -> pd.DataFrame:
    out_rows: List[Dict[str, Any]] = []

    for p in csv_paths:
        uid = p.stem
        cell_cnt, cell_uniqs, step_sum = {}, {}, {}
        unique_wifi_ssids = {}
        wifi_change_cnt_est = {}
        
        # [추가] 연속 SSID 변화를 추적하기 위한 청크 간 상태 저장
        last_ssid_seen = {} 

        for chunk in read_in_chunks(p, usecols_candidates=BASE_USECOLS, chunksize=chunksize):
            ch = preprocess_chunk(chunk, uid=uid, 
                                  event_candidates=EVENT_COL_CANDIDATES, 
                                  ts_candidates=TS_COL_CANDIDATES)
            if ch.empty: continue

            # --- 1) CELL / WIFI 기반 이동성 분석 ---
            ch_m = filter_by_events(ch, mobility_events).sort_values("dt")
            if not ch_m.empty:
                for d, sub in ch_m.groupby("date"):
                    d_idx = pd.Timestamp(d)
                    
                    # (1) cell_change_cnt: CELL_CHANGE 이벤트 단순 합산
                    c_cnt = len(sub[sub["event_std"] == "CELL_CHANGE"])
                    cell_cnt[d_idx] = cell_cnt.get(d_idx, 0) + c_cnt
                    
                    # (2) unique_cell_cnt: cell_lac의 고유 집합 크기
                    if "cell_lac" in sub.columns:
                        cells = sub[sub["event_std"] == "CELL_CHANGE"]["cell_lac"].dropna().astype(str)
                        cell_uniqs.setdefault(d_idx, set()).update(cells.tolist())

                    # (3) wifi_change_cnt_est & unique_wifi_cnt
                    if "wifi_ssid" in sub.columns:
                        wifi_sub = sub[sub["event_std"] == "WIFI_SSID"].copy()
                        wifi_sub["wifi_ssid"] = wifi_sub["wifi_ssid"].astype(str)
                        
                        if not wifi_sub.empty:
                            # unique_wifi_cnt용 set 업데이트
                            unique_wifi_ssids.setdefault(d_idx, set()).update(wifi_sub["wifi_ssid"].tolist())
                            
                            # 연속 SSID 변화 카운트 (환경 변화)
                            ssids = wifi_sub["wifi_ssid"].values
                            # 이전 청크의 마지막 SSID와 현재 청크의 첫 SSID 비교
                            current_last = last_ssid_seen.get(d_idx)
                            
                            changes = 0
                            for i in range(len(ssids)):
                                if current_last is not None and ssids[i] != current_last:
                                    changes += 1
                                current_last = ssids[i]
                            
                            wifi_change_cnt_est[d_idx] = wifi_change_cnt_est.get(d_idx, 0) + changes
                            last_ssid_seen[d_idx] = current_last

            # --- 2) Step Count 기반 활동량 분석 ---
            is_step = ch["event_std"].isin(step_aliases)
            if "sensor_id" in ch.columns:
                is_step |= ch["sensor_id"].astype(str).isin(step_aliases)
            
            ch_s = ch[is_step].copy()
            if not ch_s.empty:
                val_col = next((c for c in ["step_count", "value"] if c in ch_s.columns), None)
                if val_col:
                    ch_s[val_col] = pd.to_numeric(ch_s[val_col], errors="coerce")
                    # 중복 로그 방어
                    ch_s = ch_s.loc[~ch_s.index.duplicated(keep='first')]
                    for d, s in ch_s.groupby("date")[val_col].sum().items():
                        d_idx = pd.Timestamp(d)
                        step_sum[d_idx] = step_sum.get(d_idx, 0.0) + float(s)

        # 최종 결과 구성
        all_dates = sorted(set(cell_cnt) | set(step_sum) | set(unique_wifi_ssids))
        for d in all_dates:
            out_rows.append({
                "uuid": uid,
                "date": d,
                "cell_change_cnt": int(cell_cnt.get(d, 0)),             # 이동량
                "wifi_change_cnt_est": int(wifi_change_cnt_est.get(d, 0)), # 환경 변화
                "unique_wifi_cnt": float(len(unique_wifi_ssids.get(d, set()))), # 장소 다양성
                "unique_cell_cnt": float(len(cell_uniqs.get(d, set()))),       # 광역 장소 수
                "step_sum": step_sum.get(d, np.nan),                    # 활동량
            })
    if not out_rows:
        return pd.DataFrame(columns=[
            "uuid","date","cell_change_cnt","wifi_change_cnt_est","unique_wifi_cnt","unique_cell_cnt","step_sum"
        ])
    return pd.DataFrame(out_rows)

##  2.6 운영 메타/QC 피처(Meta/QC)

In [16]:
# 하트비트 신호와 시스템 상태(배터리, 큐 사이즈, 리트라이) 확인
# 데이터가 누락 없이 잘 전송되었는지, 타임존이 바뀌었는지(해외여행 등) 품질을 체크
def build_daily_meta_qc(
    csv_paths: List[Path],
    meta_event_type: str = META_EVENT_TYPE,   # "heartbeat"
    chunksize: int = 300_000,
) -> pd.DataFrame:
    out_rows: List[Dict[str, Any]] = []

    for p in csv_paths:
        uid = p.stem
        # 집계용 딕셔너리
        hb_cnt, retry_max, queue_max = {}, {}, {}
        tz_min, tz_max, last_event_max = {}, {}, {}

        for chunk in read_in_chunks(p, usecols_candidates=BASE_USECOLS, chunksize=chunksize):
            if chunk is None or chunk.empty: continue
            ch = chunk.copy()

            # (1) Heartbeat 행 필터링
            ch_hb = pd.DataFrame()
            if "type" in ch.columns:
                m = ch["type"].astype("string").str.lower() == str(meta_event_type).lower()
                ch_hb = ch[m].copy()
            
            if ch_hb.empty:
                ch2 = standardize_event_ts_cols(ch, 
                                                event_candidates=EVENT_COL_CANDIDATES,
                                                ts_candidates=HEARTBEAT_TS_CANDIDATES)
                m = ch2["event_std"].astype("string").str.strip().str.lower() == str(meta_event_type).lower()
                ch_hb = ch2[m].copy()

            if ch_hb.empty: continue

            # (2) 시간 변환 및 날짜 추출
            ts_col = pick_first_existing_col(ch_hb.columns.tolist(), HEARTBEAT_TS_CANDIDATES)
            if ts_col is None: continue

            x = pd.to_numeric(ch_hb[ts_col], errors="coerce")
            if x.dropna().empty: continue
            
            # 단위(s/ms) 판별 및 변환
            med_len = x.dropna().astype("int64").astype(str).str.len().median()
            unit = "s" if (pd.notna(med_len) and med_len <= 10) else "ms"
            ch_hb["dt_hb"] = pd.to_datetime(x, unit=unit, errors="coerce")
            ch_hb = ch_hb.dropna(subset=["dt_hb"])
            if ch_hb.empty: continue
            ch_hb["date"] = ch_hb["dt_hb"].dt.normalize()

            # (3) 집계 로직
            for d, sub in ch_hb.groupby("date"):
                d_idx = pd.Timestamp(d)
                hb_cnt[d_idx] = hb_cnt.get(d_idx, 0) + len(sub)

                # 수치 데이터 업데이트 헬퍼
                def update_target(col_name, target_dict, kind: str):
                    if col_name not in sub.columns:
                        return
                    vals = pd.to_numeric(sub[col_name], errors="coerce").dropna()
                    if vals.empty:
                        return

                    new_val = float(vals.max() if kind == "max" else vals.min())
                    prev_val = target_dict.get(d_idx, np.nan)

                    if not np.isfinite(prev_val):
                        target_dict[d_idx] = new_val
                    else:
                        target_dict[d_idx] = max(prev_val, new_val) if kind == "max" else min(prev_val, new_val)

                update_target("retry_count", retry_max, "max")
                update_target("queue_size", queue_max, "max")
                update_target("tz_offset_minutes", tz_min, "min")
                update_target("tz_offset_minutes", tz_max, "max")
                update_target("client_last_event_ts", last_event_max, "max")

        # (4) 최종 데이터 구성
        for d in sorted(hb_cnt.keys()):
            tmin = tz_min.get(d, np.nan)
            tmax = tz_max.get(d, np.nan)
            
            # 타임존 변경 감지 (기본 임계값 30분~60분)
            tz_changed = False
            if pd.notna(tmin) and pd.notna(tmax):
                tz_changed = bool(abs(tmax - tmin) >= TZ_CHANGE_THR_MINUTES)

            out_rows.append({
                "uuid": uid, "date": d,
                "heartbeat_cnt": int(hb_cnt[d]),
                "retry_max": retry_max.get(d, np.nan),
                "queue_max": queue_max.get(d, np.nan),
                "tz_offset_min": tmin,
                "tz_offset_max": tmax,
                "tz_changed": tz_changed,
                "qc_last_ts_max": last_event_max.get(d, np.nan)
            })
    if not out_rows:
        return pd.DataFrame(columns=[
            "uuid","date","heartbeat_cnt","retry_max","queue_max",
            "tz_offset_min","tz_offset_max","tz_changed","qc_last_ts_max"
        ])
    return pd.DataFrame(out_rows)
print("✅ build_daily_meta_qc patched (heartbeat ts candidates + dual heartbeat detection)")

✅ build_daily_meta_qc patched (heartbeat ts candidates + dual heartbeat detection)


# 3. 병합 + 파생 피처


In [17]:
def _assert_or_dedup_uuid_date(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """(uuid,date) 중복이 있으면 폭발 방지용으로 강제 정리."""
    if df is None or df.empty:
        return df
    if not {"uuid", "date"}.issubset(df.columns):
        raise ValueError(f"{name}: missing uuid/date columns")

    df = df.copy()
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()

    dup = int(df.duplicated(["uuid", "date"]).sum())
    if dup == 0:
        return df

    out = (
        df.sort_values(["uuid", "date"])
        .drop_duplicates(["uuid", "date"], keep="last")
        .reset_index(drop=True)
    )
    print(f"⚠️ [{name}] duplicated (uuid,date)={dup} -> keep last row")
    return out


def _outer_merge_on_uuid_date(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    if left is None or left.empty:
        return right.copy()
    if right is None or right.empty:
        return left.copy()
    return left.merge(right, on=["uuid", "date"], how="outer")


def _to01_or_nan(s: pd.Series) -> pd.Series:
    """bool/float/nan 섞여도 0/1/NaN 형태로 정리."""
    if s is None:
        return s
    x = s.copy()
    x = x.where(~pd.isna(x), np.nan)
    if x.dtype == bool:
        return x.astype(float)
    if pd.api.types.is_object_dtype(x):
        x = x.replace({"True": 1, "False": 0, True: 1, False: 0})
    return pd.to_numeric(x, errors="coerce")

## 3.1 outer merge (uuid,date union)

In [18]:
def build_daily_feature_table_v3(csv_paths: List[Path]) -> pd.DataFrame:
    # 2.x 블록별 피처 생성
    activity_sensors = CORE_SENSORS + PASSIVE_SENSORS + AUX_SENSORS
    activity_df = build_daily_activity(csv_paths, core_sensors=activity_sensors)  
    rhythm_df   = build_daily_rhythm(csv_paths)
    gap_df      = build_daily_gap(csv_paths)
    session_df  = build_daily_session(csv_paths)
    mob_df      = build_daily_mobility(csv_paths)
    meta_df     = build_daily_meta_qc(csv_paths)

    # ✅ merge 폭발 방지: (uuid,date) 유니크 보장
    activity_df = _assert_or_dedup_uuid_date(activity_df, "activity")
    rhythm_df   = _assert_or_dedup_uuid_date(rhythm_df,   "rhythm")
    gap_df      = _assert_or_dedup_uuid_date(gap_df,      "gap")
    session_df  = _assert_or_dedup_uuid_date(session_df,  "session")
    mob_df      = _assert_or_dedup_uuid_date(mob_df,      "mobility")
    meta_df     = _assert_or_dedup_uuid_date(meta_df,     "meta")

    # outer merge (uuid,date union)
    df = activity_df.copy()
    df = _outer_merge_on_uuid_date(df, rhythm_df)
    df = _outer_merge_on_uuid_date(df, gap_df)
    df = _outer_merge_on_uuid_date(df, session_df)
    df = _outer_merge_on_uuid_date(df, mob_df)
    df = _outer_merge_on_uuid_date(df, meta_df)

    # date normalize + sort
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
    df = df.dropna(subset=["uuid", "date"]).sort_values(["uuid", "date"]).reset_index(drop=True)

    # Unlock count 표준화 (없으면 0)
    if "Unlock" not in df.columns:
        df["Unlock"] = 0
    df["Unlock"] = pd.to_numeric(df["Unlock"], errors="coerce").fillna(0).astype(int)
    df["unlock_cnt"] = df["Unlock"]

    for c in CORE_SENSORS:
        if c not in df.columns:
            df[c] = 0
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(int)
    # PASSIVE도 dtype/결측 정리
    for c in PASSIVE_SENSORS:
        if c not in df.columns:
            df[c] = 0
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(int)

    # unlock_cnt도 dtype 확정
    if "unlock_cnt" in df.columns:
        df["unlock_cnt"] = pd.to_numeric(df["unlock_cnt"], errors="coerce").fillna(0).astype(int)

    # daily event (core 합)
    core_cols_present = [c for c in CORE_SENSORS if c in df.columns]
    df["daily_event_cnt"] = df[core_cols_present].sum(axis=1).astype(int)
    df["has_activity"] = (df["daily_event_cnt"] > 0)

    # 3.2 QC flags
    df = add_qc_flags_v3(df)

    # 3.3 soft context
    df = add_soft_context_signals_v3(df)

    # 3.4 delta features
    df = add_delta_features_v3(df)

    return df

## 3.2 QC flags 분리(core/rhythm/gap/meta)

In [19]:
def add_qc_flags_v3(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # (A) core QC
    df["qc_core_low_cov"] = df["daily_event_cnt"] < MIN_DAILY_EVENTS
    df["qc_core_very_low_activity"] = (df["daily_event_cnt"] > 0) & (df["daily_event_cnt"] < PARTIAL_CORE_MIN_EVENTS)

    # (B) rhythm QC
    if "rhythm_low_coverage" not in df.columns:
        df["rhythm_low_coverage"] = True
    df["rhythm_low_coverage"] = df["rhythm_low_coverage"].fillna(True).astype(bool)
    df["qc_rhythm_low_cov"] = df["rhythm_low_coverage"]

    # (C) gap QC
    if "gap_low_coverage" not in df.columns:
        df["gap_low_coverage"] = True
    df["gap_low_coverage"] = df["gap_low_coverage"].fillna(True).astype(bool)
    df["qc_gap_low_cov"] = df["gap_low_coverage"]

    # (D) meta QC
    if "heartbeat_cnt" in df.columns:
        df["qc_meta_low_heartbeat"] = (
            pd.to_numeric(df["heartbeat_cnt"], errors="coerce").fillna(0).astype(int) < MIN_HEARTBEAT_PER_DAY
        )
    else:
        df["qc_meta_low_heartbeat"] = np.nan

    if "retry_max" in df.columns:
        df["qc_meta_retry_warn"] = pd.to_numeric(df["retry_max"], errors="coerce").fillna(-np.inf) >= RETRY_WARN_THR
    else:
        df["qc_meta_retry_warn"] = np.nan

    if "queue_max" in df.columns:
        df["qc_meta_queue_warn"] = pd.to_numeric(df["queue_max"], errors="coerce").fillna(-np.inf) >= QUEUE_WARN_THR
    else:
        df["qc_meta_queue_warn"] = np.nan

    # tz_changed fallback
    if "tz_changed" not in df.columns and ("tz_offset_min" in df.columns) and ("tz_offset_max" in df.columns):
        tmin = pd.to_numeric(df["tz_offset_min"], errors="coerce")
        tmax = pd.to_numeric(df["tz_offset_max"], errors="coerce")
        df["tz_changed"] = (tmax - tmin).abs() >= TZ_CHANGE_THR_MINUTES

    return df

## 3.3 soft context signals

In [20]:
def add_soft_context_signals_v3(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # (1) tz 변화 신호
    if "tz_changed" in df.columns:
        df["tz_change_signal"] = _to01_or_nan(df["tz_changed"].astype("boolean"))
    else:
        df["tz_change_signal"] = 0.0

    # (2) partial 신호
    core_very_low = df.get("qc_core_very_low_activity", False).fillna(False).astype(bool)
    retry_warn = df.get("qc_meta_retry_warn", np.nan)
    queue_warn = df.get("qc_meta_queue_warn", np.nan)

    retry01 = _to01_or_nan(retry_warn)
    queue01 = _to01_or_nan(queue_warn)

    base = core_very_low.astype(int)
    df["partial_signal_raw"] = (
        base
        + pd.to_numeric(retry01, errors="coerce").fillna(0).astype(int)
        + pd.to_numeric(queue01, errors="coerce").fillna(0).astype(int)
    ).astype(float)
    df.loc[~core_very_low, "partial_signal_raw"] = 0.0

    # (3) travel 신호
    travel = pd.Series(0, index=df.index, dtype=float)

    tz01 = pd.to_numeric(df["tz_change_signal"], errors="coerce").fillna(0).astype(int)
    travel += (tz01 * 2).astype(float)

    if "cell_change_cnt" in df.columns:
        c = pd.to_numeric(df["cell_change_cnt"], errors="coerce")
        c_prev = c.groupby(df["uuid"]).shift(1)
        travel += ((c_prev.notna()) & (c >= (c_prev * 2 + 10))).astype(float)

    if "wifi_change_cnt_est" in df.columns:
        w = pd.to_numeric(df["wifi_change_cnt_est"], errors="coerce")
        w_prev = w.groupby(df["uuid"]).shift(1)
        travel += ((w_prev.notna()) & (w >= (w_prev * 2 + 10))).astype(float)

    if "unique_cell_cnt" in df.columns:
        u1 = pd.to_numeric(df.get("unique_cell_cnt", np.nan), errors="coerce").fillna(0)
        u1_prev = u1.groupby(df["uuid"]).shift(1)
        travel += ((u1_prev.notna()) & (u1 >= (u1_prev + 3))).astype(float)

    df["travel_signal_raw"] = travel
    return df

 ## 3.4 전일 대비 변화량(delta features)

In [21]:
def add_delta_features_v3(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df = df.sort_values(["uuid", "date"]).reset_index(drop=True)

    delta_cols = [
        "daily_event_cnt",
        "night_ratio",
        "hour_entropy",
        "gap_max",
        "gap_p95",
        "gap_cnt_2h",
        "gap_cnt_6h",
        "session_total_sec",
        "session_cnt",
        "step_sum",
        "cell_change_cnt",
        "wifi_change_cnt_est",
        "travel_signal_raw",
        "partial_signal_raw",
        "tz_change_signal",
    ]
    delta_cols = [c for c in delta_cols if c in df.columns]

    for c in delta_cols:
        x = pd.to_numeric(df[c], errors="coerce")
        prev = x.groupby(df["uuid"]).shift(1)
        df[f"{c}_d1"] = x - prev

    ratio_cols = ["daily_event_cnt", "session_total_sec", "step_sum"]
    ratio_cols = [c for c in ratio_cols if c in df.columns]

    eps = 1e-6
    for c in ratio_cols:
        x = pd.to_numeric(df[c], errors="coerce")
        prev = x.groupby(df["uuid"]).shift(1)
        df[f"{c}_r1"] = (x - prev) / (prev.abs() + eps)

    return df

print("✅ FE_v3 merge/derive ready")

✅ FE_v3 merge/derive ready


# 4. 저장용 컬럼 선택

In [22]:
def save_daily_feature_v3(df: pd.DataFrame, out_dir: Path) -> Path:
    df = df.copy()

    # meta 컬럼 없으면 NaN 채워서 스키마 고정
    for c in [
        "tz_changed","tz_offset_min","tz_offset_max",
        "heartbeat_cnt","retry_max","queue_max","qc_last_ts_max"
    ]:
        if c not in df.columns:
            df[c] = np.nan

    out_dir.mkdir(parents=True, exist_ok=True)

    cols = [
        "uuid", "date",

        *CORE_SENSORS, *PASSIVE_SENSORS, "unlock_cnt",
        "daily_event_cnt", "has_activity",

        # qc
        "qc_core_low_cov","qc_core_very_low_activity","qc_rhythm_low_cov","qc_gap_low_cov",
        "qc_meta_low_heartbeat","qc_meta_retry_warn","qc_meta_queue_warn",

        # signals (context)
        "partial_signal_raw","travel_signal_raw","tz_change_signal",

        # rhythm
        "rhythm_event_cnt","rhythm_low_coverage","night_ratio","hour_entropy",
        "day_ratio","evening_ratio","peak_hour","peak_ratio",

        # gap
        "gap_event_cnt","gap_low_coverage","gap_max","gap_p95","gap_cnt_2h","gap_cnt_6h",
        "gap_long_ratio","overnight_gap","first_hour","last_hour",

        # session
        "session_cnt","session_total_sec","session_mean_sec","long_session_cnt",

        # mobility
        "cell_change_cnt","wifi_change_cnt_est", "unique_wifi_cnt", "unique_cell_cnt","step_sum",

        # meta
        "heartbeat_cnt","retry_max","queue_max","tz_offset_min","tz_offset_max","tz_changed",
        "qc_last_ts_max",
    ]

    # delta 자동 추가
    delta_cols = [c for c in df.columns if c.endswith("_d1") or c.endswith("_r1")]
    cols = cols + delta_cols

    # 실제 존재 컬럼만
    cols = [c for c in cols if c in df.columns]

    out = df[cols].copy()

    # flag/signal류는 float로 통일 (0/1/NaN 유지)
    for c in ["partial_signal_raw","travel_signal_raw","tz_change_signal"]:
        if c in out.columns:
            out[c] = pd.to_numeric(out[c], errors="coerce")

    path = out_dir / "daily_feature_v3.csv"
    out.to_csv(path, index=False, encoding="utf-8-sig")
    print("✅ saved:", path, "| shape:", out.shape)
    return path

# 5. 저장 전 QC

In [23]:
def feature_qc_report(df: pd.DataFrame, name="daily_feature_v3", topk=10, exclude_meta_in_missing=True):
    assert {"uuid","date"}.issubset(df.columns), "missing uuid/date"
    d = df.copy()
    d["date"] = pd.to_datetime(d["date"], errors="coerce").dt.normalize()

    print(f"\n===== QC REPORT: {name} =====")
    print("shape:", d.shape)
    print("users:", d["uuid"].nunique(), "days:", d["date"].nunique())
    print("uuid-date dup:", int(d.duplicated(["uuid","date"]).sum()))
    print("date null:", int(d["date"].isna().sum()))

    # 1) 기본 결측률 (meta 제외 옵션)
    META_COLS = [
        "heartbeat_cnt","retry_max","queue_max","qc_last_ts_max",
        "tz_offset_min","tz_offset_max","tz_changed","tz_change_signal",
        "qc_meta_low_heartbeat","qc_meta_retry_warn","qc_meta_queue_warn",
    ]

    d_for_missing = d
    if exclude_meta_in_missing:
        drop_cols = [c for c in META_COLS if c in d_for_missing.columns]
        d_for_missing = d_for_missing.drop(columns=drop_cols)

    na = (d_for_missing.isna().mean().sort_values(ascending=False) * 100).round(2)
    print("\n[Missing Rate %] top:")
    print(na.head(topk))

    core_cols = [c for c in CORE_SENSORS if c in d.columns]
    if core_cols:
        d["core_sum_check"] = d[core_cols].sum(axis=1)
        if "daily_event_cnt" in d.columns:
            mismatch = (d["core_sum_check"] != d["daily_event_cnt"]).sum()
            print("\n[Core Sum Check] mismatch rows:", int(mismatch))

        print("\n[Core Stats]")
        show_cols = core_cols + (["daily_event_cnt"] if "daily_event_cnt" in d.columns else [])
        print(d[show_cols].describe(percentiles=[.5,.9,.99]).T)

        if "daily_event_cnt" in d.columns:
            zero_days = (d["daily_event_cnt"] == 0).mean() * 100
            print(f"daily_event_cnt==0 ratio: {zero_days:.2f}%")

    # 3) QC flag 비율
    qc_cols = [c for c in d.columns if c.startswith("qc_")]
    if qc_cols:
        print("\n[QC Flag Rates %]")
        for c in qc_cols:
            x = d[c]
            if x.dtype == bool:
                rate = x.mean() * 100
            else:
                rate = pd.to_numeric(x, errors="coerce").mean() * 100
            print(f"{c}: {rate:.2f}%")

    # 4) 값 범위 검증(이상치/논리 위반)
    checks = []

    def add_check(col, cond, msg):
        if col in d.columns:
            n = int(cond(d[col]).sum())
            checks.append((col, n, msg))

    # ratio류 0~1 기대
    add_check("night_ratio", lambda s: (s<0) | (s>1), "expected [0,1]")
    add_check("day_ratio", lambda s: (s<0) | (s>1), "expected [0,1]")
    add_check("evening_ratio", lambda s: (s<0) | (s>1), "expected [0,1]")
    add_check("peak_ratio", lambda s: (s<0) | (s>1), "expected [0,1]")
    add_check("gap_long_ratio", lambda s: (s<0) | (s>1.5), "expected ~[0,1] (allow 1.5 slack)")

    # 시간 관련
    add_check("gap_max", lambda s: s<0, "gap should be >=0")
    add_check("gap_p95", lambda s: s<0, "gap should be >=0")
    add_check("overnight_gap", lambda s: s<0, "overnight_gap should be >=0")
    add_check("first_hour", lambda s: (s<0) | (s>23), "expected 0~23")
    add_check("last_hour", lambda s: (s<0) | (s>23), "expected 0~23")
    add_check("peak_hour", lambda s: (s<0) | (s>23), "expected 0~23")

    # 세션
    add_check("session_total_sec", lambda s: s<0, "session_total_sec should be >=0")
    add_check("session_cnt", lambda s: s<0, "session_cnt should be >=0")
    add_check("session_mean_sec", lambda s: s<0, "session_mean_sec should be >=0")

    # meta
    add_check("heartbeat_cnt", lambda s: s<0, "heartbeat_cnt should be >=0")
    add_check("retry_max", lambda s: s<0, "retry_max should be >=0")
    add_check("queue_max", lambda s: s<0, "queue_max should be >=0")
    add_check("tz_offset_min", lambda s: (s<-720) | (s>840), "tz offset minutes suspicious")
    add_check("tz_offset_max", lambda s: (s<-720) | (s>840), "tz offset minutes suspicious")

    if checks:
        print("\n[Range / Logic Violations] (count)")
        any_bad = False
        for col, n, msg in checks:
            if n > 0:
                any_bad = True
                print(f"{col}: {n} rows -> {msg}")
        if not any_bad:
            print("No obvious violations found.")

    # 5) delta sanity
    delta_cols = [c for c in d.columns if c.endswith("_d1") or c.endswith("_r1")]
    if delta_cols:
        print("\n[Delta Missing %] top:")
        delta_na = (d[delta_cols].isna().mean().sort_values(ascending=False) * 100).round(2)
        print(delta_na.head(topk))

    # 6) “의심 user-day” 샘플
    suspect_cols = [c for c in [
        "daily_event_cnt","heartbeat_cnt","partial_signal_raw","travel_signal_raw",
        "gap_max","gap_cnt_6h","night_ratio","hour_entropy"
    ] if c in d.columns]
    if suspect_cols:
        cond = pd.Series(False, index=d.index)

        if "daily_event_cnt" in d.columns and "heartbeat_cnt" in d.columns:
            cond |= (d["daily_event_cnt"]==0) & (d["heartbeat_cnt"].fillna(0)>0)
        if "partial_signal_raw" in d.columns:
            cond |= (d["partial_signal_raw"].fillna(0)>0)

        sus = d.loc[cond, ["uuid","date"]+suspect_cols].head(10)
        print("\n[Suspect Samples] (first 10 rows)")
        print(sus.to_string(index=False))

daily_feature_v3 = build_daily_feature_table_v3(CSV_PATHS)
feature_qc_report(daily_feature_v3, name="daily_feature_v3", exclude_meta_in_missing=True)


===== QC REPORT: daily_feature_v3 =====
shape: (9057, 48)
users: 342 days: 90
uuid-date dup: 0
date null: 0

[Missing Rate %] top:
gap_cnt_6h_d1      9.29
gap_cnt_2h_d1      9.29
gap_p95_d1         9.29
gap_max_d1         9.29
night_ratio_d1     9.29
hour_entropy_d1    9.29
overnight_gap      5.21
peak_hour          3.92
hour_entropy       3.92
day_ratio          3.92
dtype: float64

[Core Sum Check] mismatch rows: 0

[Core Stats]
                  count        mean         std  min    50%    90%      99%  \
Screen           9057.0  183.138788  132.310715  0.0  155.0  349.4   641.88   
UserAct          9057.0  311.780170  256.841632  0.0  266.0  649.0  1138.88   
daily_event_cnt  9057.0  494.918958  343.518544  0.0  439.0  947.4  1580.72   

                    max  
Screen           1280.0  
UserAct          2073.0  
daily_event_cnt  2501.0  
daily_event_cnt==0 ratio: 0.13%

[QC Flag Rates %]
qc_core_low_cov: 3.92%
qc_core_very_low_activity: 0.86%
qc_rhythm_low_cov: 3.92%
qc_gap_low_

  df["rhythm_low_coverage"] = df["rhythm_low_coverage"].fillna(True).astype(bool)
  df["gap_low_coverage"] = df["gap_low_coverage"].fillna(True).astype(bool)


# 6. 파일 저장

In [24]:
OUT_DIR = Path("../artifacts/features")
_ = save_daily_feature_v3(daily_feature_v3, OUT_DIR)

✅ saved: ..\artifacts\features\daily_feature_v3.csv | shape: (9057, 54)
