# Przetwarzanie Danych - Przygotowanie do Modelowania

Ten notebook zawiera wszystkie kroki przetwarzania danych niezbędne do trenowania modeli prognozowania popytu.

## Cele przetwarzania:
- Czyszczenie i filtrowanie danych
- Tworzenie cech czasowych
- Agregacja danych do poziomu dziennego/tygodniowego
- Przygotowanie sekwencji dla modeli LSTM
- Podział na zbiory treningowe/walidacyjne/testowe

In [2]:
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Directory scanned for the raw CSV files (default argument of load_raw).
DATA_DIR = Path('/content/data')
# Directory receiving every artefact written by the pipeline
# (encoders, scalers, Parquet splits, LSTM tensors).
OUT_DIR  = Path('/content/data_processed')
# Create the output directory at import time; harmless if it already exists.
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Row offsets used for the `lag_<n>` sales features in add_lags
# (presumably days, assuming one row per series per day; confirm).
LAG_LIST      = [7, 14, 28]
# Window sizes (in rows) for the rolling mean / std sales features in add_lags.
ROLL_WINDOWS  = [7, 28]
# Number of consecutive rows in each input window built by build_sequences.
SEQ_LENGTH    = 60
# Number of consecutive target rows that follow each input window.
PRED_LENGTH   = 15

## 1. Load raw CSV files

In [3]:
def load_raw(path: Path = DATA_DIR) -> Dict[str, pd.DataFrame]:
    """Load the raw CSV tables found directly inside *path*.

    Args:
        path: Directory that is globbed for ``*.csv`` files.

    Returns:
        Dict with the keys ``train``, ``test``, ``stores``, ``oil``,
        ``holidays`` (read from ``holidays_events.csv``) and
        ``transactions``. Every table except ``stores`` has its ``date``
        column parsed as datetimes.

    Raises:
        FileNotFoundError: If any of the required CSV files is absent.
    """
    available = {}
    for csv_path in path.glob('*.csv'):
        available[csv_path.stem] = csv_path

    required = ["train", "test", "stores", "oil", "holidays_events", "transactions"]
    missing = [name for name in required if name not in available]
    if missing:
        raise FileNotFoundError(f"Missing files: {missing}")

    # (output key, file stem, whether the table carries a `date` column)
    layout = [
        ('train', 'train', True),
        ('test', 'test', True),
        ('stores', 'stores', False),
        ('oil', 'oil', True),
        ('holidays', 'holidays_events', True),
        ('transactions', 'transactions', True),
    ]

    tables = {}
    for key, stem, has_date in layout:
        if has_date:
            tables[key] = pd.read_csv(available[stem], parse_dates=['date'])
        else:
            tables[key] = pd.read_csv(available[stem])
    return tables

## 2. Basic cleaning

In [4]:
def basic_clean(train: pd.DataFrame) -> pd.DataFrame:
    """De-duplicate the training table and cap extreme sales values.

    Rows sharing the same ``(date, store_nbr, family)`` key are reduced to
    their first occurrence, and ``sales`` is clipped from above at its
    99.9th percentile (computed after de-duplication).

    Args:
        train: Frame with ``date``, ``store_nbr``, ``family`` and ``sales``.

    Returns:
        A new frame; the number of dropped duplicates is printed.
    """
    n_before = len(train)
    deduped = train.drop_duplicates(subset=['date', 'store_nbr', 'family'])
    n_dropped = n_before - len(deduped)
    print(f"Dropped {n_dropped:,} duplicate rows")

    # Cap the top 0.1 % of values to dampen extreme spikes.
    cap = deduped['sales'].quantile(0.999)
    return deduped.assign(sales=deduped['sales'].clip(upper=cap))

## 3. Calendar features

