# Пункт 1. Генерация признаков (7 фичей на категорию) по истории транзакций.
Выполнено локально из-за ограничений по памяти в kaggle.


Идея:
Для каждой тройки (client_pin_hash, period_start, day_period) и каждой категории (eng_cat)
строим 7 поведенческих признаков на основе транзакций клиента за последние 30 дней
до момента period_start.


Признаки (f1–f7) считаются отдельно для каждой категории cat:

  - f1_amount_share_30d  — доля суммы по категории в общей сумме клиента за последние 30 дней;
  - f2_days_since_last_cat — число дней с момента последней операции в этой категории;
  - f3_halfdays_since_any  — число “полудней” (12 часов) с момента последней операции
                           в любой категории;
  - f4_sum_amt_cat_30d     — суммарная сумма по категории за последние 30 дней;
  - f5_cnt_cat_30d         — число операций по категории за последние 30 дней;
  - f6_cnt_share_30d       — доля количества операций по категории в общем количестве операций
                           клиента за последние 30 дней;
  - f7_is_morning          — бинарный признак: 1, если day_period == 'morning', иначе 0.

In [None]:
import os
os.environ['KAGGLE_USERNAME'] = ''
os.environ['KAGGLE_KEY'] = ''

In [None]:
!kaggle competitions download -c alfa-predict-next-transactions-set

In [None]:
import pandas as pd


import zipfile
with zipfile.ZipFile('alfa-predict-next-transactions-set.zip', 'r') as zip_ref:
    zip_ref.extractall('unzipped')

In [None]:
from pathlib import Path
from datetime import date, timedelta
import duckdb, sys

In [None]:
# ===== ПУТИ =====
DATA_DIR = Path(r"D:\Merzliakova\unzipped")
TRAIN1   = DATA_DIR / r"train\train_1.parquet"
TRAIN2   = DATA_DIR / r"train\train_2.parquet"   # опционально
TRANS    = DATA_DIR / r"df_trans\df_trans.parquet"

OUT_DIR  = Path(r"D:\Data\duck_out"); OUT_DIR.mkdir(parents=True, exist_ok=True)
TMP_DIR  = Path(r"D:\Data\duck_tmp"); TMP_DIR.mkdir(parents=True, exist_ok=True)

DB_PATH  = OUT_DIR / "duck_feat7.db"   # БД на диске, чтобы не держать всё в RAM
OUT_FILE = OUT_DIR / "train_feat7_all.parquet"

# ===== Категории (ровно 32, порядок фикс) =====
cats = [
    'alkogol','arenda_avto','avto','azs','blagotvoritelnost','dom_i_remont','fastfud',
    'juvelirnye_izdelija','kafe_i_restorany','knigi','kommunalnye_uslugi','krasota','kredity',
    'obrazovanie','odezhda_i_obuv','prochie_rashody','puteshestvija','razvlechenija','sber_ecosystem',
    'shtrafy_i_nalogi','sportivnye_tovary','supermarkety','svjaz_internet_i_tv','tabak','taksi',
    'tehnika','transport','tsifrovye_tovary','tsvety','yandex_ecosystem','zdorove','zhivotnye'
]
cat_to_idx = {c:i for i,c in enumerate(cats)}

# Основной код генерации фичей

In [None]:
# ===== DuckDB =====
con = duckdb.connect(str(DB_PATH),
    config={
        "threads": "2",                          
        "preserve_insertion_order": "false",
        "temp_directory": str(TMP_DIR),          
    },
)
con.execute("SET memory_limit='32GB';")          

# train-файлы (train_2 — опционально)
train_paths = [TRAIN1]
if TRAIN2.exists():
    train_paths.append(TRAIN2)

# удобный литерал для read_parquet([...])
train_list_sql = "[" + ",".join(f"'{p.as_posix()}'" for p in train_paths) + "]"

# проверим входы
if not TRAIN1.exists():
    sys.exit(f"Не найден {TRAIN1}")
if not TRANS.exists():
    sys.exit(f"Не найден {TRANS}")

# список месяцев, которые встречаются в train (по полю period_start)
months = [r[0] for r in con.execute(f"""
SELECT DISTINCT date_trunc('month', CAST(period_start AS TIMESTAMP))::DATE AS mon
FROM read_parquet({train_list_sql})
WHERE period_start IS NOT NULL
ORDER BY mon
""").fetchall()]
if not months:
    sys.exit("В train_* не найдены данные (period_start NULL?)")

# основная итоговая таблица в БД (будем дописывать в out_all помесячно)
con.execute("DROP TABLE IF EXISTS out_all;")
con.execute("""
CREATE TABLE out_all (
  client_pin_hash             VARCHAR,
  period_start                TIMESTAMP,
  day_period                  VARCHAR,
  cat                         VARCHAR,
  cat_id                      INTEGER,
  y                           INTEGER,
  f1_amount_share_30d         DOUBLE,
  f2_days_since_last_cat      DOUBLE,
  f3_halfdays_since_any       INTEGER,
  f4_sum_amt_cat_30d          DOUBLE,
  f5_cnt_cat_30d              BIGINT,
  f6_cnt_share_30d            DOUBLE,
  f7_is_morning               INTEGER
);
""")

