In [1]:
!pip -q install --only-binary=:all: duckdb==1.0.0 lightgbm==4.3.0

In [2]:
import numpy as np
import pandas as pd
import duckdb
import lightgbm as lgb
from lightgbm import LGBMRegressor

In [3]:
print(duckdb.__version__)

1.0.0


# 1. Importando os arquivos

In [4]:
df27 = pd.read_parquet('part27.snappy.parquet')
df51 = pd.read_parquet('part51.snappy.parquet')
df71 = pd.read_parquet('part71.snappy.parquet')

In [5]:
df27.info()
df51.info()
df71.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14419 entries, 0 to 14418
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   pdv            14419 non-null  object
 1   premise        14419 non-null  object
 2   categoria_pdv  14419 non-null  object
 3   zipcode        14419 non-null  int32 
dtypes: int32(1), object(3)
memory usage: 394.4+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6560698 entries, 0 to 6560697
Data columns (total 11 columns):
 #   Column               Dtype  
---  ------               -----  
 0   internal_store_id    object 
 1   internal_product_id  object 
 2   distributor_id       object 
 3   transaction_date     object 
 4   reference_date       object 
 5   quantity             float64
 6   gross_value          float64
 7   net_value            float64
 8   gross_profit         float64
 9   discount             float64
 10  taxes                float64
