# Pipeline (Regressão Linear): EDA → chaves → limpeza (NaN/negativos) → CV (WMAPE) → forecast


In [None]:

from pathlib import Path
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error, make_scorer
import pandas as pd, numpy as np
import numpy as np



## 1) Pasta dos dados 


In [4]:
CANDIDATES = [
    Path(r"C:\Users\gabri\Downloads\big data inner join\dados"),
    Path.cwd() / "dados",
    Path.cwd(),
]
FOLDER = next((p for p in CANDIDATES if p.exists()), None)
if FOLDER is None:
    raise FileNotFoundError("Edite CANDIDATES para apontar para sua pasta de dados.")
print("Pasta selecionada:", FOLDER.resolve())

parquets = list(FOLDER.rglob("*.parquet"))
csvs = list(FOLDER.rglob("*.csv"))
print(f"Parquet encontrados: {len(parquets)} | CSV encontrados: {len(csvs)}")
for i,p in enumerate(parquets[:10]): print(f"[pq {i}] {p}")

Pasta selecionada: C:\Users\gabri\Downloads\big data inner join\dados
Parquet encontrados: 3 | CSV encontrados: 2
[pq 0] C:\Users\gabri\Downloads\big data inner join\dados\part-00000-tid-2779033056155408584-f6316110-4c9a-4061-ae48-69b77c7c8c36-4-1-c000.snappy.parquet
[pq 1] C:\Users\gabri\Downloads\big data inner join\dados\part-00000-tid-5196563791502273604-c90d3a24-52f2-4955-b4ec-fb143aae74d8-4-1-c000.snappy.parquet
[pq 2] C:\Users\gabri\Downloads\big data inner join\dados\part-00000-tid-7173294866425216458-eae53fbf-d19e-4130-ba74-78f96b9675f1-4-1-c000.snappy.parquet


## 2) Carregamento e EDA leve

In [5]:
import pandas as pd, numpy as np
parts = [p for p in FOLDER.rglob("*") if p.suffix.lower() in (".parquet",".pq",".csv")]
assert parts, f"Nenhum .parquet/.csv encontrado em {FOLDER}"

dfs = []
for p in parts:
    if p.suffix.lower()==".csv": dfs.append(pd.read_csv(p))
    else: dfs.append(pd.read_parquet(p))
df_all = pd.concat(dfs, ignore_index=True)
print("Shape concatenado:", df_all.shape)
print("Colunas:", list(df_all.columns))


non_null = df_all.notna().mean().sort_values(ascending=False).to_frame('non_null_ratio')
card     = df_all.nunique(dropna=True).sort_values(ascending=False).to_frame('n_unique')
eda = non_null.join(card, how='outer')
display(eda.head(30))

Shape concatenado: (6604387, 25)
Colunas: ['id', 'valor', 'pdv', 'premise', 'categoria_pdv', 'zipcode', 'internal_store_id', 'internal_product_id', 'distributor_id', 'transaction_date', 'reference_date', 'quantity', 'gross_value', 'net_value', 'gross_profit', 'discount', 'taxes', 'produto', 'categoria', 'descricao', 'tipos', 'label', 'subcategoria', 'marca', 'fabricante']


Unnamed: 0,non_null_ratio,n_unique
categoria,0.001074,7
categoria_pdv,0.002183,54
descricao,0.001074,7092
discount,0.993385,121528
distributor_id,0.993385,8
fabricante,0.001074,343
gross_profit,0.993385,363451
gross_value,0.993385,173883
id,0.003358,15086
internal_product_id,0.993385,7092


## 3) Chaves (loja / produto) — robustas (com fallbacks)

In [None]:
import pandas as pd, numpy as np


FORCE_LOJA_COL = None          
FORCE_PROD_COL = None           

def pick_col(df, preferred, exclude=set(), min_nonnull=0.05, max_unique_ratio=0.98):
    cols = [c for c in df.columns if c not in exclude]
    pool = [c for c in preferred if c in df.columns] or cols
    best, best_score = None, (-1, -1)
    n = len(df)
    for c in pool:
        nonnull = df[c].notna().mean()
        nunique = df[c].nunique(dropna=True)
        if nonnull < min_nonnull: 
            continue
        if n>0 and (nunique / n) > max_unique_ratio:  
            continue
        score = (nonnull, nunique)
        if score > best_score:
            best_score, best = score, c
    return best

loja_priority = ["pdv","internal_store_id","loja","store"]
prod_priority = ["internal_product_id","produto","sku","product_id","ean","upc","item","cod_produto","codigo_produto"]

loja_col = FORCE_LOJA_COL if (FORCE_LOJA_COL in df_all.columns) else pick_col(df_all, loja_priority)
if loja_col is None:
   
    candidates = [c for c in ["zipcode","premise","categoria_pdv","distributor_id"] if c in df_all.columns]
    if not candidates:
        raise ValueError("Não consegui inferir LOJA e não há colunas para criar assinatura. Defina FORCE_LOJA_COL.")
    print("Criando loja_fingerprint a partir de:", candidates)
    tmp = df_all[candidates].astype(str).fillna("NA").agg("|".join, axis=1)
    df_all["__loja_fingerprint__"] = tmp
    loja_col = "__loja_fingerprint__"