def dsql(d: date) -> str:
    return f"DATE '{d.isoformat()}'"

# ===== Основной цикл по месяцам =====

for mon in months:
    mon_start = mon
    mon_end = date(mon_start.year + (1 if mon_start.month == 12 else 0),
                   1 if mon_start.month == 12 else mon_start.month + 1, 1)
    trans_from = mon_start - timedelta(days=30)
    print(f"[{mon_start} .. {mon_end}) | trans window: [{trans_from} .. {mon_end})")

   # 1) отбираем строки train за текущий месяц
    con.execute(f"""
    CREATE TEMP TABLE train_m AS
    SELECT
      client_pin_hash                   AS cid,
      CAST(period_start AS TIMESTAMP)   AS ts,
      lower(day_period)                 AS dp,
      target
    FROM read_parquet({train_list_sql})
    WHERE period_start >= {dsql(mon_start)} AND period_start < {dsql(mon_end)};
    """)
    n_rows = con.execute("SELECT COUNT(*) FROM train_m;").fetchone()[0]
    if n_rows == 0:
        con.execute("DROP TABLE train_m;")
        continue

    # 2) загружаем транзакции за окно [mon_start-30d, mon_end)
    #    оставляем только нужные поля: клиент, время, категорию и сумму
    con.execute(f"""
    CREATE TEMP TABLE trans_m AS
    SELECT
      client_pin_hash                        AS cid,
      CAST(local_opdate_time AS TIMESTAMP)   AS ts,
      lower(eng_cat)                         AS cat,
      COALESCE(CAST(summarur_amt AS DOUBLE), 0.0) AS amt
    FROM read_parquet('{TRANS.as_posix()}')
    WHERE local_opdate_time >= {dsql(trans_from)} AND local_opdate_time < {dsql(mon_end)}
      AND eng_cat IS NOT NULL AND local_opdate_time IS NOT NULL
    ORDER BY cid, ts;
    """)

    # 3) базовые агрегаты по клиенту:
    #    - all30_sum / all30_cnt: сумма и количество всех транзакций клиента за 30 дней до ts
    #    - last_any_ts: время последней транзакции клиента до ts (любая категория)
    con.execute("""
    CREATE TEMP TABLE base_m AS
    SELECT
      t.cid, t.ts, t.dp, t.target,
      COALESCE((SELECT SUM(a.amt)  FROM trans_m a
                WHERE a.cid = t.cid AND a.ts >= t.ts - INTERVAL '30 days' AND a.ts < t.ts), 0) AS all30_sum,
      COALESCE((SELECT COUNT(*)    FROM trans_m a
                WHERE a.cid = t.cid AND a.ts >= t.ts - INTERVAL '30 days' AND a.ts < t.ts), 0) AS all30_cnt,
      (SELECT MAX(a.ts) FROM trans_m a
       WHERE a.cid = t.cid AND a.ts < t.ts) AS last_any_ts
    FROM train_m t;
    """)

    # 4) цикл по категориям: считаем 7 признаков на (клиент, ts, категория)
    for cat in cats:

        # транзакции только этой категории за текущий месяц (для ускорения подзапросов)
        idx = cat_to_idx[cat]
        con.execute(f"CREATE TEMP TABLE trans_cat_m AS SELECT cid, ts, amt FROM trans_m WHERE cat = '{cat}';")

        # вставляем строки для всех (cid, ts) из train_m:
        # считаем фичи по категории и извлекаем соответствующий таргет (с ним возникли проблемы)
        con.execute(f"""
        INSERT INTO out_all
        SELECT
          b.cid                                        AS client_pin_hash,
          b.ts                                         AS period_start,
          b.dp                                         AS day_period,
          '{cat}'                                      AS cat,
          {idx}                                        AS cat_id,
          CAST(list_extract(b.target, {idx}) AS INTEGER) AS y,

          -- 1) сумма(кат)/сумма(все) за 30д
          CASE WHEN b.all30_sum > 0
               THEN CAST((SELECT COALESCE(SUM(x.amt), 0.0)
                         FROM trans_cat_m x
                         WHERE x.cid = b.cid AND x.ts >= b.ts - INTERVAL '30 days' AND x.ts < b.ts) / b.all30_sum AS DOUBLE)
               ELSE NULL END                           AS f1_amount_share_30d,

          -- 2) дни с последней транзакции в этой категории
          CASE WHEN (SELECT MAX(x.ts) FROM trans_cat_m x WHERE x.cid = b.cid AND x.ts < b.ts) IS NOT NULL
               THEN CAST(date_diff('day',
                                   (SELECT MAX(x.ts) FROM trans_cat_m x WHERE x.cid = b.cid AND x.ts < b.ts),
                                   b.ts) AS DOUBLE)
               ELSE NULL END                           AS f2_days_since_last_cat,

          -- 3) полудни (12ч) с последней транзакции в любой категории
          CASE WHEN b.last_any_ts IS NOT NULL
               THEN CAST(floor(date_diff('hour', b.last_any_ts, b.ts) / 12.0) AS INTEGER)
               ELSE NULL END                           AS f3_halfdays_since_any,

          -- 4) сумма по категории за 30д
          CAST((SELECT COALESCE(SUM(x.amt), 0.0)
                FROM trans_cat_m x
                WHERE x.cid = b.cid AND x.ts >= b.ts - INTERVAL '30 days' AND x.ts < b.ts) AS DOUBLE) AS f4_sum_amt_cat_30d,

          -- 5) кол-во по категории за 30д
          CAST((SELECT COALESCE(COUNT(*), 0)
                FROM trans_cat_m x
                WHERE x.cid = b.cid AND x.ts >= b.ts - INTERVAL '30 days' AND x.ts < b.ts) AS BIGINT) AS f5_cnt_cat_30d,

          -- 6) доля кол-ва
          CASE WHEN b.all30_cnt > 0
               THEN CAST((SELECT COALESCE(COUNT(*), 0)
                          FROM trans_cat_m x
                          WHERE x.cid = b.cid AND x.ts >= b.ts - INTERVAL '30 days' AND x.ts < b.ts)::DOUBLE
                        / b.all30_cnt AS DOUBLE)
               ELSE NULL END                           AS f6_cnt_share_30d,

          -- 7) бинарный признак утро/вечер
          CASE WHEN b.dp = 'morning' THEN 1 ELSE 0 END AS f7_is_morning
        FROM base_m b;
        """)

        con.execute("DROP TABLE trans_cat_m;")

    # очистка месячных временных таблиц
    con.execute("DROP TABLE base_m;")
    con.execute("DROP TABLE trans_m;")
    con.execute("DROP TABLE train_m;")