dtypes: float64(6), object(

# 2. Fazendo o join entre as tabelas

Utilizamos Duck DB nessa versão ao invés de PySpark e Pandas para fazer os joins e agregações

In [6]:
con = duckdb.connect()
con.register("tx", df51)
con.register("st", df27.rename(columns={"pdv":"internal_store_id"}))
con.register("pr", df71.rename(columns={"produto":"internal_product_id"}))

# Query built by the AI
weekly = con.execute("""
WITH txc AS (
  SELECT
    internal_store_id,
    internal_product_id,
    CAST(transaction_date AS TIMESTAMP) AS dt,
    CAST(quantity       AS DOUBLE) AS qty,
    CAST(gross_value    AS DOUBLE) AS gross_value,
    CAST(net_value      AS DOUBLE) AS net_value,
    CAST(gross_profit   AS DOUBLE) AS gross_profit,
    CAST(discount       AS DOUBLE) AS discount,
    CAST(taxes          AS DOUBLE) AS taxes
  FROM tx
  -- don't rely on the SELECT alias 'dt' in WHERE; filter on the original column
  WHERE transaction_date >= DATE '2022-01-01'
    AND transaction_date <  DATE '2023-01-01'
),
txw AS (
  SELECT
    internal_store_id,
    internal_product_id,
    (CAST(date_trunc('week', dt) AS DATE) + INTERVAL 6 DAY) AS week_end,  -- W-SAT
    SUM(qty) AS quantidade,
    AVG(CASE WHEN qty > 0 THEN gross_value/qty END) AS price_gross,
    AVG(CASE WHEN qty > 0 THEN net_value/qty   END) AS price_net,
    AVG(CASE WHEN qty > 0 THEN gross_profit/qty END) AS margin,
    AVG(discount) AS disc,
    AVG(taxes)    AS taxes
  FROM txc
  GROUP BY 1,2,3
),
joined AS (
  SELECT
    w.internal_store_id AS pdv,
    w.internal_product_id AS produto,
    w.week_end, w.quantidade, w.price_gross, w.price_net, w.margin, w.disc, w.taxes,
    s.premise, s.categoria_pdv, s.zipcode,
    p.categoria, p.tipos, p.label, p.subcategoria, p.marca, p.fabricante
  FROM txw w
  LEFT JOIN st s ON w.internal_store_id  = s.internal_store_id
  LEFT JOIN pr p ON w.internal_product_id = p.internal_product_id
)
SELECT * FROM joined
ORDER BY pdv, produto, week_end
""").df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [7]:
# basic cleaning / types
num_cols = ["quantidade","price_gross","price_net","margin","disc","taxes","zipcode"]
for c in num_cols:
    if c in weekly.columns:
        weekly[c] = pd.to_numeric(weekly[c], errors="coerce").fillna(0)

weekly["pdv"] = weekly["pdv"].astype(str)
weekly["produto"] = weekly["produto"].astype(str)
weekly["week_end"] = pd.to_datetime(weekly["week_end"])

print("weekly rows:", len(weekly), "pairs:", weekly[["pdv","produto"]].drop_duplicates().shape[0])

weekly rows: 6247301 pairs: 1044310


In [8]:
# keep only pairs with any 2022 sales
pair_sum = weekly.groupby(["pdv","produto"])["quantidade"].sum()
active_pairs = set(pair_sum[pair_sum > 0].index)
weekly = weekly[weekly[["pdv","produto"]].apply(tuple, axis=1).isin(active_pairs)].reset_index(drop=True)

# downcast numerics
for c in weekly.select_dtypes("float64").columns:
    weekly[c] = pd.to_numeric(weekly[c], downcast="float")
for c in weekly.select_dtypes("int64").columns:
    weekly[c] = pd.to_numeric(weekly[c], downcast="integer")

print("after prune:", weekly.shape)

after prune: (6183034, 18)


In [9]:
# Clip negatives at source
neg_before = int((weekly["quantidade"] < 0).sum())
weekly["quantidade_raw"] = weekly["quantidade"]
weekly["quantidade"] = weekly["quantidade"].clip(lower=0)
print(f"Negative weekly sums before clip: {neg_before}  | after clip: {(weekly['quantidade'] < 0).sum()}")

Negative weekly sums before clip: 35463  | after clip: 0


Feature engineering (lags/rolls): still important

In [10]:
def add_feats(df):
    df = df.sort_values(["pdv","produto","week_end"]).copy()
    g = df.groupby(["pdv","produto"], sort=False)
    df["lag1"] = g["quantidade"].shift(1)
    df["lag2"] = g["quantidade"].shift(2)
    df["lag3"] = g["quantidade"].shift(3)
    df["lag4"] = g["quantidade"].shift(4)
    df["rmean4"]  = g["quantidade"].rolling(4, min_periods=1).mean().reset_index(level=[0,1], drop=True).shift(1)
    df["rmean12"] = g["quantidade"].rolling(12, min_periods=1).mean().reset_index(level=[0,1], drop=True).shift(1)
    for c in ["price_gross","price_net","margin","disc","taxes"]:
        if c in df.columns:
            df[f"{c}_lag1"] = g[c].shift(1)
    # context (fallbacks)
    df["store_rmean4"] = df.groupby("pdv")["quantidade"].rolling(4, min_periods=1).mean().reset_index(level=0, drop=True).shift(1)
    df["prod_rmean4"]  = df.groupby("produto")["quantidade"].rolling(4, min_periods=1).mean().reset_index(level=0, drop=True).shift(1)
    return df

weekly = add_feats(weekly)

In [11]:
# One-step-ahead target (and clip target to non-negative)
weekly["target_next"] = weekly.groupby(["pdv","produto"])["quantidade"].shift(-1)
weekly["target_next"] = weekly["target_next"].clip(lower=0)

In [12]:
train_df = weekly.dropna(subset=["lag1","target_next"]).copy()

In [13]:
# Categoricals
cat_cols = ["pdv","produto","premise","categoria_pdv","categoria","tipos","label","subcategoria","marca","fabricante"]
for c in cat_cols:
    if c in train_df.columns:
        train_df[c] = train_df[c].astype("category")

In [14]:
# Time index (no holidays)
train_df["week_ord"] = train_df["week_end"].astype("int64") // 10**9

In [15]:
feature_cols = [
    "pdv","produto","week_ord",
    "lag1","lag2","lag3","lag4","rmean4","rmean12",
    "price_gross","price_net","margin","disc","taxes",
    "price_gross_lag1","price_net_lag1","margin_lag1","disc_lag1","taxes_lag1",
    "store_rmean4","prod_rmean4",
    "premise","categoria_pdv","categoria","tipos","label","subcategoria","marca","fabricante",
    "zipcode",
]
feature_cols = [c for c in feature_cols if c in train_df.columns]

X = train_df[feature_cols].copy()
y = train_df["target_next"].astype(float).values

cutoff = weekly["week_end"].max() - pd.Timedelta(weeks=4)
mask_val = train_df["week_end"] > cutoff
X_tr, y_tr = X[~mask_val], y[~mask_val]
X_val, y_val = X[mask_val],  y[mask_val]

# Downcast to save RAM
for df_ in (X_tr, X_val):
    for c in df_.select_dtypes("float64").columns:
        df_[c] = pd.to_numeric(df_[c], downcast="float")
    for c in df_.select_dtypes("int64").columns:
        df_[c] = pd.to_numeric(df_[c], downcast="integer")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_[c] = pd.to_numeric(df_[c], downcast="float")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_[c] = pd.to_numeric(df_[c], downcast="float")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_[c] = pd.to_numeric(df_[c], downcast="float")
A value is trying to be set on a copy of a slice from a Da

# 3. Treinando o Modelo

early stopping determinado por wmape:



In [16]:
cat_used = [c for c in cat_cols if c in X_tr.columns]

gpu_params = dict(
    objective="poisson" if y_tr.sum() > 0 else "regression",
    metric="rmse",
    n_estimators=2000, learning_rate=0.05,
    num_leaves=127, max_depth=-1, max_bin=255,
    subsample=0.8, colsample_bytree=0.8,
    min_data_in_leaf=100, min_child_weight=1e-2,
    feature_pre_filter=False,
    max_cat_threshold=64, max_cat_to_onehot=4, cat_smooth=10, min_data_per_group=200,
    random_state=42, n_jobs=-1,
    device_type="gpu"   # try GPU, will fallback
)
cpu_params = {k:v for k,v in gpu_params.items() if k!="device_type"}
cpu_params.update(dict(force_col_wise=True))

def lgb_wmape(y_true, y_pred):
    denom = abs(y_true).sum() + 1e-9
    return ("wmape", float(abs(y_true - y_pred).sum()/denom), False)

callbacks = [lgb.early_stopping(100), lgb.log_evaluation(50)]

try:
    lgbm = LGBMRegressor(**gpu_params)
    lgbm.fit(X_tr, y_tr, categorical_feature=cat_used,
             eval_set=[(X_val, y_val)], eval_metric=[lgb_wmape, "rmse"],
             callbacks=callbacks)
    used_gpu = True
except Exception as e:
    print("GPU not available or failed, switching to CPU col-wise. Reason:", str(e)[:160], "...")
    lgbm = LGBMRegressor(**cpu_params)
    lgbm.fit(X_tr, y_tr, categorical_feature=cat_used,
             eval_set=[(X_val, y_val)], eval_metric=[lgb_wmape, "rmse"],
             callbacks=callbacks)
    used_gpu = False

[LightGBM] [Info] This is the GPU trainer!!
[LightGBM] [Info] Total Bins 14557
[LightGBM] [Info] Number of data points in the train set: 4384483, number of used features: 30
GPU not available or failed, switching to CPU col-wise. Reason: No OpenCL device found ...
[LightGBM] [Info] Total Bins 14557
[LightGBM] [Info] Number of data points in the train set: 4384483, number of used features: 30
[LightGBM] [Info] Start training from score 1.380749
Training until validation scores don't improve for 100 rounds
[50]	valid_0's rmse: 9.27843	valid_0's wmape: 0.480578
[100]	valid_0's rmse: 9.14003	valid_0's wmape: 0.433045
[150]	valid_0's rmse: 9.23336	valid_0's wmape: 0.424922
Early stopping, best iteration is:
[83]	valid_0's rmse: 9.10371	valid_0's wmape: 0.439784


In [17]:
def wmape(y_true, y_pred):
    denom = np.abs(y_true).sum() + 1e-9
    return np.abs(y_true - y_pred).sum() / denom

yhat_val = lgbm.predict(X_val)
val_wmape = wmape(y_val, yhat_val)
# least-squares shrink
alpha = float((y_val @ yhat_val) / (yhat_val @ yhat_val + 1e-9))
alpha = float(np.clip(alpha, 0.6, 1.1))
print(f"val WMAPE={val_wmape:.4f}, alpha={alpha:.3f}")


val WMAPE=0.4398, alpha=1.013


# 4. Salvando o Modelo e Resultados

In [18]:
import joblib

filename = 'lgbm_model_opt.pkl'
joblib.dump(lgbm, filename)

['lgbm_model_opt.pkl']

In [19]:
jan_we = pd.to_datetime(["2023-01-07","2023-01-14","2023-01-21","2023-01-28","2023-02-04"])
history = weekly.copy()

def next_inputs(hist, week_end):
    # recompute lags from current history (observed + prior preds)
    hist = hist.sort_values(["pdv","produto","week_end"]).copy()
    hist = add_feats(hist)
    last = (hist.sort_values("week_end").groupby(["pdv","produto"], as_index=False).tail(1)).copy()
    # set future week
    last["week_end"] = pd.to_datetime(week_end)
    last["week_ord"] = last["week_end"].view("int64") // 10**9
    # fill numeric NaNs
    for c in ["lag1","lag2","lag3","lag4","rmean4","rmean12",
              "price_gross","price_net","margin","disc","taxes",
              "price_gross_lag1","price_net_lag1","margin_lag1","disc_lag1","taxes_lag1",
              "store_rmean4","prod_rmean4","zipcode","week_ord"]:
        if c in last.columns:
            last[c] = pd.to_numeric(last[c], errors="coerce").fillna(0)
    # categoricals
    for c in cat_cols:
        if c in last.columns:
            last[c] = last[c].astype("category")
    return last

In [20]:
fcsts = []
for we in jan_we:
    base = next_inputs(history, we)
    X_next = base[[c for c in feature_cols if c in base.columns]].copy()
    yhat = lgbm.predict(X_next)
    yhat = np.maximum(yhat, 0.0)
    add = base[["pdv","produto","week_end"]].copy()
    add["quantidade"] = yhat
    # carry attributes forward (no holiday nonsense needed)
    for c in ["price_gross","price_net","margin","disc","taxes",
              "premise","categoria_pdv","zipcode","categoria","tipos","label","subcategoria","marca","fabricante"]:
        if c in base.columns:
            add[c] = base[c].values
    history = pd.concat([history, add], ignore_index=True)
    fcsts.append(add)

jan_fcst = pd.concat(fcsts, ignore_index=True)

  last["week_ord"] = last["week_end"].view("int64") // 10**9




  last["week_ord"] = last["week_end"].view("int64") // 10**9




  last["week_ord"] = last["week_end"].view("int64") // 10**9




  last["week_ord"] = last["week_end"].view("int64") // 10**9




  last["week_ord"] = last["week_end"].view("int64") // 10**9




In [21]:
week_map = {
    pd.Timestamp("2023-01-07"): 1,
    pd.Timestamp("2023-01-14"): 2,
    pd.Timestamp("2023-01-21"): 3,
    pd.Timestamp("2023-01-28"): 4,
    pd.Timestamp("2023-02-04"): 5,
}
jan_fcst["semana"] = jan_fcst["week_end"].map(week_map)

submission = (
    jan_fcst.assign(quantidade=lambda d: np.rint(d["quantidade"]).astype(int))
            .rename(columns={"pdv":"pdv", "produto":"produto"})
            [["semana","pdv","produto","quantidade"]]
            .sort_values(["semana","pdv","produto"])
            .reset_index(drop=True)
)

# keep only pairs seen in 2022
seen_pairs = set(map(tuple, weekly[["pdv","produto"]].drop_duplicates().values))
submission = submission[submission[["pdv","produto"]].apply(tuple, axis=1).isin(seen_pairs)]

submission.to_parquet("submission.parquet", index=False)
submission.to_csv("submission.csv", sep=";", index=False, encoding="utf-8")

In [22]:
MAX_ROWS = 1_500_000
WEEKS = 5
MAX_PAIRS = MAX_ROWS // WEEKS

In [23]:
last_week = pd.to_datetime(weekly["week_end"].max())
recent_cut = last_week - pd.Timedelta(weeks=12)

weekly_recent = weekly[weekly["week_end"] >= recent_cut]

pair_total = (
    weekly.groupby(["pdv","produto"], as_index=False)["quantidade"]
          .sum().rename(columns={"quantidade":"total_2022"})
)
pair_recent = (
    weekly_recent.groupby(["pdv","produto"], as_index=False)["quantidade"]
                 .sum().rename(columns={"quantidade":"recent_12w"})
)
pair_last_sale = (
    weekly.groupby(["pdv","produto"], as_index=False)["week_end"]
          .max().rename(columns={"week_end":"last_sale"})
)

pair_stats = (
    pair_total.merge(pair_recent, on=["pdv","produto"], how="left")
              .merge(pair_last_sale, on=["pdv","produto"], how="left")
              .fillna({"recent_12w":0})
)

In [24]:
# prioritize recent_12w, then total_2022, then recency of last sale
pair_stats = pair_stats.sort_values(
    ["recent_12w", "total_2022", "last_sale"],
    ascending=[False, False, False]
).reset_index(drop=True)

n_pairs = min(len(pair_stats), MAX_PAIRS)
top_pairs = pair_stats.iloc[:n_pairs, :][["pdv","produto"]]
top_set = set(map(tuple, top_pairs.values))

submission_capped = (
    submission[submission[["pdv","produto"]].apply(tuple, axis=1).isin(top_set)]
      .copy()
)

In [25]:
submission_capped = submission_capped.drop_duplicates(["semana","pdv","produto"], keep="last")

In [26]:
for col in ["semana","pdv","produto","quantidade"]:
    submission_capped[col] = pd.to_numeric(submission_capped[col], errors="coerce")
if submission_capped[["semana","pdv","produto","quantidade"]].isnull().any().any():
    bad = submission_capped[submission_capped[["semana","pdv","produto","quantidade"]].isnull().any(axis=1)]
    print("WARNING: Some IDs/quantities could not be parsed to int. Showing first few bad rows:")
    print(bad.head())

In [27]:
submission_capped = submission_capped.astype({"semana":"int64","pdv":"int64","produto":"int64","quantidade":"int64"})

In [28]:
rows_per_pair = submission_capped.groupby(["pdv","produto"]).size().value_counts()
rows_per_pair.head()

Unnamed: 0,count
5,300000


In [29]:
submission_capped.to_parquet("submission_opt_wmape.parquet", index=False)
submission_capped.to_csv("submission_opt_wmape.csv", sep=";", index=False, encoding="utf-8")

In [32]:
submission_capped.describe()

Unnamed: 0,semana,pdv,produto,quantidade
count,1500000.0,1500000.0,1500000.0,1500000.0
mean,3.0,4.665103e+18,4.366982e+18,7.751807
std,1.414214,2.681667e+18,2.71834e+18,12.79625
min,1.0,1833564000000000.0,7798075000000000.0,1.0
25%,2.0,2.357589e+18,1.835194e+18,2.0
50%,3.0,4.689504e+18,4.101834e+18,4.0
75%,4.0,7.062864e+18,6.760021e+18,9.0
max,5.0,9.22299e+18,9.221123e+18,1279.0
