# 02. Process Data
Weather processing, merge, feature engineering.

In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import utils

# Load Data
base_df = utils.load_data()
if base_df is not None:
    base_df_filtered = base_df[(base_df['arrival_delay'] >= -600) & (base_df['arrival_delay'] <= 1800)]

同じ日のtripでも2分以上最寄りのバス停が同じパターンが存在する。始発の停車、長距離移動、終点など。加工して１つに制限する必要がある。

In [2]:
# counts = base_df.groupby(['start_date', 'trip_id', 'stop_sequence']).size()
# counts[counts > 1]

同じ日のtripで2分間で複数のバス停を通過するとその値は穴あきになる。それを調査する。

In [3]:
# stop_sequenceの連番に穴あきがあるかチェック
def check_sequence_gaps(df):
    """各trip（日付+trip_id）ごとにstop_sequenceの欠番をチェック"""
    gaps = []
    for (date, trip_id), group in df.groupby(['start_date', 'trip_id']):
        seqs = sorted(group['stop_sequence'].unique())
        expected = list(range(min(seqs), max(seqs) + 1))
        missing = set(expected) - set(seqs)
        if missing:
            gaps.append({
                'start_date': date,
                'trip_id': trip_id,
                'min_seq': min(seqs),
                'max_seq': max(seqs),
                'missing_seqs': sorted(missing),
                'gap_count': len(missing)
            })
    return pd.DataFrame(gaps)

# gaps_df = check_sequence_gaps(base_df)
# print(f"穴あきがあるtrip数: {len(gaps_df)}")
# gaps_df.head(10)

In [4]:
# Prepare Trip Data (Aggregation)
def prepare_trip_data(df, direction_id=None):
    """
    Trip単位でデータを整理
    Args:
        df: 入力データフレーム
        direction_id: 方向ID（None=全方向、特定値で絞り込み）
    """
    if direction_id is not None:
        data = df[df['direction_id'] == direction_id].copy()
    else:
        data = df.copy()

    # route_id + direction_id + trip_id でユニークなtrip_keyを作成
    data['trip_key'] = (
        data['start_date'].astype(str) + '_' +
        data['route_id'].astype(str) + '_' +
        data['direction_id'].astype(str) + '_' +
        data['trip_id'].astype(str)
    )

    # route_direction_key（route_id + direction_idの組み合わせ）
    data['route_direction_key'] = (
        data['route_id'].astype(str) + '_' +
        data['direction_id'].astype(str)
    )

    # バス停タイプの判定
    trip_seq_stats = data.groupby('trip_key')['stop_sequence'].agg(['min', 'max'])
    trip_seq_stats.columns = ['seq_min', 'seq_max']
    data = data.merge(trip_seq_stats, on='trip_key', how='left')

    data['stop_type'] = 'middle'
    data.loc[data['stop_sequence'] == data['seq_min'], 'stop_type'] = 'first'
    data.loc[data['stop_sequence'] == data['seq_max'], 'stop_type'] = 'last'

    # 集計
    group_cols = ['trip_key', 'stop_sequence']
    first_stops = data[data['stop_type'] == 'first'].groupby(group_cols)['arrival_delay'].max().reset_index()
    first_stops.columns = ['trip_key', 'stop_sequence', 'arrival_delay_agg']
    last_stops = data[data['stop_type'] == 'last'].groupby(group_cols)['arrival_delay'].min().reset_index()
    last_stops.columns = ['trip_key', 'stop_sequence', 'arrival_delay_agg']
    middle_stops = data[data['stop_type'] == 'middle'].groupby(group_cols)['arrival_delay'].first().reset_index()
    middle_stops.columns = ['trip_key', 'stop_sequence', 'arrival_delay_agg']

    agg_delays = pd.concat([first_stops, middle_stops, last_stops])

    exclude_cols = ['arrival_delay', 'stop_type', 'seq_min', 'seq_max']
    other_cols = [c for c in data.columns if c not in exclude_cols and c not in group_cols]
    data_unique = data.groupby(group_cols, as_index=False)[other_cols].first()
    data_unique = data_unique.merge(agg_delays, on=['trip_key', 'stop_sequence'], how='left')
    data_unique = data_unique.sort_values(['trip_key', 'stop_sequence'])

    # 欠損値を線形補間
    data_unique['arrival_delay_agg'] = data_unique.groupby('trip_key')['arrival_delay_agg'].transform(
        lambda x: x.interpolate(method='linear', limit_direction='both')
    )
    
    # float32に変換
    data_unique['arrival_delay_agg'] = data_unique['arrival_delay_agg'].astype('float32')

    return data_unique