# 5) выгружаем одним parquet (ZSTD)
con.execute(f"COPY out_all TO '{OUT_FILE.as_posix()}' (FORMAT PARQUET, COMPRESSION ZSTD);")
con.close()
print(f"✅ Saved: {OUT_FILE}")


[2025-02-01 .. 2025-03-01) | trans window: [2025-01-02 .. 2025-03-01)
[2025-03-01 .. 2025-04-01) | trans window: [2025-01-30 .. 2025-04-01)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [None]:
import pyarrow.parquet as pq

pf = pq.ParquetFile(OUT_FILE)

batch_iter = pf.iter_batches(batch_size=10)
first_batch = next(batch_iter)   # первый батч на 10 строк
df = first_batch.to_pandas()

df

Unnamed: 0,client_pin_hash,period_start,day_period,cat,cat_id,y,f1_amount_share_30d,f2_days_since_last_cat,f3_halfdays_since_any,f4_sum_amt_cat_30d,f5_cnt_cat_30d,f6_cnt_share_30d,f7_is_morning
0,029238aa8dc678d19c99f07bec9f39f1,2025-03-31 00:00:00,morning,alkogol,0,,0.0,,0,0.0,0,0.0,1
1,0292c6232b28254c63aa003987c8119c,2025-03-28 00:00:00,morning,alkogol,0,,0.0,,7,0.0,0,0.0,1
2,0292fbdb9d18cf09a0ce474aee9ee48a,2025-03-31 00:00:00,morning,alkogol,0,,0.0,,2,0.0,0,0.0,1
3,02931df2cabf2f3cf11d93fe3bcde25a,2025-03-31 00:00:00,morning,alkogol,0,,0.0,,0,0.0,0,0.0,1
4,0294010dc61a1cebf857cc5c558ca281,2025-03-25 00:00:00,morning,alkogol,0,,0.0,,1,0.0,0,0.0,1
5,02943b66f38258e7d37b347b7840ee32,2025-03-31 15:00:00,evening,alkogol,0,,0.0,,2,0.0,0,0.0,0
6,029532c2ad861f1772278a3e496f2091,2025-03-28 15:00:00,evening,alkogol,0,,0.0,,0,0.0,0,0.0,0
7,02953318404fbb835a62a0f779240392,2025-03-28 15:00:00,evening,alkogol,0,,0.0,,5,0.0,0,0.0,0
8,02956cecfaa16ddad87d2604efbc8adf,2025-03-31 00:00:00,morning,alkogol,0,,0.0,,0,0.0,0,0.0,1
9,02957cfa9c2cbe95d4379e84a8fa232a,2025-03-31 15:00:00,evening,alkogol,0,,0.0,,1,0.0,0,0.0,0
