# 4. Feature engineering - v2

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import os

from notebooks.utils import save_features, save_dataframes

In [None]:
upstream = {
    'split_data': {
        'train': 'data/interim/x_train_split.parquet.gzip',
        'val': 'data/interim/x_val_split.parquet.gzip',
        'test': 'data/interim/x_test_split.parquet.gzip'
    }
}
product = None
ROOT = Path(os.path.abspath("")).resolve().parents[0]
DATA = os.path.join(ROOT, "data")
INTERIM_DATA = os.path.join(DATA, "interim")
VERSION = "v2"
eps = 1e-9

In [None]:
X_train = pd.read_parquet(os.path.join(INTERIM_DATA, "x_train_split.parquet.gzip"))
X_val = pd.read_parquet(os.path.join(INTERIM_DATA, "x_val_split.parquet.gzip"))
X_test = pd.read_parquet(os.path.join(INTERIM_DATA, "x_test_split.parquet.gzip"))

In [4]:
DATAFRAMES = [X_train, X_val, X_test]

## Features to build

In [5]:
for i, df in enumerate(DATAFRAMES):
    df["tx_amount_log_mean"] = df.groupby("customer_id")["tx_amount_log"].transform(
        "mean"
    )
    df["tx_amount_log_std"] = df.groupby("customer_id")["tx_amount_log"].transform(
        "std"
    )
    df["tx_amount_log_deviates"] = (
        (df["tx_amount_log"] < (df["tx_amount_log_mean"] - df["tx_amount_log_std"]))
        | (df["tx_amount_log"] > (df["tx_amount_log_mean"] + df["tx_amount_log_std"]))
    ).astype(int)
    DATAFRAMES[i] = df

In [6]:
for i, df in enumerate(DATAFRAMES):
    df["secs_since_prev_tx"] = (
        df.groupby("customer_id")["tx_datetime"].diff().dt.total_seconds().fillna(-1)
    )
    DATAFRAMES[i] = df

In [7]:
for i, df in enumerate(DATAFRAMES):
    df["burst_id"] = df.groupby("customer_id")["secs_since_prev_tx"].transform(
        lambda x: (x > 3600).cumsum()
    )
    df["n_tx_in_burst"] = df.groupby(["customer_id", "burst_id"])[
        "tx_amount_log"
    ].transform("count")
    df["burst_mean"] = (
        df.groupby("customer_id")["n_tx_in_burst"].transform("mean").fillna(0)
    )
    df["burst_std"] = (
        df.groupby("customer_id")["n_tx_in_burst"].transform("std").fillna(0)
    )
    df["n_trx_per_burst_deviates"] = (
        (df["n_tx_in_burst"] < (df["burst_mean"] - df["burst_std"]))
        | (df["n_tx_in_burst"] > (df["burst_mean"] + df["burst_std"]))
    ).astype(int)
    DATAFRAMES[i] = df

In [None]:
eps = 1e-9

for i, df in enumerate(DATAFRAMES):
    df["zscore"] = (
        df.groupby("customer_id")
        .apply(
            lambda x: (x["tx_amount_log"] - x["tx_amount_log_mean"])
            / x["tx_amount_log_std"]
        )
        .to_numpy()
    )
    df["is_zscore_outlier"] = (df["zscore"] > 3).astype(int)
    q1 = df.groupby("customer_id")["tx_amount_log"].transform(
        lambda s: s.quantile(0.25)
    )
    q3 = df.groupby("customer_id")["tx_amount_log"].transform(
        lambda s: s.quantile(0.75)
    )
    med = df.groupby("customer_id")["tx_amount_log"].transform("median")
    iqr = (q3 - q1).replace(0, np.nan).fillna(eps)
    df["is_iqr_outlier"] = (
        ~df["tx_amount_log"].between(q1 - 0.5 * iqr, q3 + 0.5 * iqr)
    ).astype(int)

    df["tx_amount_log_scaled"] = (df["tx_amount_log"] - med) / iqr
    df["is_rs_anomaly"] = (df["tx_amount_log_scaled"].abs() > 1.5).astype(int)
    DATAFRAMES[i] = df

  df['zscore'] = df.groupby('customer_id').apply(lambda x: (x['tx_amount_log'] - x['tx_amount_log_mean']) / x['tx_amount_log_std']).to_numpy()
  df['zscore'] = df.groupby('customer_id').apply(lambda x: (x['tx_amount_log'] - x['tx_amount_log_mean']) / x['tx_amount_log_std']).to_numpy()
  df['zscore'] = df.groupby('customer_id').apply(lambda x: (x['tx_amount_log'] - x['tx_amount_log_mean']) / x['tx_amount_log_std']).to_numpy()