prod_col = FORCE_PROD_COL if (FORCE_PROD_COL in df_all.columns) else pick_col(df_all, prod_priority, exclude={loja_col})
if (prod_col is None) or df_all[prod_col].isna().all():
    print("⚠️ Produto ausente/vazio ➜ modo 1D: id_produto=0 para todos.")
    prod_col = "__produto_fallback__"
    df_all[prod_col] = "UNICO"

print(f"Escolhido -> loja_col: {loja_col} | prod_col: {prod_col}")


before = len(df_all)
df_all = df_all[df_all[loja_col].notna() & df_all[prod_col].notna()].copy()
print("Removidos por falta de loja/prod:", before - len(df_all), "| shape:", df_all.shape)


def make_id(series):
    vals = series.astype(str).fillna("__NA__")
    codes, uniq = pd.factorize(vals, sort=True)
    return codes.astype("int64"), pd.DataFrame({"id": range(len(uniq)), "valor": uniq})

df_final = df_all.copy()
df_final["id_loja"], dim_lojas = make_id(df_all[loja_col])
df_final["id_produto"], dim_produtos = make_id(df_all[prod_col])
display(df_final[[loja_col,"id_loja",prod_col,"id_produto"]].head())


dim_lojas.to_csv(FOLDER / "dim_lojas_derivada_linear.csv", index=False)
dim_produtos.to_csv(FOLDER / "dim_produtos_derivada_linear.csv", index=False)
print("Dimensões derivadas salvas.")

Escolhido -> loja_col: internal_store_id | prod_col: internal_product_id
Removidos por falta de loja/prod: 43689 | shape: (6560698, 25)


Unnamed: 0,internal_store_id,id_loja,internal_product_id,id_produto
36597,7384367747233276219,11548,328903483604537190,1936
36598,3536908514005606262,4619,5418855670645487653,3710
36599,3138231730993449825,3907,1087005562675741887,73
36600,3681167389484217654,4874,1401422983880045188,343
36601,7762413312337359369,12281,6614994347738381720,4801


Dimensões derivadas salvas.


## 4) Data e Alvo: limpeza + contagem/remoção de negativos (Janeiro)

In [None]:
import pandas as pd, numpy as np

DATE_COL = next((c for c in ["reference_date","transaction_date","data","date"] if c in df_final.columns), None)
TARGET_COL = next((c for c in ["quantity","quantidade","sales","vendas","qtd","valor"] if c in df_final.columns), None)

print("Usando -> DATE_COL:", DATE_COL, "| TARGET_COL:", TARGET_COL)
assert DATE_COL and TARGET_COL, "Defina DATE_COL/TARGET_COL manualmente aqui."

df_final[DATE_COL] = pd.to_datetime(df_final[DATE_COL], errors="coerce", dayfirst=True)
df_final[TARGET_COL] = pd.to_numeric(df_final[TARGET_COL], errors="coerce")

aux = df_final[[DATE_COL, TARGET_COL]].copy()
aux["mes"] = aux[DATE_COL].dt.month
neg_total = (aux[TARGET_COL] < 0).sum()
neg_jan   = ((aux["mes"]==1) & (aux[TARGET_COL] < 0)).sum()
print(f"Negativos totais: {int(neg_total)} | em janeiro: {int(neg_jan)}")

before = len(df_final)
df_final = df_final[df_final[DATE_COL].notna() & df_final[TARGET_COL].notna()].copy()
print("Removidos por NaN (data/alvo):", before - len(df_final))
before2 = len(df_final)
df_final = df_final[df_final[TARGET_COL] >= 0].copy()
print("Removidos por alvo negativo:", before2 - len(df_final))

Usando -> DATE_COL: reference_date | TARGET_COL: quantity
Negativos totais: 70114 | em janeiro: 4703
Removidos por NaN (data/alvo): 0
Removidos por alvo negativo: 70114


## 5) Agregação semanal + outliers de janeiro + features

In [None]:
import pandas as pd, numpy as np

df = df_final.copy()
iso = df[DATE_COL].dt.isocalendar()
df["_year"] = iso.year.astype(int); df["_week"] = iso.week.astype(int)

sem = df.groupby(["id_loja","id_produto","_year","_week"], as_index=False)[TARGET_COL].sum().rename(columns={TARGET_COL:"y"})

def iso_week_start(y,w):
    return pd.to_datetime(f"{y}-W{int(w):02}-1", format="%G-W%V-%u")
sem["week_start"] = [iso_week_start(y,w) for y,w in zip(sem["_year"], sem["_week"])]
sem["mes"] = sem["week_start"].dt.month