if 'base_df_filtered' in locals() and base_df_filtered is not None:
    # メモリ節約のため、使用済みのbase_dfを削除
    if 'base_df' in locals():
        del base_df
        import gc
        gc.collect()
        print("Released base_df memory.")

    print("Preparing trip data...")
    df_process = prepare_trip_data(base_df_filtered, direction_id=None)
    print(f"Processed trips: {df_process['trip_key'].nunique()}")

Released base_df memory.
Preparing trip data...
Processed trips: 453676


# Feature Engineering

In [5]:
def process_features(df_process):
    """Apply feature engineering"""
    scheduled_time = pd.to_datetime(df_process['scheduled_arrival_time'], utc=True)
    df_process['time_of_day'] = scheduled_time.dt.hour + scheduled_time.dt.minute / 60
    df_process['hour'] = scheduled_time.dt.hour
    df_process['time_sin'] = np.sin(2 * np.pi * df_process['time_of_day'] / 24)
    df_process['time_cos'] = np.cos(2 * np.pi * df_process['time_of_day'] / 24)
    df_process['day_of_week'] = pd.to_datetime(df_process['start_date'], format='%Y%m%d').dt.dayofweek
    df_process['is_weekend'] = (df_process['day_of_week'] >= 6).astype(int)

    # v2 features
    df_process['is_rush_hour'] = ((df_process['hour'] >= 14) & (df_process['hour'] <= 18)).astype(int)

    if 'alert_effect_detour' in df_process.columns:
        df_process['has_detour'] = (df_process['alert_effect_detour'] > 0).astype(int)
    else:
        df_process['has_detour'] = 0

    if 'alert_police_activity' in df_process.columns:
        df_process['has_police_alert'] = (df_process['alert_police_activity'] > 0).astype(int)
    else:
        df_process['has_police_alert'] = 0

    rd_encoder = LabelEncoder()
    df_process['route_direction_encoded'] = rd_encoder.fit_transform(df_process['route_direction_key'])
    
    return df_process

if 'df_process' in locals() and df_process is not None:
    print("Applying feature engineering...")
    df_process = process_features(df_process)

    print("New features created:")
    print(f"  is_rush_hour: {df_process['is_rush_hour'].mean():.1%}")
    print(f"  has_detour: {df_process['has_detour'].mean():.1%}")
    print(f"  has_police_alert: {df_process['has_police_alert'].mean():.1%}")

Applying feature engineering...
New features created:
  is_rush_hour: 26.3%
  has_detour: 0.5%
  has_police_alert: 1.1%


In [6]:
df_process.head()

Unnamed: 0,trip_key,stop_sequence,route_id,trip_id,start_date,arrival_day_offset,direction_id,stop_id,region_id,lat_sin,...,arrival_delay_agg,time_of_day,hour,time_sin,time_cos,is_weekend,is_rush_hour,has_detour,has_police_alert,route_direction_encoded
0,20251203_10232_0_14895984,1,10232,14895984,20251203,0,0,10947,west_vancouver,0.758439,...,139.0,14.083333,14,-0.518773,-0.854912,0,1,0,0,0
1,20251203_10232_0_14895984,2,10232,14895984,20251203,0,0,4782,west_vancouver,0.758494,...,127.0,14.116667,14,-0.526214,-0.850352,0,1,0,0,0
2,20251203_10232_0_14895984,3,10232,14895984,20251203,0,0,12883,west_vancouver,0.758526,...,165.0,14.133333,14,-0.529919,-0.848048,0,1,0,0,0
3,20251203_10232_0_14895984,4,10232,14895984,20251203,0,0,11118,west_vancouver,0.758502,...,135.0,14.216667,14,-0.548293,-0.836286,0,1,0,0,0
4,20251203_10232_0_14895984,5,10232,14895984,20251203,0,0,4491,west_vancouver,0.758443,...,314.0,14.35,14,-0.577145,-0.816642,0,1,0,0,0


