In [7]:
!uv add pandas numpy lightgbm duckdb ipywidgets --active
!uv sync --active

[2mResolved [1m77 packages[0m [2min 2ms[0m[0m
[2mAudited [1m71 packages[0m [2min 0.09ms[0m[0m
[2mResolved [1m77 packages[0m [2min 1ms[0m[0m
[2mAudited [1m71 packages[0m [2min 0.10ms[0m[0m


# Imports e Configuração Inicial  
Primeiro, vamos importar todas as bibliotecas que precisaremos. Estamos usando uma stack bem robusta: pandas para manipulação de dados, DuckDB para consultas SQL rápidas, LightGBM para modelagem, e algumas outras ferramentas úteis.

In [8]:
import pandas as pd
import numpy as np
import duckdb
import lightgbm as lgb
import warnings
import time
import os
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_absolute_error
warnings.filterwarnings('ignore')

# Classe Principal - PrevisaoVendasHackathon
Esta é a classe que faz toda a mágica acontecer. Ela encapsula todo o pipeline de machine learning, desde o carregamento dos dados até a geração das previsões finais.
A classe tem vários atributos importantes: conexão com DuckDB para processar dados grandes de forma eficiente, o modelo LightGBM treinado, lista de features selecionadas, encoders para variáveis categóricas, e resultados de validação.


In [9]:
class PrevisaoVendasHackathon:
    """
    Previsão semanal de vendas por PDV/SKU para submissão do Hackathon.
    Objetivo: Prever quantidade para semanas 1-5 de Janeiro 2023 usando dados históricos de 2022.
    Formato de saída: semana;pdv;produto;quantidade (CSV com codificação UTF-8)
    """
    
    def __init__(self, verbose=True):
        self.verbose = verbose
        self.conn = None
        self.model = None
        self.feature_cols = []
        self.label_encoders = {}
        self.validation_results = {}
        self.all_combinations = None
        
    def inicializar_duckdb(self, memory_limit='12GB', threads=10):
        if self.verbose:
            print("Inicializando DuckDB para previsão do hackathon...")
        
        self.conn = duckdb.connect(':memory:')
        self.conn.execute(f"SET memory_limit='{memory_limit}'")
        self.conn.execute(f"SET threads={threads}")

        self.conn.execute("SET preserve_insertion_order=false")
        self.conn.execute("SET temp_directory='/tmp'")  # Usa disco para overflow
    
        
        if self.verbose:
            print(f"   Memória: {memory_limit}, Threads: {threads}")
    
    def carregar_e_juntar_dados(self, arquivo_loja, arquivo_transacao, arquivo_produto):
        if self.verbose:
            print("Carregando e juntando conjuntos de dados...")
        
        # Carrega tabelas individuais - USANDO BIGINT para IDs grandes
        self.conn.execute(f"""
            CREATE VIEW stores AS 
            SELECT CAST(pdv AS BIGINT) as pdv, 
                   premise, 
                   categoria_pdv, 
                   CAST(zipcode AS BIGINT) as zipcode 
            FROM '{arquivo_loja}'
        """)
        
        self.conn.execute(f"""
            CREATE VIEW transactions AS 
            SELECT CAST(internal_store_id AS BIGINT) as internal_store_id, 
                   CAST(internal_product_id AS BIGINT) as internal_product_id, 
                   transaction_date,
                   CAST(quantity AS INTEGER) as quantity,
                   net_value,
                   gross_value
            FROM '{arquivo_transacao}'
        """)
        
        self.conn.execute(f"""
            CREATE VIEW products AS 
            SELECT CAST(produto AS BIGINT) as produto, 
                   categoria, 
                   tipos, 
                   label, 
                   subcategoria 
            FROM '{arquivo_produto}'
        """)
        
        # Cria conjunto de dados principal com joins - focando nos dados de 2022
        master_query = """
        CREATE VIEW master_data AS
        SELECT 
            t.internal_store_id as pdv,
            t.internal_product_id as produto,
            t.transaction_date,
            t.quantity,
            COALESCE(t.net_value, 0) as net_value,
            COALESCE(t.gross_value, 0) as gross_value,
            -- Atributos da loja
            COALESCE(s.premise, 'Unknown') as premise,
            COALESCE(s.categoria_pdv, 'Unknown') as categoria_pdv,
            COALESCE(s.zipcode, 0) as zipcode,
            -- Atributos do produto
            COALESCE(p.categoria, 'Unknown') as product_categoria,
            COALESCE(p.tipos, 'Unknown') as product_type,
            COALESCE(p.label, 'Unknown') as product_label,
            COALESCE(p.subcategoria, 'Unknown') as subcategoria,
            -- Características de tempo
            EXTRACT('year' FROM t.transaction_date) as year,
            EXTRACT('week' FROM t.transaction_date) as week_number,
            EXTRACT('month' FROM t.transaction_date) as month,
            EXTRACT('quarter' FROM t.transaction_date) as quarter,
            EXTRACT('dayofweek' FROM t.transaction_date) as day_of_week
        FROM transactions t
        LEFT JOIN stores s ON t.internal_store_id = s.pdv
        LEFT JOIN products p ON t.internal_product_id = p.produto
        WHERE t.quantity > 0 
          AND t.quantity IS NOT NULL
          AND t.transaction_date IS NOT NULL
          AND EXTRACT('year' FROM t.transaction_date) = 2022
        """
        
        self.conn.execute(master_query)
        
        # Obtém todas as combinações únicas PDV-Produto para previsão
        # Pega apenas combinações de PDVs que têm quantidade significativa de SKUs
        # Obtém apenas combinações PDV-Produto que realmente existem nos dados
        # com filtro para ficar dentro do limite do hackathon
        self.conn.execute("""
            CREATE VIEW valid_combinations AS
                SELECT DISTINCT
                    s.pdv,
                    t.internal_product_id as produto
                FROM (
                    SELECT DISTINCT 
                        internal_store_id,
                        internal_product_id
                    FROM transactions
                    WHERE quantity > 0 
                    AND quantity IS NOT NULL
                    AND transaction_date IS NOT NULL
                    AND EXTRACT('year' FROM transaction_date) = 2022
                    GROUP BY internal_store_id, internal_product_id
                    HAVING SUM(quantity) >= 10
                    AND COUNT(*) >= 3
                    ORDER BY SUM(quantity) DESC
                    LIMIT 300000
                ) t
                INNER JOIN stores s ON t.internal_store_id = s.pdv
            """)

        # Verifica quantas combinações válidas temos
        valid_combos_stats = self.conn.execute("""
            SELECT 
                COUNT(*) as total_combinations,
                COUNT(DISTINCT pdv) as unique_pdvs,
                COUNT(DISTINCT produto) as unique_produtos
            FROM valid_combinations
        """).fetchone()

        if self.verbose:
            print(f"   Combinações válidas PDV-Produto: {valid_combos_stats[0]:,}")
        
        # Obtém estatísticas de qualidade dos dados
        stats = self.conn.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT pdv) as unique_pdvs,
                COUNT(DISTINCT produto) as unique_produtos,
                COUNT(DISTINCT CAST(pdv AS VARCHAR) || '_' || CAST(produto AS VARCHAR)) as unique_combinations,
                MIN(transaction_date) as min_date,
                MAX(transaction_date) as max_date,
                SUM(quantity) as total_quantity,
                AVG(quantity) as avg_quantity
            FROM master_data
        """).fetchone()
        
        if self.verbose:
            print(f"   Registros: {stats[0]:,}")
            print(f"   PDVs: {stats[1]:,}")
            print(f"   Produtos: {stats[2]:,}")
            print(f"   Combinações PDV-Produto: {stats[3]:,}")
            print(f"   Período: {stats[4]} a {stats[5]}")
    
    def criar_agregacao_semanal(self):
        if self.verbose:
            print("Criando agregação semanal...")
        
        # Agregação semanal simplificada
        weekly_agg_query = """
        CREATE VIEW weekly_data AS
        SELECT 
            m.pdv,
            m.produto,
            m.year,
            m.week_number,
            -- Variável objetivo
            SUM(m.quantity) as quantity,
            -- Métricas de transação
            COUNT(*) as transaction_count,
            COUNT(DISTINCT m.transaction_date) as unique_days,
            SUM(m.net_value) as net_value,
            CASE WHEN SUM(m.quantity) > 0 
                 THEN SUM(m.net_value) / SUM(m.quantity) 
                 ELSE 0 END as avg_unit_price,
            -- Características de contexto - usando FIRST() em vez de agregações complexas
            FIRST(m.premise) as premise,
            FIRST(m.categoria_pdv) as categoria_pdv,
            FIRST(m.product_categoria) as product_categoria,
            FIRST(m.product_type) as product_type,
            FIRST(m.product_label) as product_label,
            FIRST(m.subcategoria) as subcategoria,
            FIRST(m.zipcode) as zipcode,
            -- Características de tempo
            m.month,
            m.quarter
        FROM master_data m
        GROUP BY m.pdv, m.produto, m.year, m.week_number, m.month, m.quarter
        ORDER BY pdv, produto, year, week_number
        """
        
        self.conn.execute(weekly_agg_query)
        
        # Estatísticas
        agg_stats = self.conn.execute("""
        SELECT 
            COUNT(*) as weekly_records,
            COUNT(DISTINCT pdv) as unique_pdvs,
            COUNT(DISTINCT produto) as unique_produtos,
            MIN(week_number) as min_week,
            MAX(week_number) as max_week,
            SUM(quantity) as total_quantity
        FROM weekly_data
        """).fetchone()
        
        if self.verbose:
            print(f"   Registros semanais: {agg_stats[0]:,}")
            print(f"   Intervalo de semanas: {agg_stats[3]}-{agg_stats[4]}")
            print(f"   Quantidade total: {agg_stats[5]:,}")
    
    def criar_caracteristicas_ml(self):
        if self.verbose:
            print("Criando características de ML...")
        
        features_query = """
        CREATE VIEW ml_features AS
        SELECT 
            *,
            -- Características de sazonalidade (codificação cíclica)
            SIN(2 * PI() * week_number / 52.0) as week_sin,
            COS(2 * PI() * week_number / 52.0) as week_cos,
            SIN(2 * PI() * month / 12.0) as month_sin,
            COS(2 * PI() * month / 12.0) as month_cos,
            
            -- Características de atraso por PDV-Produto
            LAG(quantity, 1) OVER pdv_produto_window as lag_1,
            LAG(quantity, 2) OVER pdv_produto_window as lag_2,
            LAG(quantity, 4) OVER pdv_produto_window as lag_4,
            
            -- Estatísticas móveis (simplificadas)
            AVG(quantity) OVER (PARTITION BY pdv, produto ORDER BY year, week_number 
                               ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING) as rolling_mean_4,
            AVG(quantity) OVER (PARTITION BY pdv, produto ORDER BY year, week_number 
                               ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) as rolling_mean_8,
            
            -- Características transversais (simplificadas)
            AVG(quantity) OVER produto_window as produto_avg_quantity,
            AVG(quantity) OVER pdv_window as pdv_avg_quantity,
            
            -- Características de preço e momentum
            COALESCE(avg_unit_price, 0) as unit_price,
            LAG(COALESCE(avg_unit_price, 0), 1) OVER pdv_produto_window as price_lag_1,
            COALESCE(LAG(quantity, 1) OVER pdv_produto_window, 0) - 
            COALESCE(LAG(quantity, 2) OVER pdv_produto_window, 0) as momentum_1,
            
            -- Intensidade de transação
            COALESCE(transaction_count, 1) * COALESCE(avg_unit_price, 0) as transaction_value,
            COALESCE(unique_days, 1) / 7.0 as days_active_ratio
            
        FROM weekly_data
        WINDOW 
            pdv_produto_window AS (PARTITION BY pdv, produto ORDER BY year, week_number),
            produto_window AS (PARTITION BY produto),
            pdv_window AS (PARTITION BY pdv)
        """
        
        self.conn.execute(features_query)
        
        if self.verbose:
            print(f"   Engenharia de características completa")
    
    def preparar_dados_treinamento(self):
        if self.verbose:
            print("Preparando dados de treinamento...")
        
        ml_data = self.conn.execute("SELECT * FROM ml_features ORDER BY pdv, produto, year, week_number").fetchdf()
        
        if self.verbose:
            print(f"   Extraídos {len(ml_data):,} registros")
        
        # Codificação de rótulos para variáveis categóricas
        categorical_cols = ['premise', 'categoria_pdv', 'product_categoria', 
                           'product_type', 'product_label', 'subcategoria']
        
        for col in categorical_cols:
            if col in ml_data.columns:
                le = LabelEncoder()
                valid_mask = ml_data[col].notna()
                if valid_mask.sum() > 0:
                    ml_data.loc[valid_mask, f'{col}_encoded'] = le.fit_transform(
                        ml_data.loc[valid_mask, col].astype(str)
                    )
                    ml_data[f'{col}_encoded'] = ml_data[f'{col}_encoded'].fillna(-1)
                    self.label_encoders[col] = le
        
        # Seleciona características para modelagem (exclui alvo e identificadores)
        exclude_cols = ['quantity', 'pdv', 'produto', 'year', 'week_number', 'net_value',
                       'premise', 'categoria_pdv', 'product_categoria', 'product_type', 
                       'product_label', 'subcategoria', 'month', 'quarter']
        
        self.feature_cols = []
        for col in ml_data.columns:
            if (col not in exclude_cols and 
                ml_data[col].dtype in ['int64', 'float64', 'int32', 'float32'] and
                not ml_data[col].isna().all()):
                self.feature_cols.append(col)
        
        # Limpa conjunto de dados - trata valores NaN
        for col in self.feature_cols:
            ml_data[col] = ml_data[col].fillna(0)
        
        clean_data = ml_data.dropna(subset=['quantity']).copy()
        
        if self.verbose:
            print(f"   Características selecionadas: {len(self.feature_cols)}")
            print(f"   Registros limpos: {len(clean_data):,}")
        
        return clean_data
    
    def divisao_validacao_temporal(self, data):
        if self.verbose:
            print("Criando divisão de validação temporal...")
        
        # Usa as últimas 4 semanas de 2022 para validação (semanas 49-52)
        train_data = data[data['week_number'] <= 48].copy()
        val_data = data[data['week_number'] >= 49].copy()
        
        if self.verbose:
            print(f"   Treinamento: {len(train_data):,} registros (semanas 1-48)")
            print(f"   Validação: {len(val_data):,} registros (semanas 49-52)")
        
        return train_data, val_data

    def treinar_modelo_lightgbm(self, train_data, val_data=None):
        if self.verbose:
            print("Treinando modelo LightGBM...")

        X_train = train_data[self.feature_cols]
        y_train = train_data['quantity']

        # Configuração do LightGBM otimizada para previsão de vendas
        lgb_params = {
            'objective': 'poisson',
            'metric': 'poisson',
            'num_leaves': 127,
            'learning_rate': 0.05,
            'feature_fraction': 0.8,
            'bagging_fraction': 0.8,
            'bagging_freq': 1,
            'min_data_in_leaf': 20,
            'n_estimators': 1000,
            'random_state': 42,
            'n_jobs': -1,
        }

        start_time = time.time()
        try:
            self.model = lgb.LGBMRegressor(**lgb_params)

            callbacks = []
            if val_data is not None:
                X_val = val_data[self.feature_cols]
                y_val = val_data['quantity']
                
                # Parada antecipada
                callbacks.append(lgb.early_stopping(stopping_rounds=50))
                
                # Log do progresso
                if self.verbose:
                    callbacks.append(lgb.log_evaluation(period=100))

                self.model.fit(
                    X_train, y_train,
                    eval_set=[(X_val, y_val)],
                    eval_metric='poisson',
                    callbacks=callbacks
                )
            else:
                # Treina sem parada antecipada
                self.model.fit(X_train, y_train)

            train_time = time.time() - start_time
            if self.verbose:
                print(f"   Treinamento concluído em {train_time:.1f}s")

        except Exception as e:
            if self.verbose:
                print(f"   Erro no treinamento: {e}")
            raise e

    def validar_modelo(self, val_data):
        if self.verbose:
            print("Validando modelo...")
        
        X_val = val_data[self.feature_cols]
        y_val = val_data['quantity']
        
        # Faz previsões
        pred = self.model.predict(X_val)
        pred = np.maximum(0, pred)  
        
        # Calcula WMAPE
        wmape = np.sum(np.abs(y_val - pred)) / np.sum(np.abs(y_val)) if np.sum(np.abs(y_val)) > 0 else 0
        mae = mean_absolute_error(y_val, pred)
        
        self.validation_results = {
            'wmape': wmape,
            'mae': mae,
            'predictions': pred
        }
        
        if self.verbose:
            print(f"   WMAPE = {wmape:.4f} ({wmape*100:.2f}%)")
            print(f"   MAE = {mae:.2f}")
        
        return self.validation_results
    
    def gerar_previsao_janeiro_2023(self, trained_data):
        if self.verbose:
            print("Gerando previsões para Janeiro 2023 (semanas 1-5)...")
        
        # Obtém apenas combinações que existiam nos dados originais
        # e seus estados mais recentes
        last_states_query = """
        WITH ranked_data AS (
            SELECT mf.*,
                ROW_NUMBER() OVER (PARTITION BY mf.pdv, mf.produto ORDER BY mf.year DESC, mf.week_number DESC) as rn
            FROM ml_features mf
            INNER JOIN valid_combinations vc ON mf.pdv = vc.pdv AND mf.produto = vc.produto
        ),
        latest_data AS (
            SELECT 
                pdv, produto, premise, categoria_pdv, product_categoria,
                product_type, product_label, subcategoria, zipcode,
                quantity as last_quantity,
                avg_unit_price as last_price,
                rolling_mean_4 as last_rolling_mean_4,
                rolling_mean_8 as last_rolling_mean_8,
                produto_avg_quantity,
                pdv_avg_quantity,
                transaction_count,
                unique_days
            FROM ranked_data 
            WHERE rn = 1
        )
        SELECT * FROM latest_data
        """
        
        if self.verbose:
            print("   Extraindo estados mais recentes apenas para combinações PDV-Produto originais...")
        
        last_states = self.conn.execute(last_states_query).fetchdf()
        
        if self.verbose:
            print(f"   Encontradas {len(last_states):,} combinações PDV-Produto originais")
            print("   Nota: Prevendo apenas para combinações que já existiam nos dados históricos")
        
        # Adiciona codificações categóricas aos last_states
        categorical_cols = [
            'premise', 'categoria_pdv', 'product_categoria',
            'product_type', 'product_label', 'subcategoria'
        ]
        
        for col in categorical_cols:
            if col in last_states.columns and col in self.label_encoders:
                le = self.label_encoders[col]
                encoded_col = f'{col}_encoded'
                try:
                    # Trata categorias não vistas
                    encoded_values = []
                    for val in last_states[col].astype(str):
                        try:
                            encoded_values.append(le.transform([val])[0])
                        except ValueError:
                            encoded_values.append(-1)  # Categoria desconhecida
                    last_states[encoded_col] = encoded_values
                except Exception as e:
                    if self.verbose:
                        print(f"   Aviso na codificação {col}: {e}")
                    last_states[encoded_col] = -1
        
        # Abordagem de previsão em lote
        target_weeks = [1, 2, 3, 4, 5]  # Semanas de Janeiro 2023
        forecasts = []
        
        if self.verbose:
            print(f"   Criando características de previsão para semanas {target_weeks}...")
        
        # Pré-calcula características de tempo para todas as semanas
        time_features = {}
        for week in target_weeks:
            time_features[week] = {
                'week_sin': np.sin(2 * np.pi * week / 52),
                'week_cos': np.cos(2 * np.pi * week / 52),
                'month_sin': np.sin(2 * np.pi * 1 / 12),  # Janeiro
                'month_cos': np.cos(2 * np.pi * 1 / 12)
            }
        
        # Cria matriz de características para todas as combinações e semanas de uma vez
        total_predictions = len(last_states) * len(target_weeks)
        if self.verbose:
            print(f"   Preparando {total_predictions:,} previsões...")
        
        # Processa em lotes para evitar problemas de memória
        batch_size = 500000
        n_batches = (len(last_states) + batch_size - 1) // batch_size
        
        for batch_idx in range(n_batches):
            start_idx = batch_idx * batch_size
            end_idx = min((batch_idx + 1) * batch_size, len(last_states))
            batch_states = last_states.iloc[start_idx:end_idx]
            
            if self.verbose and n_batches > 1:
                print(f"   Processando lote {batch_idx + 1}/{n_batches} ({len(batch_states):,} combinações)")
            
            # Cria dados de previsão para este lote
            batch_predictions = []
            
            for week in target_weeks:
                week_features = []
                
                for _, row in batch_states.iterrows():
                    try:
                        # Cria vetor de características
                        feature_dict = {}
                        
                        # Características de tempo
                        feature_dict.update(time_features[week])
                        
                        # Características de atraso (usa últimos valores conhecidos)
                        feature_dict['lag_1'] = row.get('last_quantity', 0)
                        feature_dict['lag_2'] = row.get('last_quantity', 0)
                        feature_dict['lag_4'] = row.get('last_quantity', 0)
                        
                        # Estatísticas móveis
                        feature_dict['rolling_mean_4'] = row.get('last_rolling_mean_4', 0)
                        feature_dict['rolling_mean_8'] = row.get('last_rolling_mean_8', 0)
                        
                        # Características transversais
                        feature_dict['produto_avg_quantity'] = row.get('produto_avg_quantity', 0)
                        feature_dict['pdv_avg_quantity'] = row.get('pdv_avg_quantity', 0)
                        
                        # Outras características
                        feature_dict['unit_price'] = row.get('last_price', 0)
                        feature_dict['price_lag_1'] = row.get('last_price', 0)
                        feature_dict['momentum_1'] = 0
                        feature_dict['transaction_value'] = row.get('transaction_count', 1) * row.get('last_price', 0)
                        feature_dict['days_active_ratio'] = row.get('unique_days', 1) / 7.0
                        feature_dict['zipcode'] = row.get('zipcode', 0)
                        feature_dict['transaction_count'] = row.get('transaction_count', 1)
                        feature_dict['unique_days'] = row.get('unique_days', 1)
                        
                        # Adiciona características categóricas codificadas
                        for col in categorical_cols:
                            encoded_col = f'{col}_encoded'
                            feature_dict[encoded_col] = row.get(encoded_col, -1)
                        
                        # Cria vetor de características na ordem correta
                        feature_vector = [feature_dict.get(col, 0) for col in self.feature_cols]
                        week_features.append((row['pdv'], row['produto'], week, feature_vector))
                        
                    except Exception as e:
                        if self.verbose:
                            print(f"   Erro ao criar características para PDV {row['pdv']}, Produto {row['produto']}: {e}")
                        continue
                
                # Faz previsões em lote para esta semana
                if week_features:
                    try:
                        X_batch = pd.DataFrame([f[3] for f in week_features], columns=self.feature_cols).fillna(0)
                        predictions = self.model.predict(X_batch)
                        
                        # Armazena resultados
                        for i, (pdv, produto, semana, _) in enumerate(week_features):
                            pred = max(0, round(predictions[i]))
                            batch_predictions.append({
                                'semana': int(semana),
                                'pdv': int(pdv),
                                'produto': int(produto),
                                'quantidade': int(pred)
                            })
                    except Exception as e:
                        if self.verbose:
                            print(f"   Erro na previsão em lote para semana {week}: {e}")
                        continue
            
            forecasts.extend(batch_predictions)
            
            # Atualização de progresso
            if self.verbose and len(forecasts) > 0:
                progress = (batch_idx + 1) / n_batches * 100
                print(f"   Progresso: {progress:.1f}% ({len(forecasts):,} previsões geradas)")
        
        forecast_df = pd.DataFrame(forecasts)
        
        if self.verbose:
            print(f"   Geradas {len(forecast_df):,} previsões")
            if len(forecast_df) > 0:
                print(f"   PDVs únicos: {forecast_df['pdv'].nunique():,}")
                print(f"   Produtos únicos: {forecast_df['produto'].nunique():,}")
                print(f"   Quantidade média: {forecast_df['quantidade'].mean():.1f}")
                
                # Resumo semanal
                weekly_summary = forecast_df.groupby('semana')['quantidade'].agg(['count', 'sum', 'mean']).round(1)
                print("   Resumo semanal:")
                for week, (count, total, avg) in weekly_summary.iterrows():
                    print(f"      Semana {week}: {int(count):,} previsões, {int(total):,} qtd total, {avg:.1f} qtd média")
        
        return forecast_df
        
    def salvar_formato_hackathon(self, forecast_df, output_path):
        """Salva previsão no formato exato do hackathon"""
        if self.verbose:
            print("Salvando previsão no formato do hackathon...")
        
        # Garante ordem correta das colunas e tipos
        hackathon_df = forecast_df[['semana', 'pdv', 'produto', 'quantidade']].copy()
        
        hackathon_df.to_parquet(
        output_path, 
        index=False,
        compression='snappy'
        )
        
        if self.verbose:
            print(f"   Salvo em: {output_path}")
            print(f"   Formato: CSV com separador ';', codificação UTF-8")
            print(f"   Registros: {len(hackathon_df):,}")
    
    def executar_pipeline_completo(self, arquivo_loja, arquivo_transacao, arquivo_produto, 
                             output_path="hackathon_v1.parquet"):
        """Executa pipeline completo de previsão do hackathon"""
        start_time = time.time()
        
        print("PIPELINE DE PREVISÃO DE VENDAS - HACKATHON")
        print("=" * 60)
        print("Objetivo: Prever 5 semanas de vendas de Janeiro 2023")
        print("Treinamento: dados históricos de 2022")
        print("=" * 60)
        
        try:
            # 1. Inicializa DuckDB
            self.inicializar_duckdb()
            
            # 2. Carrega e junta dados
            self.carregar_e_juntar_dados(arquivo_loja, arquivo_transacao, arquivo_produto)
            
            # 3. Cria agregação semanal
            self.criar_agregacao_semanal()
            
            # 4. Cria características de ML
            self.criar_caracteristicas_ml()
            
            # 5. Prepara dados de treinamento
            training_data = self.preparar_dados_treinamento()
            
            # 6. Cria divisão temporal para validação
            train_data, val_data = self.divisao_validacao_temporal(training_data)
            
            # 7. Treina modelo com validação
            self.treinar_modelo_lightgbm(train_data, val_data)
            
            # 8. Valida modelo
            validation_results = self.validar_modelo(val_data)
            
            # 9. Retreina com dados completos de 2022 para previsão final (sem parada antecipada)
            if self.verbose:
                print("Retreinando com conjunto completo de dados de 2022...")
            self.treinar_modelo_lightgbm(training_data)
            
            # 10. Gera previsões para Janeiro 2023
            forecast_df = self.gerar_previsao_janeiro_2023(training_data)
            
            # 11. Salva no formato do hackathon
            self.salvar_formato_hackathon(forecast_df, output_path)
            
            # 12. Limpeza
            self.conn.close()
            
            total_time = time.time() - start_time
            print("=" * 60)
            print("PIPELINE DO HACKATHON CONCLUÍDO!")
            print(f"Tempo total: {total_time:.1f}s ({total_time/60:.1f}min)")
            print(f"Previsões: {len(forecast_df):,}")
            print(f"Saída: {output_path}")
            
            if validation_results:
                print(f"\nRESULTADOS DA VALIDAÇÃO (semanas 49-52 de 2022):")
                wmape_pct = validation_results['wmape'] * 100
                print(f"   LightGBM: {wmape_pct:.2f}% WMAPE")
                print(f"   MAE: {validation_results['mae']:.2f}")
            
            return forecast_df, validation_results
            
        except Exception as e:
            print(f"Erro no pipeline: {e}")
            if self.conn:
                self.conn.close()
            return None, None

# Função Principal de Execução  
Esta função orquestra todo o processo: carrega os dados, treina o modelo, valida os resultados e gera as previsões para Janeiro 2023.

In [10]:
def executar_previsao_hackathon(arquivo_loja, arquivo_transacao, arquivo_produto, arquivo_saida="hackathon_v1.parquet"):
    print("PREVISÃO DE VENDAS - HACKATHON")
    print("Prevendo 5 semanas de vendas de Janeiro 2023 por PDV-Produto")
    print("Saída: semana;pdv;produto;quantidade")
    print()
    
    hackathon_pipeline = PrevisaoVendasHackathon(verbose=True)
    
    # Executa pipeline do hackathon
    forecast_result, validation_metrics = hackathon_pipeline.executar_pipeline_completo(
        arquivo_loja=arquivo_loja,
        arquivo_transacao=arquivo_transacao,
        arquivo_produto=arquivo_produto,
        output_path=arquivo_saida
    )
    
    if forecast_result is not None:
        print("\nSUBMISSÃO DO HACKATHON PRONTA!")
        print("=" * 50)
        print(f"Arquivo: {arquivo_saida}")
        print("Formato: semana;pdv;produto;quantidade")
        print("Codificação: UTF-8")
        print("Semanas: 1-5 (Janeiro 2023)")
        
        # Exibe amostra dos resultados
        print("\nAMOSTRA DAS PREVISÕES:")
        print(forecast_result.head(10).to_string(index=False))
        
        # Resumo semanal para validação
        print("\nRESUMO SEMANAL:")
        weekly_summary = forecast_result.groupby('semana').agg({
            'quantidade': ['count', 'sum', 'mean']
        }).round(1)
        weekly_summary.columns = ['Previsões', 'Qtd_Total', 'Qtd_Média']
        print(weekly_summary.to_string())
        
        # Verificações finais de qualidade
        print("\nVERIFICAÇÕES DE QUALIDADE:")
        print(f"   Total de previsões: {len(forecast_result):,}")
        print(f"   PDVs únicos: {forecast_result['pdv'].nunique():,}")
        print(f"   Produtos únicos: {forecast_result['produto'].nunique():,}")
        print(f"   Semanas cobertas: {sorted(forecast_result['semana'].unique())}")
        print(f"   Tipos de dados: Todos inteiros = {all(forecast_result.dtypes == 'int64')}")
        print(f"   Valores faltantes: {forecast_result.isnull().sum().sum()}")
        print(f"   Quantidades negativas: {(forecast_result['quantidade'] < 0).sum()}")
        
        if os.path.exists(arquivo_saida):
            file_size = os.path.getsize(arquivo_saida) / 1024 / 1024  # MB
            print(f"   Tamanho do arquivo: {file_size:.1f} MB")
        
        return forecast_result, validation_metrics
        
    else:
        print("Pipeline falhou. Verifique seus arquivos de dados e caminhos.")
        return None, None

# Funções Utilitárias de Validação
A função principal aqui verifica se sua submissão está no formato certinho: separador correto, encoding UTF-8, tipos de dados adequados, sem valores negativos ou faltantes.

In [11]:
def validar_saida_hackathon(parquet_path="hackathon_v1.parquet"):
    print("VALIDANDO SAÍDA DO HACKATHON")
    print("=" * 40)
    
    try:
        df = pd.read_parquet(parquet_path)
        
        print(f"Arquivo carregado com sucesso")
        print(f"Forma: {df.shape}")
        
        # Verifica nomes das colunas
        expected_cols = ['semana', 'pdv', 'produto', 'quantidade']
        actual_cols = list(df.columns)
        
        if actual_cols == expected_cols:
            print(f"Nomes das colunas corretos: {actual_cols}")
        else:
            print(f"Nomes das colunas incorretos!")
            print(f"   Esperado: {expected_cols}")
            print(f"   Atual: {actual_cols}")
            return False
        
        print(f"Tipos de dados:")
        for col in df.columns:
            dtype = df[col].dtype
            is_integer = pd.api.types.is_integer_dtype(dtype)
            status = "✓" if is_integer else "✗"
            print(f"   {status} {col}: {dtype}")
            
            # Verifica IDs grandes
            if col in ['pdv', 'produto'] and is_integer:
                max_val = df[col].max()
                if max_val > 2**31:
                    print(f"      ID grande detectado (máx: {max_val}) - usando 64-bit")
        
        # Verifica intervalos de valores
        print(f"Intervalos de valores:")
        for col in df.columns:
            min_val, max_val = df[col].min(), df[col].max()
            print(f"   {col}: {min_val} a {max_val}")
        
        # Verifica valores faltantes
        missing = df.isnull().sum().sum()
        if missing == 0:
            print(f"Sem valores faltantes")
        else:
            print(f"Valores faltantes encontrados: {missing}")
        
        # Verifica valores das semanas
        unique_weeks = sorted(df['semana'].unique())
        expected_weeks = [1, 2, 3, 4, 5]
        if list(unique_weeks) == expected_weeks:
            print(f"Semanas corretas: {unique_weeks}")
        else:
            print(f"Semanas incorretas: {unique_weeks}")
            print(f"   Esperado: {expected_weeks}")
        
        # Verifica quantidades negativas
        negative_qty = (df['quantidade'] < 0).sum()
        if negative_qty == 0:
            print(f"Sem quantidades negativas")
        else:
            print(f"Quantidades negativas encontradas: {negative_qtysubmission}")
        
        print(f"\nESTATÍSTICAS RESUMIDAS:")
        print(f"   Registros: {len(df):,}")
        print(f"   PDVs únicos: {df['pdv'].nunique():,}")
        print(f"   Produtos únicos: {df['produto'].nunique():,}")
        print(f"   Quantidade total: {df['quantidade'].sum():,}")
        print(f"   Quantidade média: {df['quantidade'].mean():.1f}")
        
        print(f"\nVALIDAÇÃO DO FORMATO HACKATHON: PASSOU ✓")
        return True
        
    except Exception as e:
        print(f"Validação falhou: {e}")
        return False

# Script de Execução Principal
Defina os caminhos dos seus arquivos aqui e rode tudo. O script verifica se os arquivos existem, executa todo o pipeline, e no final valida se está tudo certinho para submissão.

In [12]:
if __name__ == "__main__":
    arquivo_loja = "../data/raw/file1.parquet"        # Dados da loja
    arquivo_transacao = "../data/raw/file2.parquet"   # Dados de transação  
    arquivo_produto = "../data/raw/file3.parquet"     # Dados do produto
    arquivo_saida = "hackathon_v1.parquet"
    
    # Verifica se os arquivos existem
    arquivos_existem = all(os.path.exists(f) for f in [arquivo_loja, arquivo_transacao, arquivo_produto])
    
    if arquivos_existem:
        print("Arquivos de dados encontrados. Executando pipeline do hackathon...")
        
        forecast_result, validation_metrics = executar_previsao_hackathon(
            arquivo_loja=arquivo_loja,
            arquivo_transacao=arquivo_transacao, 
            arquivo_produto=arquivo_produto,
            arquivo_saida=arquivo_saida
        )
        
        if forecast_result is not None:
            # Valida a saída
            print("\n" + "="*60)
            validar_saida_hackathon(arquivo_saida)
            
        else:
            print("Previsão falhou!")
    else:
        print("Arquivos de dados não encontrados. Verifique os caminhos dos arquivos:")
        for f in [arquivo_loja, arquivo_transacao, arquivo_produto]:
            exists = "✓" if os.path.exists(f) else "✗"
            print(f"   {exists} {f}")
        print("\nAjuste os caminhos dos arquivos no script e tente novamente.")

Arquivos de dados encontrados. Executando pipeline do hackathon...
PREVISÃO DE VENDAS - HACKATHON
Prevendo 5 semanas de vendas de Janeiro 2023 por PDV-Produto
Saída: semana;pdv;produto;quantidade

PIPELINE DE PREVISÃO DE VENDAS - HACKATHON
Objetivo: Prever 5 semanas de vendas de Janeiro 2023
Treinamento: dados históricos de 2022
Inicializando DuckDB para previsão do hackathon...
   Memória: 12GB, Threads: 10
Carregando e juntando conjuntos de dados...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   Combinações válidas PDV-Produto: 1,015,952
Erro no pipeline: Query interrupted
Pipeline falhou. Verifique seus arquivos de dados e caminhos.
Previsão falhou!