def flag_out_jan(g):
    gj = g[g["mes"]==1]
    if len(gj) < 5:
        g["outlier_jan"] = False; return g
    q1,q3 = gj["y"].quantile([0.25,0.75])
    iqr = max(q3-q1,0); low, high = q1-1.5*iqr, q3+1.5*iqr
    g["outlier_jan"] = (g["mes"]==1) & ((g["y"]<low)|(g["y"]>high))
    return g

sem = sem.groupby(["id_loja","id_produto"], group_keys=False).apply(flag_out_jan)


CLIP_OUTLIERS = True
if CLIP_OUTLIERS:
    def clip_jan(g):
        gj = g[g["mes"]==1]
        if len(gj) >= 5:
            q1,q3 = gj["y"].quantile([0.25,0.75])
            iqr = max(q3-q1,0); low, high = q1-1.5*iqr, q3+1.5*iqr
            g.loc[g["outlier_jan"], "y"] = np.clip(g.loc[g["outlier_jan"], "y"], low, high)
        return g
    sem = sem.groupby(["id_loja","id_produto"], group_keys=False).apply(clip_jan)

def add_feats(g):
    g = g.sort_values(["_year","_week"]).copy()
    for L in (1,2,3): g[f"lag{L}"] = g["y"].shift(L)
    for k in (3,4): g[f"ma{k}"]  = g["y"].rolling(k, min_periods=1).mean().shift(1)
    g["weekofyear"] = g["_week"].astype(int); g["year"] = g["_year"].astype(int)
    return g

sem = sem.groupby(["id_loja","id_produto"], group_keys=False).apply(add_feats)
sem_model = sem.dropna(subset=["lag1","lag2","ma3"]).copy()
print("Amostras (pré-limpeza final):", len(sem_model))

  sem = sem.groupby(["id_loja","id_produto"], group_keys=False).apply(flag_out_jan)
  sem = sem.groupby(["id_loja","id_produto"], group_keys=False).apply(clip_jan)
  sem = sem.groupby(["id_loja","id_produto"], group_keys=False).apply(add_feats)


Amostras (pré-limpeza final): 2259836


## 6) Cross-Validation (TimeSeriesSplit) com **Regressão Linear (Ridge)** + WMAPE

In [None]:
def wmape(y_true, y_pred, eps=1e-8):
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    mask = np.isfinite(y_true) & np.isfinite(y_pred)
    y_true = y_true[mask]; y_pred = y_pred[mask]
    denom = np.abs(y_true).sum()
    return np.nan if denom < eps or y_true.size == 0 else np.abs(y_true - y_pred).sum() / (denom + eps)

feature_cols = [c for c in ["lag1","lag2","lag3","ma3","ma4","weekofyear","year"] if c in sem_model.columns]
X = sem_model[feature_cols].values; y = sem_model["y"].values
mask = (~np.isnan(X).any(axis=1)) & (~np.isnan(y))
X, y = X[mask], y[mask]
print("Amostras após limpar NaN em features:", X.shape[0])
if X.shape[0] < 50:
    raise ValueError(f"Amostras insuficientes para CV após limpeza: {X.shape[0]}.")

tscv = TimeSeriesSplit(n_splits=4)
wmape_scorer = make_scorer(wmape, greater_is_better=False)

grid = {"alpha": [1e-4, 1e-3, 1e-2, 1e-1, 1, 3, 10, 30]}
search = GridSearchCV(Ridge(fit_intercept=True), grid, scoring=wmape_scorer, cv=tscv, n_jobs=-1, verbose=1)
search.fit(X, y)
best_model = search.best_estimator_
print("Melhor WMAPE (CV):", -float(search.best_score_))
print("Melhores parâmetros:", search.best_params_)


maes, wmapes = [], []
for fold, (tr, va) in enumerate(tscv.split(X), 1):
    m = Ridge(alpha=search.best_params_["alpha"], fit_intercept=True)
    m.fit(X[tr], y[tr]); pred = m.predict(X[va])
    from sklearn.metrics import mean_absolute_error
    maes.append(mean_absolute_error(y[va], pred))
    wmapes.append(wmape(y[va], pred))
    print(f"Fold {fold}: MAE={maes[-1]:.4f} | WMAPE={wmapes[-1]:.4f}")
print("\nMédia CV -> MAE={:.4f} | WMAPE={:.4f}".format(np.mean(maes), np.mean(wmapes)))

Amostras após limpar NaN em features: 1798135
Fitting 4 folds for each of 8 candidates, totalling 32 fits
Melhor WMAPE (CV): 0.40221946958415905
Melhores parâmetros: {'alpha': 0.0001}
Fold 1: MAE=3.0345 | WMAPE=0.4038
Fold 2: MAE=2.9571 | WMAPE=0.4048
Fold 3: MAE=2.9478 | WMAPE=0.4021
Fold 4: MAE=2.9951 | WMAPE=0.3981

Média CV -> MAE=2.9836 | WMAPE=0.4022