In [9]:
for i, df in enumerate(DATAFRAMES):
    df["hour_zscore"] = (
        df.groupby("customer_id")
        .apply(lambda x: (x["hour"] - x["hour"].mean()) / x["hour"].std())
        .to_numpy()
    )
    df["hour_zscore_deviates"] = (df["hour_zscore"].abs() > 2).astype(int)
    DATAFRAMES[i] = df

  df['hour_zscore'] = df.groupby('customer_id').apply(lambda x: (x['hour'] - x['hour'].mean()) / x['hour'].std()).to_numpy()
  df['hour_zscore'] = df.groupby('customer_id').apply(lambda x: (x['hour'] - x['hour'].mean()) / x['hour'].std()).to_numpy()
  df['hour_zscore'] = df.groupby('customer_id').apply(lambda x: (x['hour'] - x['hour'].mean()) / x['hour'].std()).to_numpy()


In [10]:
window = 20
min_periods = 5

for i, df in enumerate(DATAFRAMES):
    df = df.sort_values(["customer_id", "tx_datetime"]).copy()
    g = df.groupby("customer_id", sort=False)
    r5 = g["tx_amount_log"].rolling(window=window, min_periods=min_periods)
    r1 = g["tx_amount_log"].rolling(window=window, min_periods=1)

    # базовые rolling-метрики (могут быть NaN на первых minp-1 строках)
    med5 = r5.median().reset_index(level=0, drop=True)
    q1_5 = r5.quantile(0.25).reset_index(level=0, drop=True)
    q3_5 = r5.quantile(0.75).reset_index(level=0, drop=True)

    # фолбэк-метрики (без NaN из-за min_periods)
    med1 = r1.median().reset_index(level=0, drop=True)
    q1_1 = r1.quantile(0.25).reset_index(level=0, drop=True)
    q3_1 = r1.quantile(0.75).reset_index(level=0, drop=True)

    # подставляем фолбэк туда, где базовые NaN
    df["rolling_median"] = med5.fillna(med1)
    df["q1"] = q1_5.fillna(q1_1)
    df["q3"] = q3_5.fillna(q3_1)

    df["iqr"] = df["q3"] - df["q1"]

    df["amount_robust_rolling20"] = (df["tx_amount_log"] - df["rolling_median"]) / (
        df["iqr"] + eps
    )

    df["amount_robust_rolling20"] = df["amount_robust_rolling20"].fillna(0)
    df["is_amount_robust_rolling_outlier"] = (df["amount_robust_rolling20"] > 3).astype(
        int
    )
    DATAFRAMES[i] = df

In [11]:
for i, df in enumerate(DATAFRAMES):
    mean_ = df.groupby("customer_id")["day_of_week"].transform("mean")
    std_ = df.groupby("customer_id")["day_of_week"].transform("std")
    df["day_of_week_mean"] = mean_.fillna(0)
    df["day_of_week_std"] = std_.fillna(0)
    df["is_day_of_week_mean_outlier"] = (
        (df["day_of_week"] < (df["day_of_week_mean"] - df["day_of_week_std"]))
        | (df["day_of_week"] > (df["day_of_week_mean"] + df["day_of_week_std"]))
    ).astype(int)
    df["day_of_week_zscore"] = (
        df.groupby("customer_id")
        .apply(
            lambda x: (x["day_of_week"] - x["day_of_week_mean"]) / x["day_of_week_std"]
        )
        .to_numpy()
    )
    df["is_day_of_week_zscore_outlier"] = (df["day_of_week_zscore"].abs() > 2).astype(
        int
    )
    DATAFRAMES[i] = df

  df['day_of_week_zscore'] =  df.groupby('customer_id').apply(lambda x: (x['day_of_week'] - x['day_of_week_mean']) / x['day_of_week_std']).to_numpy()
  df['day_of_week_zscore'] =  df.groupby('customer_id').apply(lambda x: (x['day_of_week'] - x['day_of_week_mean']) / x['day_of_week_std']).to_numpy()
  df['day_of_week_zscore'] =  df.groupby('customer_id').apply(lambda x: (x['day_of_week'] - x['day_of_week_mean']) / x['day_of_week_std']).to_numpy()