In [7]:
# Select columns including Feature Store features
base_cols = [
    'trip_key', 'stop_sequence', 'route_id', 'trip_id', 'start_date', 'direction_id', 'stop_id', 'region_id',
    'scheduled_arrival_time', 'actual_arrival_time', 'time_bucket',
    'hour_of_day', 'day_of_week', 'has_active_alert',
    'route_direction_key', 'arrival_delay_agg',
    'time_of_day', 'hour', 'time_sin', 'time_cos', 'is_weekend',
    'is_rush_hour', 'has_detour', 'has_police_alert',
    'route_direction_encoded'
]

# Feature Store columns
feature_store_cols = [
    # Lag features
    'delay_lag_1', 'delay_lag_2', 'delay_lag_3', 'delay_lag_5',
    # Rolling features
    'delay_rolling_mean_3', 'delay_rolling_std_3',
    'delay_rolling_mean_5', 'delay_rolling_std_5',
    'delay_rolling_mean_10', 'delay_rolling_std_10',
    # Time period features
    'time_period', 'is_morning_rush', 'is_evening_rush'
]

# Filter to existing columns only
available_cols = [c for c in base_cols + feature_store_cols if c in df_process.columns]
df_process_selected = df_process[available_cols].copy()

print(f"Selected {len(available_cols)} columns")
print(f"  - Base features: {len([c for c in base_cols if c in available_cols])}")
print(f"  - Feature Store features: {len([c for c in feature_store_cols if c in available_cols])}")

Selected 25 columns
  - Base features: 25
  - Feature Store features: 0


# Feature Store

計算コストの高い特徴量（ラグ特徴量、移動平均など）を事前計算し、Parquet形式で保存します。
これにより再計算の無駄を省き、モデル学習時に高速に読み込むことができます。

In [8]:
# Feature Store計算の実行（最適化版）
from feature_store_optimized import (
    compute_all_features_optimized,
    compute_time_features,
    print_memory_usage
)
import gc

if 'df_process' in locals() and df_process is not None:
    print_memory_usage(df_process, "df_process (before)")
    
    print("\nComputing Feature Store features (optimized)...")
    
    # ラグ・移動統計量を一括計算
    df_process = compute_all_features_optimized(
        df_process,
        lags=[1, 2, 3, 5],
        windows=[3, 5, 10],
        chunk_size=50000  # メモリに応じて調整
    )
    
    gc.collect()
    
    # 時間帯特徴量
    print("\nComputing time-based features...")
    df_process = compute_time_features(df_process)
    
    print_memory_usage(df_process, "df_process (after)")
    
    print(f"\nFeature Store features computed:")
    print(f"  - Lag features: {[c for c in df_process.columns if 'lag_' in c]}")
    print(f"  - Rolling features: {[c for c in df_process.columns if 'rolling_' in c]}")


df_process (before): 6,276,723 rows, 4524.5 MB

Computing Feature Store features (optimized)...
Input data shape: (6276723, 38)
Memory usage: 4524.5 MB

[Step 1] Computing trip-level aggregates...
  Trip aggregates: 453676 rows

[Step 2] Sorting by group and time...

[Step 3] Computing lag features...
  - delay_lag_1 computed
  - delay_lag_2 computed
  - delay_lag_3 computed
  - delay_lag_5 computed

[Step 4] Computing rolling features...
  - delay_rolling_mean_3, delay_rolling_std_3 computed
  - delay_rolling_mean_5, delay_rolling_std_5 computed
  - delay_rolling_mean_10, delay_rolling_std_10 computed

[Step 5] Merging features back to original dataframe...
  Processing in 125 chunks...


  return bound(*args, **kwds)


  - Chunk 10/125 done
  - Chunk 20/125 done
  - Chunk 30/125 done
  - Chunk 40/125 done
  - Chunk 50/125 done
  - Chunk 60/125 done
  - Chunk 70/125 done
  - Chunk 80/125 done
  - Chunk 90/125 done
  - Chunk 100/125 done
  - Chunk 110/125 done
  - Chunk 120/125 done
  - Chunk 125/125 done

[Step 6] Concatenating chunks...

Output data shape: (6276723, 48)
Memory usage: 4764.0 MB

Computing time-based features...
df_process (after): 6,276,723 rows, 5162.8 MB

Feature Store features computed:
  - Lag features: ['delay_lag_1', 'delay_lag_2', 'delay_lag_3', 'delay_lag_5']
  - Rolling features: ['delay_rolling_mean_3', 'delay_rolling_std_3', 'delay_rolling_mean_5', 'delay_rolling_std_5', 'delay_rolling_mean_10', 'delay_rolling_std_10']