In [5]:
def add_calendar(df: pd.DataFrame) -> pd.DataFrame:
    """Attach calendar-derived columns to *df* (modified in place).

    Adds ``year`` (int16), ``month``, ``dow`` (Monday = 0) and ``weekofyr``
    (ISO week number), the last three as int8, followed by sine/cosine
    encodings of ``dow`` (period 7) and ``month`` (period 12).

    Args:
        df: Frame with a datetime ``date`` column.

    Returns:
        The same frame object, with the new columns appended.
    """
    dates = df['date'].dt

    df['year'] = dates.year.astype('int16')
    df['month'] = dates.month.astype('int8')
    df['dow'] = dates.weekday.astype('int8')
    df['weekofyr'] = dates.isocalendar().week.astype('int8')

    # Cyclic encodings, so the last and first value of a cycle end up adjacent.
    for col, period in (('dow', 7), ('month', 12)):
        angle = 2 * np.pi * df[col] / period
        df[f'{col}_sin'] = np.sin(angle)
        df[f'{col}_cos'] = np.cos(angle)
    return df

## 4. Merge external tables

In [6]:
def merge_external(train: pd.DataFrame, raw: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Left-join oil, holiday, transaction and store features onto *train*.

    Every join is a left join, and each right-hand table is unique on its
    key, so the result has exactly one row per row of *train*.

    Args:
        train: Frame with at least ``date`` and ``store_nbr`` columns.
        raw: Mapping with the tables ``oil`` (``date``, ``dcoilwtico``),
            ``holidays`` (``date``, ``locale``, ``transferred``),
            ``transactions`` (``date``, ``transactions``) and ``stores``
            (keyed by ``store_nbr``).

    Returns:
        A new frame with the added columns ``dcoilwtico``, ``oil_ma30``,
        ``oil_pct_7``, ``is_natl_holiday``, ``transactions`` and every
        column of ``stores``.
    """
    # Oil price: fill calendar gaps on a fixed daily grid, then derive a
    # 30-day moving average and a 7-day relative change.
    oil = raw['oil'].set_index('date')
    oil = oil.reindex(pd.date_range('2013-01-01', '2017-08-31', freq='D'))
    oil['dcoilwtico'] = oil['dcoilwtico'].interpolate().ffill().bfill()
    oil['oil_ma30'] = oil['dcoilwtico'].rolling(30).mean()
    oil['oil_pct_7'] = oil['dcoilwtico'].pct_change(7)
    # Name the index explicitly instead of relying on the default 'index'.
    oil = oil.rename_axis('date').reset_index()

    # National, non-transferred holidays. Several events can share a date;
    # de-duplicate so the left merge below cannot multiply training rows.
    hol = raw['holidays']
    is_national = (hol['locale'] == 'National') & hol['transferred'].eq(False)
    nat_dates = hol.loc[is_national, 'date'].dt.normalize().drop_duplicates()
    nat_flag = pd.DataFrame({'date': nat_dates, 'is_natl_holiday': 1})

    # Daily transactions summed over all stores, smoothed over 7 rows.
    # NOTE(review): the window includes the current day's count; confirm it
    # is available at prediction time.
    tx = (raw['transactions']
          .groupby('date')['transactions']
          .sum()
          .rolling(7).mean()
          .reset_index())

    stores = raw['stores']

    df = (train
          .merge(oil, on='date', how='left')
          .merge(nat_flag, on='date', how='left')
          .merge(tx, on='date', how='left')
          .merge(stores, on='store_nbr', how='left'))

    df['is_natl_holiday'] = df['is_natl_holiday'].fillna(0).astype('int8')
    # `.ffill()` replaces the deprecated `fillna(method='ffill')`.
    df['transactions'] = df['transactions'].ffill().fillna(0)
    return df

## 5. Lag & rolling statistics

In [7]:
def add_lags(df: pd.DataFrame,
             group_cols: List[str],
             lags: Optional[List[int]] = None,
             windows: Optional[List[int]] = None) -> pd.DataFrame:
    """Add per-series lag and rolling-window features of ``sales`` (in place).

    Features are computed inside each ``group_cols`` group after ordering
    by ``date``, then written back to *df* by index alignment, so *df*
    must have a unique index.

    Args:
        df: Frame with ``date``, ``sales`` and the grouping columns.
        group_cols: Columns identifying one time series.
        lags: Row offsets for the ``lag_<n>`` columns; defaults to
            ``LAG_LIST``.
        windows: Window sizes for ``roll_mean_<n>`` / ``roll_std_<n>``;
            defaults to ``ROLL_WINDOWS``.

    Returns:
        The same frame object with the new columns and an ``is_zero`` flag.
    """
    lags = LAG_LIST if lags is None else lags
    windows = ROLL_WINDOWS if windows is None else windows

    grouped_sales = df.sort_values('date').groupby(group_cols)['sales']

    for lag in lags:
        df[f'lag_{lag}'] = grouped_sales.shift(lag)

    for win in windows:
        # Shift and roll inside each group: rolling over the flat shifted
        # Series would mix neighbouring series in every window. The shift
        # by one row keeps the current row's sales out of its own window.
        df[f'roll_mean_{win}'] = grouped_sales.transform(
            lambda s, w=win: s.shift(1).rolling(w).mean())
        df[f'roll_std_{win}'] = grouped_sales.transform(
            lambda s, w=win: s.shift(1).rolling(w).std())

    # NOTE(review): derived from the current row's target value; treat it
    # as target information when selecting features for a same-row model.
    df['is_zero'] = (df['sales'] == 0).astype('int8')
    return df

In [8]:
from sklearn.preprocessing import LabelEncoder

def encode_categorical(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, LabelEncoder]]:
    """Label-encode the categorical columns of *df* in place.

    The columns ``family``, ``city``, ``state`` and ``type`` are cast to
    ``str`` and replaced by integer codes from a freshly fitted
    ``LabelEncoder``. A column missing from *df* is reported and skipped.
    The fitted encoders are pickled to ``OUT_DIR / 'categorical_encoders.pkl'``.

    Args:
        df: Frame whose categorical columns are overwritten with codes.

    Returns:
        ``(df, encoders)``: the same frame object and a dict mapping each
        encoded column name to its fitted encoder.
    """
    encoders = {}

    for col in ['family', 'city', 'state', 'type']:
        if col not in df.columns:
            print(f"  Warning: Column {col} not found in dataframe")
            continue

        print(f"  Encoding {col}: {df[col].nunique()} unique values")
        fitted = LabelEncoder()
        as_text = df[col].astype(str)
        df[col] = fitted.fit_transform(as_text)
        encoders[col] = fitted

    # Persist the encoders next to the other pipeline artefacts.
    joblib.dump(encoders, OUT_DIR / 'categorical_encoders.pkl')
    print(f"Saved {len(encoders)} categorical encoders")

    return df, encoders

## 6. Scaling numeric columns

In [9]:
def scale_numeric(df: pd.DataFrame) -> Dict[str, StandardScaler]:
    """Standardise every float column of *df* except ``sales``, in place.

    One ``StandardScaler`` per column is fitted on all rows of *df* and
    immediately applied to that column. The fitted scalers are pickled to
    ``OUT_DIR / 'scalers.pkl'``.

    Args:
        df: Frame whose float32/float64 columns (other than ``sales``) are
            overwritten with their standardised values.

    Returns:
        Dict mapping each scaled column name to its fitted scaler.
    """
    float_cols = df.select_dtypes(include=['float64', 'float32']).columns
    # The target stays in its original units.
    to_scale = float_cols.difference(['sales']).tolist()

    scalers = {name: StandardScaler() for name in to_scale}
    for name, scaler in scalers.items():
        df[name] = scaler.fit_transform(df[[name]])

    joblib.dump(scalers, OUT_DIR / 'scalers.pkl')
    return scalers

## 7. Chronological split

In [10]:
def chrono_split(df: pd.DataFrame,
                 train_end: str = '2016-12-31',
                 val_end: str   = '2017-06-30') -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split *df* into three consecutive, non-overlapping date ranges.

    Args:
        df: Frame with a ``date`` column.
        train_end: Last date (inclusive) of the training range.
        val_end: Last date (inclusive) of the validation range.

    Returns:
        ``(train, val, test)`` where train has ``date <= train_end``, val
        has ``train_end < date <= val_end`` and test has ``date > val_end``.
    """
    dates = df['date']
    in_train = dates <= train_end
    in_val = (dates > train_end) & (dates <= val_end)
    in_test = dates > val_end
    return df[in_train], df[in_val], df[in_test]

## 8. Sequence builder (optional for LSTM)

In [11]:
def build_sequences(df: pd.DataFrame,
                    group_cols: List[str],
                    seq_length: Optional[int] = None,
                    pred_length: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
    """Cut each series into sliding (input window, target window) pairs.

    For every ``group_cols`` group, ordered by ``date``, each run of
    ``seq_length`` feature rows is paired with the ``pred_length`` ``sales``
    values that immediately follow it. Windows are positional: consecutive
    rows are used even when calendar dates are missing in between.

    Args:
        df: Frame with ``date``, ``sales`` and feature columns. Every column
            other than ``sales`` and ``date`` is used as a feature and must
            be castable to float32.
        group_cols: Columns identifying one time series.
        seq_length: Rows per input window; defaults to ``SEQ_LENGTH``.
        pred_length: Rows per target window; defaults to ``PRED_LENGTH``.

    Returns:
        ``(X, y)`` as float32 arrays of shape
        ``(n, seq_length, n_features)`` and ``(n, pred_length)``. When no
        group is long enough, ``n`` is 0 and the arrays are empty.
    """
    seq_length = SEQ_LENGTH if seq_length is None else seq_length
    pred_length = PRED_LENGTH if pred_length is None else pred_length

    X_list, y_list = [], []
    feature_cols = df.columns.difference(['sales', 'date']).tolist()

    for _, grp in df.groupby(group_cols):
        grp = grp.sort_values('date')
        Xv = grp[feature_cols].values.astype(np.float32)
        yv = grp['sales'].values.astype(np.float32)
        # `i` is the first target row; the input window ends just before it.
        for i in range(seq_length, len(grp) - pred_length + 1):
            X_list.append(Xv[i - seq_length:i])
            y_list.append(yv[i:i + pred_length])

    if not X_list:
        # np.stack rejects an empty list; return well-shaped empty arrays so
        # short splits do not crash the caller.
        return (np.empty((0, seq_length, len(feature_cols)), dtype=np.float32),
                np.empty((0, pred_length), dtype=np.float32))

    return np.stack(X_list), np.stack(y_list)

## 9. Run the full pipeline

In [12]:
def run_pipeline(build_seq: bool = False):
    """Run the whole preprocessing pipeline and persist its outputs.

    Steps: load the raw tables, clean, add calendar features, merge the
    external tables, add lag/rolling features, drop rows containing NaNs,
    label-encode categoricals, split chronologically, then scale numeric
    features.

    Scalers are fitted on the training split only and then applied to the
    validation and test splits, so no statistics of later periods leak into
    the training features.

    Args:
        build_seq: If True, build LSTM windows from each split and save them
            to ``lstm_data.npz``; otherwise write ``train.parquet``,
            ``val.parquet`` and ``test.parquet``. All files go to ``OUT_DIR``.

    Returns:
        ``(train_df, val_df, test_df)``: the processed tabular splits.
    """
    raw = load_raw()
    train = basic_clean(raw['train'])

    train = add_calendar(train)
    train = merge_external(train, raw)

    train = add_lags(train, ['store_nbr', 'family'])

    # Removes every row with a NaN in any column, e.g. the warm-up rows that
    # lack full lag / rolling history.
    train = train.dropna()

    train, categorical_encoders = encode_categorical(train)

    # Split before scaling; copies make the in-place scaling below operate
    # on independent frames rather than slices of `train`.
    train_df, val_df, test_df = (part.copy() for part in chrono_split(train))

    # Fit (and apply) the scalers on the training split only ...
    scalers = scale_numeric(train_df)
    # ... then reuse the fitted scalers for the later splits.
    for part in (val_df, test_df):
        if part.empty:
            continue
        for col, scaler in scalers.items():
            part[col] = scaler.transform(part[[col]]).ravel()

    if build_seq:
        Xtr, ytr = build_sequences(train_df, ['store_nbr', 'family'])
        Xval, yval = build_sequences(val_df,   ['store_nbr', 'family'])
        Xte, yte  = build_sequences(test_df,  ['store_nbr', 'family'])
        np.savez_compressed(OUT_DIR / 'lstm_data.npz',
                            X_train=Xtr, y_train=ytr,
                            X_val=Xval, y_val=yval,
                            X_test=Xte, y_test=yte)
    else:
        train_df.to_parquet(OUT_DIR / 'train.parquet')
        val_df.to_parquet(OUT_DIR / 'val.parquet')
        test_df.to_parquet(OUT_DIR / 'test.parquet')

    print('Pipeline complete. Files saved to', OUT_DIR.resolve())
    print(f'Encoded categorical columns: {list(categorical_encoders.keys())}')
    return train_df, val_df, test_df

In [13]:
# Run the pipeline once in tabular mode (Parquet export, no LSTM tensors)
# and keep the three splits in memory for the following cells.
train_df, val_df, test_df = run_pipeline(build_seq=False)

Dropped 0 duplicate rows


  df['transactions']    = df['transactions'].fillna(method='ffill').fillna(0)


  Encoding family: 33 unique values
  Encoding city: 22 unique values
  Encoding state: 16 unique values
  Encoding type: 5 unique values
Saved 4 categorical encoders
Pipeline complete. Files saved to /content/data_processed
Encoded categorical columns: ['family', 'city', 'state', 'type']


### Pre-processing Pipeline – key steps

- **Load raw CSVs** – read `train`, `test`, `stores`, `oil`, `holidays_events`, `transactions`; parse the `date` column.

- **Basic cleaning** – drop duplicate `(date, store_nbr, family)` rows and cap the top 0.1 % of `sales` values to reduce extreme spikes.

- **Calendar features** – add `year`, `month`, `dow`, `weekofyr` plus cyclic encodings `dow_sin`, `dow_cos`, `month_sin`, `month_cos` to capture weekly and annual seasonality.

- **Merge external tables** – join  
  * interpolated WTI price (`dcoilwtico`) + 30-day moving average + 7-day % change,  
  * national-holiday flag,  
  * 7-day rolling mean of the daily `transactions` total (summed over all stores),  
  * store metadata (`city`, `state`, `type`, `cluster`).

- **Lag & rolling statistics** – per `(store_nbr, family)` create `lag_7/14/28`, rolling mean & std for 7/28-day windows, and an `is_zero` flag.

- **Drop warm-up NaNs** – remove rows that lack full lag/rolling history.

- **Scale numeric features** – apply `StandardScaler` to all float columns except the target `sales`; save fitted scalers to `scalers.pkl`.

- **Chronological split** – slice into  
  * **train** ≤ 2016-12-31,  
  * **validation** 2017-01-01 → 2017-06-30,  
  * **test** ≥ 2017-07-01,  
  ensuring no future leakage.

- **Save processed data** – by default write three Parquet files (`train/val/test.parquet`); if `build_seq=True`, export 60-day LSTM tensors to `lstm_data.npz` instead of the Parquet files.

- **Return ready datasets** – the function returns `train_df`, `val_df`, `test_df` in memory, giving clean, feature-rich, leak-free inputs for baselines or neural nets.