In [12]:
for i, df in enumerate(DATAFRAMES):
    dfc = df.sort_values(["customer_id", "tx_datetime"]).reset_index(drop=True).copy()
    dfc["n_tx_in_prev_24h"] = 0
    one_day = np.timedelta64(1, "D")

    for cid, grp in dfc.groupby("customer_id"):
        t = grp["tx_datetime"].to_numpy("datetime64[ns]")
        left = t - one_day
        j = np.searchsorted(t, left, side="left")
        cnt = np.arange(len(t)) - j
        dfc.loc[grp.index, "n_tx_in_prev_24h"] = cnt.astype(int)

    g = dfc.groupby("customer_id")
    dfc["q90_prev"] = g["n_tx_in_prev_24h"].transform(
        lambda s: s.shift(1).expanding().quantile(0.90).fillna(0)
    )
    dfc["is_24h_burst"] = (
        (dfc["n_tx_in_prev_24h"] >= dfc["q90_prev"]).fillna(False).astype(int)
    )
    dfc["is_24h_burst_fixed"] = (dfc["n_tx_in_prev_24h"] >= 3).astype(int)
    dfc["day"] = dfc["tx_datetime"].dt.date
    dg = dfc.groupby(["customer_id", "day"])["tx_amount_log"]

    day_median = dg.transform("median")
    day_mad = dg.transform(lambda s: (s - s.median()).abs().median())
    dfc["z_in_day_robust"] = (dfc["tx_amount_log"] - day_median) / (
        1.4826 * day_mad + eps
    )

    dfc["is_anomalous_in_day"] = (dfc["z_in_day_robust"].abs() > 2.5).astype(int)
    dfc["fraud_burst_candidate"] = (
        (dfc["is_24h_burst"] == 1) & (dfc["z_in_day_robust"] > 1.0)
    ).astype(int)
    DATAFRAMES[i] = dfc

## Save features

In [13]:
columns = [
    "tx_amount",
    "sector_id",
    "tx_amount_log",
    "hour",
    "month",
    "is_month_start",
    "is_month_end",
    "is_weekend",
    "tx_amount_log_mean",
    "tx_amount_log_std",
    "tx_amount_log_deviates",
    "secs_since_prev_tx",
    "burst_id",
    "n_tx_in_burst",
    "burst_mean",
    "burst_std",
    "n_trx_per_burst_deviates",
    "hour_zscore",
    "hour_zscore_deviates",
    "rolling_median",
    "q1",
    "q3",
    "iqr",
    "amount_robust_rolling20",
    "is_amount_robust_rolling_outlier",
    "day_of_week_mean",
    "day_of_week_std",
    "is_day_of_week_mean_outlier",
    "day_of_week_zscore",
    "is_day_of_week_zscore_outlier",
    "n_tx_in_prev_24h",
    "q90_prev",
    "is_24h_burst",
    "is_24h_burst_fixed",
    "day_of_week",
    "z_in_day_robust",
    "is_anomalous_in_day",
    "fraud_burst_candidate",
]

In [14]:
X_train, X_val, X_test = DATAFRAMES

In [15]:
X_train.isna().sum()

customer_id                         0
tx_datetime                         0
tx_amount                           0
sector_id                           0
tx_fraud                            0
tx_amount_log                       0
ones                                0
day_of_week                         0
hour                                0
month                               0
is_month_start                      0
is_month_end                        0
is_weekend                          0
tx_amount_log_mean                  0
tx_amount_log_std                   0
tx_amount_log_deviates              0
secs_since_prev_tx                  0
burst_id                            0
n_tx_in_burst                       0
burst_mean                          0
burst_std                           0
n_trx_per_burst_deviates            0
zscore                              0
is_zscore_outlier                   0
is_iqr_outlier                      0
tx_amount_log_scaled                0
is_rs_anomal

In [None]:
save_features(os.path.join(INTERIM_DATA, f"features_{VERSION}.yaml"), columns)

In [17]:
X_train, X_val, X_test = DATAFRAMES

In [18]:
import os

save_dataframes(
    {
        os.path.join(INTERIM_DATA, f"x_train_features_{VERSION}.parquet.gzip"): X_train,
        os.path.join(INTERIM_DATA, f"x_val_features_{VERSION}.parquet.gzip"): X_val,
        os.path.join(INTERIM_DATA, f"x_test_features_{VERSION}.parquet.gzip"): X_test,
    }
)