In [None]:
# Save Feature Store (Parquet format for fast loading)
import os

output_dir = 'data/processed_data'
os.makedirs(output_dir, exist_ok=True)

if 'df_process_selected' in locals() and df_process_selected is not None:
    # Ensure start_date is string
    if 'start_date' in df_process_selected.columns:
        df_process_selected['start_date'] = df_process_selected['start_date'].astype(str)

    # CSV形式（互換性のため）
    # csv_file = f'{output_dir}/processed_trip_data.csv'
    # df_process_selected.to_csv(csv_file, index=False)
    # print(f"Saved CSV: {csv_file}")
    
    # Parquet形式（Feature Store - 高速読み込み）
    parquet_file = f'{output_dir}/feature_store.parquet'
    df_process_selected.to_parquet(parquet_file, index=False, compression='snappy')
    print(f"Saved Feature Store (Parquet): {parquet_file}")
    
    # ファイルサイズ比較
    # csv_size = os.path.getsize(csv_file) / (1024 * 1024)
    parquet_size = os.path.getsize(parquet_file) / (1024 * 1024)
    print(f"\nFile size comparison:")
    # print(f"  - CSV: {csv_size:.1f} MB")
    print(f"  - Parquet: {parquet_size:.1f} MB")

Saved Feature Store (Parquet): data/processed_data/feature_store.parquet

File size comparison:
  - Parquet: 132.4 MB


# Train / Valid / Test Split

評価データセットの「完全固定」のため、時系列データを時間軸で分割して保存します。
- 訓練期間: 70%
- 検証期間: 15%
- テスト期間: 15%

すべてのモデル（03〜06）はこの同一のファイルを使用して評価を行います。

In [10]:
# Split data into train/valid/test and save
from data_splitter import (
    temporal_train_valid_test_split,
    print_split_info,
    save_split_data
)

if 'df_process_selected' in locals() and df_process_selected is not None:
    # 時系列分割（70% train, 15% valid, 15% test）
    df_train, df_valid, df_test, split_info = temporal_train_valid_test_split(
        df_process_selected,
        date_column='start_date',
        train_ratio=0.7,
        valid_ratio=0.15,
        test_ratio=0.15
    )
    
    # 分割情報を表示
    print_split_info(split_info)
    
    # ファイルに保存（parquet形式）
    save_split_data(
        df_train, df_valid, df_test, split_info,
        output_dir='data/processed_data',
        format='parquet'
    )
else:
    print("Error: df_process_selected is not available")

Time Series Split Information

TRAIN:
  Date range: 20251203 ~ 20251217
  Samples: 4,292,431 (68.4%)

VALID:
  Date range: 20251218 ~ 20251220
  Samples: 864,598 (13.8%)

TEST:
  Date range: 20251221 ~ 20251224
  Samples: 1,119,694 (17.8%)
Saved split data to data/processed_data/
  - train.parquet: 4,292,431 samples
  - valid.parquet: 864,598 samples
  - test.parquet: 1,119,694 samples
  - split_info.json


In [None]:
# Preview Time Series Cross-Validation folds
from data_splitter import TimeSeriesCrossValidator

if 'df_process_selected' in locals() and df_process_selected is not None:
    # Expanding Window 交差検証のプレビュー
    cv = TimeSeriesCrossValidator(
        n_splits=5,
        test_size=2,  # 2日分をテスト
        gap=0,
        method='expanding',
        date_column='start_date'
    )
    cv.print_fold_summary(df_process_selected)

Time Series Cross-Validation Summary (EXPANDING)

Fold 0:
  Train: 20251203 ~ 20251205 (3 days, 934,277 samples)
  Test:  20251206 ~ 20251207 (2 days, 450,316 samples)

Fold 1:
  Train: 20251203 ~ 20251208 (6 days, 1,692,470 samples)
  Test:  20251209 ~ 20251210 (2 days, 619,814 samples)

Fold 2:
  Train: 20251203 ~ 20251211 (9 days, 2,624,390 samples)
  Test:  20251212 ~ 20251213 (2 days, 547,295 samples)

Fold 3:
  Train: 20251203 ~ 20251214 (12 days, 3,379,155 samples)
  Test:  20251215 ~ 20251216 (2 days, 600,570 samples)

Fold 4:
  Train: 20251203 ~ 20251217 (15 days, 4,292,431 samples)
  Test:  20251218 ~ 20251219 (2 days, 625,058 samples)


: 