# 0 - Clonando Repositorio

In [None]:
!git clone https://github.com/marcos-s1/Big-Data-forecast.git

Cloning into 'Big-Data-forecast'...
fatal: could not read Username for 'https://github.com': No such device or address


In [None]:
%cd /content/Big-Data-forecast

/content/Big-Data-forecast


# 0 - Montando drive e configurando ambiente

Vamos começar montando o Google Drive. Isso nos permitirá ler os arquivos da pasta data/

In [1]:
# Célula de Execução do 01_load_data.py no Colab

import os
import sys
from pyspark.sql import SparkSession
from google.colab import drive

# 1. Monta o Google Drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# 2. Configura os caminhos e o ambiente
project_root = '/content/Big-Data-forecast'
project_root_base = '/content/drive/MyDrive/Hackaton - Big Data'

# Navega para a raiz do projeto (opcional, mas boa prática)
%cd {project_root}
# Adiciona a pasta 'src' ao caminho de busca do Python
if './src' not in sys.path:
    sys.path.append('./src')

/content/Big-Data-forecast


# 1 - Leitura e Join das bases

Leitura da base .zip de treinamento e join das informações com base nas chaves e valor

In [None]:
from _01_load_data import read_and_join_dataframes

output_path = os.path.join(project_root_base, "data", "processed", "base_consolidada.parquet")

zip_file_path = "/content/drive/MyDrive/Hackaton - Big Data/data/raw/hackathon_2025_templates.zip"
final_df = read_and_join_dataframes(zip_file_path)

if final_df is not None:
    print("\n--- Processo concluído. DataFrame final criado. ---")

    print(f"\nSalvando o DataFrame final em {output_path}...")
    final_df.to_parquet(output_path, engine='pyarrow')
    print("Arquivo salvo com sucesso.")

--- Descompactando arquivos .parquet do .zip ---
--- Lendo arquivos .parquet com Pandas ---
Arquivo 'part-00000-tid-2779033056155408584-f6316110-4c9a-4061-ae48-69b77c7c8c36-4-1-c000.snappy.parquet' lido com sucesso.
Arquivo 'part-00000-tid-5196563791502273604-c90d3a24-52f2-4955-b4ec-fb143aae74d8-4-1-c000.snappy.parquet' lido com sucesso.
Arquivo 'part-00000-tid-7173294866425216458-eae53fbf-d19e-4130-ba74-78f96b9675f1-4-1-c000.snappy.parquet' lido com sucesso.

--- Realizando o join das bases ---

DataFrames carregados:
df_1: 14419 linhas, 4 colunas
                   pdv     premise categoria_pdv  zipcode
0  2204965430669363375  On Premise  Mexican Rest    30741
1  5211957289528622910  On Premise   Hotel/Motel    80011
df_2: 6560698 linhas, 11 colunas
     internal_store_id  internal_product_id distributor_id transaction_date  \
0  7384367747233276219   328903483604537190              9       2022-07-13   
1  3536908514005606262  5418855670645487653              5       2022-03-21   



# 2 - Engenharia de Feature para Criar Novas Variáveis

Nesta etapa, vamos transformar a nossa base de dados consolidada em um conjunto de dados robusto e rico em informações. A **engenharia de features** é o processo de criar novas variáveis a partir dos dados existentes, o que permite ao nosso modelo de previsão capturar padrões mais complexos e precisos.

O objetivo aqui é fornecer ao modelo informações que ele não conseguiria extrair por conta própria. Para o nosso problema de previsão de vendas, vamos focar em dois tipos principais de features:

#### **A. Features de Tempo**
Modelos de previsão dependem fortemente do tempo. Vamos criar variáveis para capturar a sazonalidade e a tendência da nossa série temporal.
* **`week_rank`**: O número da semana do ano (`1` a `52`).
* **`week_of_month`**: A semana dentro do mês (`1` a `4` ou `5`). Essa variável é poderosa para capturar picos de vendas no início ou no fim do mês.
* **`rank`**: Um identificador sequencial que nos permite ordenar o tempo corretamente para aplicar as funções de janela.

#### **B. Features Defasadas (Lagged Features)**
Estas são as features mais importantes para prever vendas. Uma feature defasada é uma variável que contém o valor de uma coluna em um período de tempo anterior.

* **Exemplo:** A venda total das últimas 3 semanas (`sum_quantity_last_3_weeks`) é um excelente preditor para a venda da próxima semana.
* **O que faremos?** Para cada pdv e produto, vamos calcular a soma, média, mínimo, máximo e desvio padrão das vendas e de outras métricas (como faturamento e lucro) em diferentes janelas de tempo (última semana, últimas 3 semanas, etc.).

Ao final desta etapa, nossa base de dados estará completa e pronta para o processo de seleção de features, onde descobriremos quais dessas variáveis são realmente importantes para o modelo.

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
import os
import sys

In [None]:
# Define a raiz do projeto (subindo um nível a partir da pasta 'notebooks')
src_path = os.path.join(project_root, 'src')

# Adiciona a pasta 'src' ao caminho de busca do Python
if src_path not in sys.path:
    sys.path.append(src_path)

print(f"Caminho do projeto configurado para: {project_root}")

Caminho do projeto configurado para: /content/Big-Data-forecast


In [None]:
# Inicializa a sessão Spark
try:
    spark
except NameError:
    spark = SparkSession.builder \
    .appName("FeatureEngineeringNotebook") \
    .config("spark.network.timeout", "600s") \
    .config("spark.sql.broadcastTimeout", "1200s") \
    .getOrCreate()

print("Sessão Spark iniciada com sucesso.")

Sessão Spark iniciada com sucesso.


In [None]:
# !!! Add o diretorio src ao SparkContext's Python !!!
spark.sparkContext.addPyFile(os.path.join(project_root, 'src', '_02_feature_engineering.py'))

# Import apos adicionar arquivo ao SparkContext
from _02_feature_engineering import feature_engineering_pipeline

print('\nCodigo de pipeline de feature engineering carregado.\n')


Codigo de pipeline de feature engineering carregado.



In [None]:
# Define o caminho para o DataFrame consolidado, que é a saída do 01_load_data.py
input_path = os.path.join(project_root_base, "data", "processed", "base_consolidada.parquet")


In [None]:
# Verifica se o arquivo existe antes de tentar ler
if not os.path.exists(input_path):
    print("Erro: Arquivo base_consolidada.parquet não encontrado.")
    print("Verifique se o 01_load_data.py foi executado e se o caminho está correto.")
else:
    # 1. Carrega o DataFrame consolidado com o Spark
    print("\nCarregando o DataFrame consolidado...\n")
    df_consolidado = spark.read.parquet(input_path)
    print("DataFrame consolidado carregado com sucesso.")

    # Take a sample of the DataFrame (adjust fraction as needed)
    sample_fraction = 0.15  # 15% sample
    df_consolidado = df_consolidado.sample(False, sample_fraction, seed=42)

    print(f"\nCriada uma amostra de {sample_fraction*100}% do DataFrame.\n")
    print(f"Volume da amostra: {df_consolidado.count()} linhas\n")


Carregando o DataFrame consolidado...

DataFrame consolidado carregado com sucesso.

Criada uma amostra de 15.0% do DataFrame.

Volume da amostra: 985342 linhas



In [None]:
# Define as janelas de semanas e as funções de agregação desejadas
week_windows_to_consider = [1, 3, 6, 9, 12, 15, 18]

# Define as colunas para as quais as agregações de valor serão aplicadas
value_cols_to_aggregate = ['quantity', 'gross_value', 'net_value', 'gross_profit', 'discount', 'taxes', 'quantidade_pedidos']

# Define as funções de agregação a serem aplicadas ('count' para distinct_products, outros para value_columns)
aggregation_functions_to_apply = ['count', 'sum', 'mean', 'min', 'max', 'std'] # 'mean' é mapeado para 'avg', 'std' para 'stddev'

df_final = feature_engineering_pipeline(
    df_consolidado,
    week_windows_to_consider,
    value_cols_to_aggregate,
    aggregation_functions_to_apply,
    spark
    )

Criando identificador de semana global e rank...
Criando identificador de semana global e rank...
Calculando tempo_ultimo_pedido...
Calculando features defasadas e de janela móvel de forma otimizada...
Calculando contagem distinta de produtos por semana...


In [None]:
import os

output_path = os.path.join(project_root_base, "data", "processed", "base_com_features.parquet")

# Garante que o diretório exista
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Diretório criado: {output_dir}")

print(f"Salvando o DataFrame Spark 'df_final' em: {output_path} com repartition(100)")

df_final.repartition(100).write.parquet(output_path, mode="overwrite")

print("DataFrame Spark salvo com sucesso.")

Salvando o DataFrame Spark 'df_final' em: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_com_features.parquet com repartition(100)
DataFrame Spark salvo com sucesso.


# 3 - Tratamento e Agrupamento de Categorias

Nesta etapa, vamos nos concentrar na limpeza das nossas variáveis categóricas para torná-las mais eficientes para o modelo. Um desafio comum em datasets com muitas categorias é a presença de valores de baixa frequência.

Para resolver isso, utilizaremos uma função que agrupa automaticamente essas categorias raras em uma única "super-categoria" chamada **`Outros`**. Isso ajuda o modelo a focar nos padrões mais importantes, sem se distrair com categorias que aparecem poucas vezes.

Além disso, é importante reforçar que, após este agrupamento, não precisaremos de mais tratamentos como o **One-Hot Encoding**. O nosso modelo, o **CatBoost**, foi escolhido justamente por lidar com variáveis categóricas de forma nativa e otimizada.

**As vantagens de usar o CatBoost são:**
* **Sem Custo de Performance:** Ele não precisa transformar categorias em vetores binários, o que economiza tempo de processamento e memória.
* **Redução de Dimensionalidade:** Ao lidar com categorias diretamente, ele evita o problema de `sparse data` (dados esparsos), que acontece quando o One-Hot Encoding cria milhares de colunas com valores zero.
* **Robustez:** O CatBoost é menos propenso a erros de inferência de tipo e lida de forma mais inteligente com dados categóricos, resultando em um modelo mais preciso e estável.

Com este passo, garantimos que todas as nossas variáveis categóricas estejam prontas para o treinamento, sem a necessidade de etapas adicionais de pré-processamento.

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
import os
import sys

In [None]:
src_path = os.path.join(project_root, 'src')

# Adiciona a pasta 'src' ao caminho de busca do Python
if src_path not in sys.path:
    sys.path.append(src_path)

print(f"Caminho do projeto configurado para: {project_root}")

Caminho do projeto configurado para: /content/Big-Data-forecast


In [None]:
# Inicializa a sessão Spark
try:
    spark
except NameError:
    spark = SparkSession.builder \
    .appName("FeatureEngineeringNotebook") \
    .config("spark.network.timeout", "600s") \
    .config("spark.sql.broadcastTimeout", "1200s") \
    .getOrCreate()

print("Sessão Spark iniciada com sucesso.")

Sessão Spark iniciada com sucesso.


In [None]:
# !!! Add o diretorio src ao SparkContext's Python !!!
spark.sparkContext.addPyFile(os.path.join(project_root, 'src', '_03_cat_encoder.py'))

# Import apos adicionar arquivo ao SparkContext
from _03_cat_encoder import group_low_frequency_categories

In [None]:
# Define o caminho para o DataFrame com features, que é a saída da etapa 2

input_path = os.path.join(project_root_base, "data", "processed", "base_com_features.parquet")

In [None]:
# Verifica se o diretório existe antes de tentar ler
if not os.path.exists(input_path):
    print(f"Erro: Diretório '{input_path}' não encontrado.")
    print("Por favor, garanta que o DataFrame com as features foi salvo neste local.")
else:
    print(f"Lendo arquivos Parquet de: {input_path}")
    try:
        # O Spark lê automaticamente todos os arquivos de partição dentro do diretório
        df_features_read = spark.read.parquet(input_path)
        print("DataFrame com as features lido com sucesso.")

        print("\nVolume da Base de Features:")
        print(f"Linhas: {df_features_read.count()}")
        print(f"Colunas: {len(df_features_read.columns)}")

    except Exception as e:
        print(f"Erro ao ler arquivos Parquet: {e}")

Lendo arquivos Parquet de: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_com_features.parquet
DataFrame com as features lido com sucesso.

Volume da Base de Features:
Linhas: 982857
Colunas: 285


In [None]:
import os

CATEGORICAL_FEATURES = [
    'premise',
    'categoria_pdv',
    'zipcode',
    'categoria',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante',
    'week_of_month',
]

df_processed, grouped_map = group_low_frequency_categories(df=df_features_read, categorical_cols=CATEGORICAL_FEATURES)

Processando a coluna: premise
  Agrupando 1 categorias com frequência < 1.00% em 'Outros' para a coluna 'premise'
Processando a coluna: categoria_pdv
  Agrupando 48 categorias com frequência < 1.00% em 'Outros' para a coluna 'categoria_pdv'
Processando a coluna: zipcode
  Agrupando 746 categorias com frequência < 1.00% em 'Outros' para a coluna 'zipcode'
Processando a coluna: categoria
  Agrupando 2 categorias com frequência < 1.00% em 'Outros' para a coluna 'categoria'
Processando a coluna: tipos
  Agrupando 13 categorias com frequência < 1.00% em 'Outros' para a coluna 'tipos'
Processando a coluna: label
  Agrupando 12 categorias com frequência < 1.00% em 'Outros' para a coluna 'label'
Processando a coluna: subcategoria
  Agrupando 27 categorias com frequência < 1.00% em 'Outros' para a coluna 'subcategoria'
Processando a coluna: marca
  Agrupando 3423 categorias com frequência < 1.00% em 'Outros' para a coluna 'marca'
Processando a coluna: fabricante
  Agrupando 311 categorias com f

In [None]:
import pickle

# Salva o dicionário de mapeamento
output_path = "models/grouped_categories_map.pkl"
try:
    with open(output_path, 'wb') as f:
        pickle.dump(grouped_map, f)
    print(f"\nDicionário de mapeamento salvo em: {output_path}\n")
except Exception as e:
    print(f"Erro ao salvar o dicionário: {e}")


Dicionário de mapeamento salvo em: models/grouped_categories_map.pkl



In [None]:
output_path = os.path.join(project_root_base, "data", "processed", "base_com_features_encoded.parquet")

# Garante que o diretório exista
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Diretório criado: {output_dir}")

print(f"Salvando o DataFrame Spark 'df_final' em: {output_path} com repartition(100)")

df_processed.repartition(100).write.parquet(output_path, mode="overwrite")

print("DataFrame Spark salvo com sucesso.")

Saving Spark DataFrame 'df_final' to: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_com_features_encoded.parquet with repartition(100)
Spark DataFrame saved successfully.


# **4 - Seleção de Features**
---
Nesta etapa, vamos otimizar o nosso modelo, garantindo que ele use apenas as variáveis mais relevantes e evitando o problema de **overfitting**.

Para isso, estamos combinando duas técnicas poderosas em um processo automatizado:

1.  **Triagem Rápida por Importância:** Primeiro, treinamos um modelo rápido com todas as features para identificar a importância de cada uma delas. As variáveis com baixa relevância (importância `0` ou próxima de `0`) são removidas, o que nos economiza tempo. 🚀
2.  **`Backward Selection` com Validação:** Em seguida, para as features restantes, usamos um processo de eliminação mais rigoroso. A cada rodada, removemos uma feature e avaliamos o modelo. A feature que causa a menor perda de performance no conjunto de teste é a removida.

Este processo garante que o modelo final seja robusto e não se ajuste demais aos dados de treino.

In [3]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
import os
import sys

In [4]:
src_path = os.path.join(project_root, 'src')

# Adiciona a pasta 'src' ao caminho de busca do Python
if src_path not in sys.path:
    sys.path.append(src_path)

print(f"Caminho do projeto configurado para: {project_root}")

Caminho do projeto configurado para: /content/Big-Data-forecast


In [5]:
# Inicializa a sessão Spark
try:
    spark
except NameError:
    spark = SparkSession.builder \
    .appName("FeatureEngineeringNotebook") \
    .config("spark.network.timeout", "600s") \
    .config("spark.sql.broadcastTimeout", "1200s") \
    .getOrCreate()

print("Sessão Spark iniciada com sucesso.")

Sessão Spark iniciada com sucesso.


In [6]:
!pip install tqdm
!pip install catboost



In [7]:
# !!! Add o diretorio src ao SparkContext's Python !!!
# project_root = os.path.abspath(os.path.join(os.getcwd()))
spark.sparkContext.addPyFile(os.path.join(project_root, 'src', '_04_select_features.py'))

# Import apos adicionar arquivo ao SparkContext
from _04_select_features import run_feature_selection

print('\nCodigo de seleção de Features carregado com sucesso.\n')


Codigo de seleção de Features carregado com sucesso.



In [8]:
import pandas as pd
import os

# Define o caminho para a pasta contendo os arquivos parquet
input_folder_path = os.path.join(project_root_base, "data", "processed", "base_com_features_encoded.parquet")

# Verifica se a pasta existe antes de tentar ler
if not os.path.exists(input_folder_path):
    print(f"Erro: Pasta '{input_folder_path}' não encontrada.")
    print("Por favor, verifique se o caminho está correto.")
else:
    print(f"Lendo arquivos parquet da pasta: {input_folder_path}")
    try:
        # O pandas read_parquet pode ler diretamente de uma pasta contendo múltiplos arquivos parquet
        df_features_read_encoded_pandas = pd.read_parquet(input_folder_path, engine='pyarrow')
        print("DataFrame com features lido com sucesso.")

        print("\nVolume da Base de Features:")
        print(f"Linhas: {len(df_features_read_encoded_pandas)}")
        print(f"Colunas: {len(df_features_read_encoded_pandas.columns)}")

        # Se for preciso, pegar uma amostra do Dataframe
        sample_fraction = 1.0  # You can adjust this fraction as needed
        df_features_read_encoded_pandas_sample = df_features_read_encoded_pandas.sample(frac=sample_fraction, random_state=42)
        print(f"\nCriada uma amostra de {sample_fraction*100}% do DataFrame.")
        print(f"Volume da amostra: {len(df_features_read_encoded_pandas_sample)} linhas")

        # Remove o dataframe original para poupar memoria
        del df_features_read_encoded_pandas
        print("\nDataFrame original removido da memória.")


    except Exception as e:
        print(f"Erro ao ler arquivos parquet: {e}")

Lendo arquivos parquet da pasta: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_com_features_encoded.parquet
DataFrame com features lido com sucesso.

Volume da Base de Features:
Linhas: 982857
Colunas: 285

Criada uma amostra de 100.0% do DataFrame.
Volume da amostra: 982857 linhas

DataFrame original removido da memória.


In [11]:
import datetime
from datetime import date

df_features_read_encoded_pandas_sample.fillna(-1, inplace = True)

# Dicionário de parâmetros para o Catboost
catboost_params = {
    'task_type': 'GPU',
    'iterations': 1500,                  # Aumentado para o early stopping
    'learning_rate': 0.10,               # Valor mais baixo para maior precisão
    'depth': 6,                          # Maior profundidade para capturar padrões complexos
    'early_stopping_rounds': 50,
    'loss_function': 'RMSE',
    'l2_leaf_reg': 3,
    'min_data_in_leaf': 1500,
    'verbose': 0,
    'random_state': 42,

    # --- Parâmetros Otimizados Adicionados Posteriormente ---
    # 'subsample': 0.8,                    # Usa 80% dos dados em cada árvore
    # 'colsample_bylevel': 0.8,            # Usa 80% das colunas em cada nível da árvore
    'random_strength': 1.0,              # Aumenta a aleatoriedade para evitar overfitting
    # 'bootstrap_type': 'Bayesian',       # Método de amostragem eficiente
}

# Parâmetros de configuração do pipeline
TARGET = 'quantity'
IDENTIFIERS = ['internal_store_id', 'internal_product_id', 'rank', 'global_week_id']
IGNORE_FEATURES = [
    'pdv',
    'produto',
    'distributor_id',
    'transaction_date',
    'descricao',
    'reference_date',
    ]
CATEGORICAL_FEATURES = [
    'premise',
    'categoria_pdv',
    'zipcode',
    'categoria',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante',
    'week_of_month',
]
filter_date = date(2022, 11, 1)

# Lista de todas as features candidatas, excluindo o target e os identificadores
ALL_FEATURES = [col for col in df_features_read_encoded_pandas_sample.columns if col not in [TARGET] + IDENTIFIERS + IGNORE_FEATURES]

In [12]:

# Chama a função principal com todos os parâmetros
features_selecionadas = run_feature_selection(
    df = df_features_read_encoded_pandas_sample,
    reference_date_for_validation=filter_date,
    catboost_params=catboost_params,
    label_col=TARGET,
    id_cols=IDENTIFIERS,
    ignore_features=IGNORE_FEATURES,
    categorical_features=CATEGORICAL_FEATURES
    )

--- 1. Preparando os dados para a seleção de features ---
Conjunto de treino (amostra): 900396 linhas
Conjunto de validação (completo): 82461 linhas

--- 2. Seleção Rápida por Feature Importance ---
Features após a triagem inicial: 47 de 279

--- 3. Iniciando Backward Selection com Validação ---


Backward Selection:   0%|          | 0/47 [00:00<?, ?it/s]

Testando 47 features:   0%|          | 0/47 [00:00<?, ?it/s]

-> Removida: 'categoria'. Novo melhor WMAPE: 0.3894


Testando 46 features:   0%|          | 0/46 [00:00<?, ?it/s]


O WMAPE não melhorou mais. Parando a eliminação.

--- Processo de Seleção Concluído ---
Conjunto final de features: ['mean_discount_last_15_weeks', 'min_quantity_last_9_weeks', 'std_taxes_last_18_weeks', 'min_quantity_last_12_weeks', 'std_taxes_last_12_weeks', 'max_quantity_last_18_weeks', 'std_gross_value_last_15_weeks', 'min_quantity_last_15_weeks', 'Vendas_Anteriores_lag_3', 'marca', 'Vendas_Anteriores_lag_1', 'min_quantity_last_6_weeks', 'sum_quantity_last_15_weeks', 'std_quantity_last_12_weeks', 'min_taxes_last_6_weeks', 'max_quantity_last_12_weeks', 'max_quantity_last_15_weeks', 'week_rank', 'sum_quantity_last_18_weeks', 'mean_quantity_last_12_weeks', 'mean_quantity_last_15_weeks', 'sum_quantity_last_12_weeks', 'max_gross_value_last_9_weeks', 'sum_quantidade_pedidos_last_18_weeks', 'zipcode', 'mean_quantidade_pedidos_last_6_weeks', 'max_net_value_last_18_weeks', 'week_of_month', 'fabricante', 'distinct_products_last_1_weeks', 'distinct_products_last_9_weeks', 'taxes', 'premise',

In [13]:
# Salvar a Lista de Features Selecionadas ---
output_path = os.path.join(project_root, "models", "features_selecionadas.txt")

if features_selecionadas:
    with open(output_path, "w") as f:
        for item in features_selecionadas:
            f.write(f"{item}\n")
    print(f"\nLista de features selecionadas salva em '{output_path}'.\n")
else:
    print("\nNenhuma feature foi selecionada. Verifique os parâmetros.\n")


Lista de features selecionadas salva em '/content/Big-Data-forecast/models/features_selecionadas.txt'.



In [None]:
# features_selecionadas = ['min_quantity_last_6_weeks', 'max_quantity_last_3_weeks', 'min_quantity_last_3_weeks', 'mean_quantity_last_3_weeks', 'std_quantity_last_3_weeks', 'sum_quantidade_pedidos_last_15_weeks', 'marca', 'week_of_month', 'zipcode', 'fabricante', 'premise', 'distinct_products_last_6_weeks', 'categoria', 'tipos', 'distinct_products_last_15_weeks', 'taxes', 'distinct_products_last_3_weeks', 'distinct_products_last_12_weeks', 'distinct_products_last_9_weeks', 'net_value', 'distinct_products_last_18_weeks', 'subcategoria', 'discount']

# 5 - Treinamento Final do modelo

Depois de todo o trabalho de engenharia e seleção de features, agora temos uma base de dados rica em informações e um conjunto ideal de variáveis para o nosso modelo.

Nesta etapa, o objetivo é treinar o modelo CatBoost com todo o conjunto de dados históricos (de 2022). Utilizaremos a lista final de features que selecionamos no passo anterior.

Ao final, salvaremos o modelo treinado em um arquivo (.cbm) na pasta models/. Este arquivo será a "inteligência" que usaremos para fazer as previsões para janeiro de 2023 no próximo script.

Com este passo, garantimos que o modelo final seja o mais preciso e robusto possível, pronto para nos ajudar na tomada de decisões no varejo.

In [14]:
import os
import pandas as pd
import numpy as np
from catboost import CatBoostRegressor

# Define o caminho para o arquivo de features selecionadas
project_root = '/content/Big-Data-forecast'
features_file_path = os.path.join(project_root, "models", "features_selecionadas.txt")

# Verifica se o arquivo existe
if not os.path.exists(features_file_path):
    print(f"Erro: Arquivo '{features_file_path}' não encontrado.")
    print("Por favor, verifique se a seleção de features foi executada com sucesso.")
else:
    # Lê as features do arquivo
    with open(features_file_path, "r") as f:
        features_selecionadas = [line.strip() for line in f if line.strip()]

    print(f"Features selecionadas lidas do arquivo: {len(features_selecionadas)} features")
    print("Lista de features selecionadas:")
    print(features_selecionadas)

Features selecionadas lidas do arquivo: 46 features
Lista de features selecionadas:
['mean_discount_last_15_weeks', 'min_quantity_last_9_weeks', 'std_taxes_last_18_weeks', 'min_quantity_last_12_weeks', 'std_taxes_last_12_weeks', 'max_quantity_last_18_weeks', 'std_gross_value_last_15_weeks', 'min_quantity_last_15_weeks', 'Vendas_Anteriores_lag_3', 'marca', 'Vendas_Anteriores_lag_1', 'min_quantity_last_6_weeks', 'sum_quantity_last_15_weeks', 'std_quantity_last_12_weeks', 'min_taxes_last_6_weeks', 'max_quantity_last_12_weeks', 'max_quantity_last_15_weeks', 'week_rank', 'sum_quantity_last_18_weeks', 'mean_quantity_last_12_weeks', 'mean_quantity_last_15_weeks', 'sum_quantity_last_12_weeks', 'max_gross_value_last_9_weeks', 'sum_quantidade_pedidos_last_18_weeks', 'zipcode', 'mean_quantidade_pedidos_last_6_weeks', 'max_net_value_last_18_weeks', 'week_of_month', 'fabricante', 'distinct_products_last_1_weeks', 'distinct_products_last_9_weeks', 'taxes', 'premise', 'categoria_pdv', 'gross_value', 

In [None]:
import pandas as pd
import os

# Define o caminho para a pasta contendo os arquivos parquet
input_folder_path = "/content/drive/MyDrive/Hackaton - Big Data/data/processed/base_com_features.parquet"

# Verifica se a pasta existe antes de tentar ler
if not os.path.exists(input_folder_path):
    print(f"Erro: Pasta '{input_folder_path}' não encontrada.")
    print("Por favor, verifique se o caminho está correto.")
else:
    print(f"Lendo arquivos parquet da pasta: {input_folder_path}")
    try:
        # O pandas read_parquet pode ler diretamente de uma pasta contendo múltiplos arquivos parquet
        df_features_read_encoded_pandas = pd.read_parquet(input_folder_path, engine='pyarrow')
        print("DataFrame com features lido com sucesso.")

        # Display schema and a few rows
        print("\nSchema do DataFrame lido:")
        df_features_read_encoded_pandas.info()

        print("\nVolume da Base de Features:")
        print(f"Linhas: {len(df_features_read_encoded_pandas)}")
        print(f"Colunas: {len(df_features_read_encoded_pandas.columns)}")

        print("\nPrimeiras 5 linhas do DataFrame lido:")
        display(df_features_read_encoded_pandas.head())

        # Take a sample of the DataFrame
        sample_fraction = 1.0 # You can adjust this fraction as needed
        df_features_read_encoded_pandas_sample = df_features_read_encoded_pandas.sample(frac=sample_fraction, random_state=42)
        print(f"\nCriada uma amostra de {sample_fraction*100}% do DataFrame.")
        print(f"Volume da amostra: {len(df_features_read_encoded_pandas_sample)} linhas")

        # Remove the original DataFrame from memory
        del df_features_read_encoded_pandas
        print("\nDataFrame original removido da memória.")


    except Exception as e:
        print(f"Erro ao ler arquivos parquet: {e}")

Lendo arquivos parquet da pasta: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_com_features.parquet
DataFrame com features lido com sucesso.

Schema do DataFrame lido:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 656762 entries, 0 to 656761
Columns: 283 entries, internal_store_id to std_quantidade_pedidos_last_18_weeks
dtypes: float64(238), int32(4), int64(29), object(12)
memory usage: 1.4+ GB

Volume da Base de Features:
Linhas: 656762
Colunas: 283

Primeiras 5 linhas do DataFrame lido:


Unnamed: 0,internal_store_id,week_rank,premise,categoria_pdv,zipcode,internal_product_id,reference_date,gross_value,net_value,gross_profit,discount,taxes,categoria,tipos,label,subcategoria,marca,fabricante,week_of_month,global_week_id,quantity,quantidade_pedidos,last_order_week,tempo_ultimo_pedido,distinct_products_last_1_weeks,distinct_products_last_3_weeks,distinct_products_last_6_weeks,distinct_products_last_9_weeks,distinct_products_last_12_weeks,distinct_products_last_15_weeks,distinct_products_last_18_weeks,Vendas_Anteriores_lag_1,Vendas_Anteriores_lag_3,Vendas_Anteriores_lag_6,Vendas_Anteriores_lag_9,Vendas_Anteriores_lag_12,Vendas_Anteriores_lag_15,Vendas_Anteriores_lag_18,sum_quantity_last_1_weeks,mean_quantity_last_1_weeks,min_quantity_last_1_weeks,max_quantity_last_1_weeks,std_quantity_last_1_weeks,sum_gross_value_last_1_weeks,mean_gross_value_last_1_weeks,min_gross_value_last_1_weeks,max_gross_value_last_1_weeks,std_gross_value_last_1_weeks,sum_net_value_last_1_weeks,mean_net_value_last_1_weeks,min_net_value_last_1_weeks,max_net_value_last_1_weeks,std_net_value_last_1_weeks,sum_gross_profit_last_1_weeks,mean_gross_profit_last_1_weeks,min_gross_profit_last_1_weeks,max_gross_profit_last_1_weeks,std_gross_profit_last_1_weeks,sum_discount_last_1_weeks,mean_discount_last_1_weeks,min_discount_last_1_weeks,max_discount_last_1_weeks,std_discount_last_1_weeks,sum_taxes_last_1_weeks,mean_taxes_last_1_weeks,min_taxes_last_1_weeks,max_taxes_last_1_weeks,std_taxes_last_1_weeks,sum_quantidade_pedidos_last_1_weeks,mean_quantidade_pedidos_last_1_weeks,min_quantidade_pedidos_last_1_weeks,max_quantidade_pedidos_last_1_weeks,std_quantidade_pedidos_last_1_weeks,sum_quantity_last_3_weeks,mean_quantity_last_3_weeks,min_quantity_last_3_weeks,max_quantity_last_3_weeks,std_quantity_last_3_weeks,sum_gross_value_last_3_weeks,mean_gross_value_last_3_weeks,min_gross_value_last_3_weeks,max_gross_value_last_3_weeks,std_gross_value_last_3_weeks,sum_net_value_last_3_weeks,mean_net_value_last_3_weeks,min_net_value_last_3_weeks,max_net_value_last_3_weeks,std_net_value_last_3_weeks,sum_gross_profit_last_3_weeks,mean_gross_profit_last_3_weeks,min_gross_profit_last_3_weeks,max_gross_profit_last_3_weeks,std_gross_profit_last_3_weeks,sum_discount_last_3_weeks,mean_discount_last_3_weeks,min_discount_last_3_weeks,max_discount_last_3_weeks,std_discount_last_3_weeks,sum_taxes_last_3_weeks,mean_taxes_last_3_weeks,min_taxes_last_3_weeks,max_taxes_last_3_weeks,std_taxes_last_3_weeks,sum_quantidade_pedidos_last_3_weeks,mean_quantidade_pedidos_last_3_weeks,min_quantidade_pedidos_last_3_weeks,max_quantidade_pedidos_last_3_weeks,std_quantidade_pedidos_last_3_weeks,sum_quantity_last_6_weeks,mean_quantity_last_6_weeks,min_quantity_last_6_weeks,max_quantity_last_6_weeks,std_quantity_last_6_weeks,sum_gross_value_last_6_weeks,mean_gross_value_last_6_weeks,min_gross_value_last_6_weeks,max_gross_value_last_6_weeks,std_gross_value_last_6_weeks,sum_net_value_last_6_weeks,mean_net_value_last_6_weeks,min_net_value_last_6_weeks,max_net_value_last_6_weeks,std_net_value_last_6_weeks,sum_gross_profit_last_6_weeks,mean_gross_profit_last_6_weeks,min_gross_profit_last_6_weeks,max_gross_profit_last_6_weeks,std_gross_profit_last_6_weeks,sum_discount_last_6_weeks,mean_discount_last_6_weeks,min_discount_last_6_weeks,max_discount_last_6_weeks,std_discount_last_6_weeks,sum_taxes_last_6_weeks,mean_taxes_last_6_weeks,min_taxes_last_6_weeks,max_taxes_last_6_weeks,std_taxes_last_6_weeks,sum_quantidade_pedidos_last_6_weeks,mean_quantidade_pedidos_last_6_weeks,min_quantidade_pedidos_last_6_weeks,max_quantidade_pedidos_last_6_weeks,std_quantidade_pedidos_last_6_weeks,sum_quantity_last_9_weeks,mean_quantity_last_9_weeks,min_quantity_last_9_weeks,max_quantity_last_9_weeks,std_quantity_last_9_weeks,sum_gross_value_last_9_weeks,mean_gross_value_last_9_weeks,min_gross_value_last_9_weeks,max_gross_value_last_9_weeks,std_gross_value_last_9_weeks,sum_net_value_last_9_weeks,mean_net_value_last_9_weeks,min_net_value_last_9_weeks,max_net_value_last_9_weeks,std_net_value_last_9_weeks,sum_gross_profit_last_9_weeks,mean_gross_profit_last_9_weeks,min_gross_profit_last_9_weeks,max_gross_profit_last_9_weeks,std_gross_profit_last_9_weeks,sum_discount_last_9_weeks,mean_discount_last_9_weeks,min_discount_last_9_weeks,max_discount_last_9_weeks,std_discount_last_9_weeks,sum_taxes_last_9_weeks,mean_taxes_last_9_weeks,min_taxes_last_9_weeks,max_taxes_last_9_weeks,std_taxes_last_9_weeks,sum_quantidade_pedidos_last_9_weeks,mean_quantidade_pedidos_last_9_weeks,min_quantidade_pedidos_last_9_weeks,max_quantidade_pedidos_last_9_weeks,std_quantidade_pedidos_last_9_weeks,sum_quantity_last_12_weeks,mean_quantity_last_12_weeks,min_quantity_last_12_weeks,max_quantity_last_12_weeks,std_quantity_last_12_weeks,sum_gross_value_last_12_weeks,mean_gross_value_last_12_weeks,min_gross_value_last_12_weeks,max_gross_value_last_12_weeks,std_gross_value_last_12_weeks,sum_net_value_last_12_weeks,mean_net_value_last_12_weeks,min_net_value_last_12_weeks,max_net_value_last_12_weeks,std_net_value_last_12_weeks,sum_gross_profit_last_12_weeks,mean_gross_profit_last_12_weeks,min_gross_profit_last_12_weeks,max_gross_profit_last_12_weeks,std_gross_profit_last_12_weeks,sum_discount_last_12_weeks,mean_discount_last_12_weeks,min_discount_last_12_weeks,max_discount_last_12_weeks,std_discount_last_12_weeks,sum_taxes_last_12_weeks,mean_taxes_last_12_weeks,min_taxes_last_12_weeks,max_taxes_last_12_weeks,std_taxes_last_12_weeks,sum_quantidade_pedidos_last_12_weeks,mean_quantidade_pedidos_last_12_weeks,min_quantidade_pedidos_last_12_weeks,max_quantidade_pedidos_last_12_weeks,std_quantidade_pedidos_last_12_weeks,sum_quantity_last_15_weeks,mean_quantity_last_15_weeks,min_quantity_last_15_weeks,max_quantity_last_15_weeks,std_quantity_last_15_weeks,sum_gross_value_last_15_weeks,mean_gross_value_last_15_weeks,min_gross_value_last_15_weeks,max_gross_value_last_15_weeks,std_gross_value_last_15_weeks,sum_net_value_last_15_weeks,mean_net_value_last_15_weeks,min_net_value_last_15_weeks,max_net_value_last_15_weeks,std_net_value_last_15_weeks,sum_gross_profit_last_15_weeks,mean_gross_profit_last_15_weeks,min_gross_profit_last_15_weeks,max_gross_profit_last_15_weeks,std_gross_profit_last_15_weeks,sum_discount_last_15_weeks,mean_discount_last_15_weeks,min_discount_last_15_weeks,max_discount_last_15_weeks,std_discount_last_15_weeks,sum_taxes_last_15_weeks,mean_taxes_last_15_weeks,min_taxes_last_15_weeks,max_taxes_last_15_weeks,std_taxes_last_15_weeks,sum_quantidade_pedidos_last_15_weeks,mean_quantidade_pedidos_last_15_weeks,min_quantidade_pedidos_last_15_weeks,max_quantidade_pedidos_last_15_weeks,std_quantidade_pedidos_last_15_weeks,sum_quantity_last_18_weeks,mean_quantity_last_18_weeks,min_quantity_last_18_weeks,max_quantity_last_18_weeks,std_quantity_last_18_weeks,sum_gross_value_last_18_weeks,mean_gross_value_last_18_weeks,min_gross_value_last_18_weeks,max_gross_value_last_18_weeks,std_gross_value_last_18_weeks,sum_net_value_last_18_weeks,mean_net_value_last_18_weeks,min_net_value_last_18_weeks,max_net_value_last_18_weeks,std_net_value_last_18_weeks,sum_gross_profit_last_18_weeks,mean_gross_profit_last_18_weeks,min_gross_profit_last_18_weeks,max_gross_profit_last_18_weeks,std_gross_profit_last_18_weeks,sum_discount_last_18_weeks,mean_discount_last_18_weeks,min_discount_last_18_weeks,max_discount_last_18_weeks,std_discount_last_18_weeks,sum_taxes_last_18_weeks,mean_taxes_last_18_weeks,min_taxes_last_18_weeks,max_taxes_last_18_weeks,std_taxes_last_18_weeks,sum_quantidade_pedidos_last_18_weeks,mean_quantidade_pedidos_last_18_weeks,min_quantidade_pedidos_last_18_weeks,max_quantidade_pedidos_last_18_weeks,std_quantidade_pedidos_last_18_weeks
0,3323032207893061290,45,Off Premise,Package/Liquor,80517.0,7162840612005571065,2022-10-01,35.779999,34.725506,1.418839,1.93,1.054494,Distilled Spirits,Distilled Spirits,Allocated,Bourbon Whiskey,Buffalo Trace Kentucky Straight Bourbon Whiskey,Sazerac Spirits,1,2022-10-1,1.0,1,-1,-1,-1,3,11,22,26,29,33,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0
1,1340525425503310227,45,On Premise,Mexican Rest,30350.0,6358449271208655162,2022-10-01,106.0,98.676132,46.212132,0.0,7.32387,Draft,Draft,Core,Lager / Pilsner,Michelob Ultra,AB Anheuser Busch Inc,1,2022-10-1,2.0,1,38,7,-1,1,3,4,4,5,5,1.0,2.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,1.0,1.0,1.0,1.0,-1.0,55.0,55.0,55.0,55.0,-1.0,51.338066,51.338066,51.338066,51.338066,-1.0,25.106066,25.106066,25.106066,25.106066,-1.0,0.0,0.0,0.0,0.0,-1.0,3.661935,3.661935,3.661935,3.661935,-1.0,1,1.0,1,1,-1.0,1.0,1.0,1.0,1.0,-1.0,55.0,55.0,55.0,55.0,-1.0,51.338066,51.338066,51.338066,51.338066,-1.0,25.106066,25.106066,25.106066,25.106066,-1.0,0.0,0.0,0.0,0.0,-1.0,3.661935,3.661935,3.661935,3.661935,-1.0,1,1.0,1,1,-1.0,1.0,1.0,1.0,1.0,-1.0,55.0,55.0,55.0,55.0,-1.0,51.338066,51.338066,51.338066,51.338066,-1.0,25.106066,25.106066,25.106066,25.106066,-1.0,0.0,0.0,0.0,0.0,-1.0,3.661935,3.661935,3.661935,3.661935,-1.0,1,1.0,1,1,-1.0,1.0,1.0,1.0,1.0,-1.0,55.0,55.0,55.0,55.0,-1.0,51.338066,51.338066,51.338066,51.338066,-1.0,25.106066,25.106066,25.106066,25.106066,-1.0,0.0,0.0,0.0,0.0,-1.0,3.661935,3.661935,3.661935,3.661935,-1.0,1,1.0,1,1,-1.0
2,4895110631059629750,41,Off Premise,Package/Liquor,80116.0,6979267524236536577,2022-09-01,39.599998,39.599998,-21.120001,81.599998,0.0,Distilled Spirits,Allocated Spirits,,Liqueurs & Cordials,Ryan's Irish Cream Liqueur,Sazerac Spirits,2,2022-09-2,10.0,1,-1,-1,3,8,16,18,26,31,43,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0
3,3633790490745603955,29,Off Premise,Package/Liquor,30005.0,8022696847157343453,2022-06-01,28.65,26.370241,8.970242,0.0,2.279758,Package,Package,Core,IPA,New Belgium Voodoo Ranger IPA,NB New Belgium,5,2022-06-5,1.0,1,-1,-1,3,6,10,18,21,23,27,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0
4,3089889927683582952,48,Off Premise,Convenience,80132.0,1271951761907094129,2022-10-01,47.450001,46.962257,11.274258,7.9,0.487742,Package,Package,Core,Lager,Budweiser,AB Anheuser Busch Inc,4,2022-10-4,2.0,1,-1,-1,-1,4,6,8,8,9,12,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1,-1.0,-1,-1,-1.0



Criada uma amostra de 100.0% do DataFrame.
Volume da amostra: 656762 linhas

DataFrame original removido da memória.


In [15]:
import datetime
from datetime import date

df_features_read_encoded_pandas_sample.fillna(-1, inplace = True)

# Dicionário de parâmetros para o Catboost
catboost_params = {
    'task_type': 'GPU',
    'iterations': 1500,                  # Aumentado para o early stopping
    'learning_rate': 0.10,               # Valor mais baixo para maior precisão
    'depth': 6,                          # Maior profundidade para capturar padrões complexos
    'early_stopping_rounds': 50,
    'loss_function': 'RMSE',
    'l2_leaf_reg': 3,
    'min_data_in_leaf': 1500,
    'verbose': 0,
    'random_state': 42,

    # --- Parâmetros Otimizados Adicionados Posteriormente ---
    # 'subsample': 0.8,                    # Usa 80% dos dados em cada árvore
    # 'colsample_bylevel': 0.8,            # Usa 80% das colunas em cada nível da árvore
    'random_strength': 1.0,              # Aumenta a aleatoriedade para evitar overfitting
    # 'bootstrap_type': 'Bayesian',       # Método de amostragem eficiente
}

# Parâmetros de configuração do pipeline
TARGET = 'quantity'
IDENTIFIERS = ['internal_store_id', 'internal_product_id', 'rank', 'global_week_id']
IGNORE_FEATURES = [
    'pdv',
    'produto',
    'distributor_id',
    'transaction_date',
    'descricao',
    'reference_date',
    ]
CATEGORICAL_FEATURES = [
    'premise',
    'categoria_pdv',
    'zipcode',
    'categoria',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante',
    'week_of_month',
]
filter_date = date(2022, 11, 1)

# Lista de todas as features candidatas, excluindo o target e os identificadores
ALL_FEATURES = [col for col in df_features_read_encoded_pandas_sample.columns if col not in [TARGET] + IDENTIFIERS + IGNORE_FEATURES]

In [21]:
import pandas as pd
import numpy as np
from catboost import CatBoostRegressor
import os
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm
import warnings

# Ignora avisos
warnings.filterwarnings('ignore')

def calculate_wmape(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Calcula o Weighted Mean Absolute Percentage Error (WMAPE) para DataFrames Pandas."""
    if y_true.sum() == 0:
        return float('inf')
    return np.sum(np.abs(y_true - y_pred)) / np.sum(y_true)


def train_final_model(
    df: pd.DataFrame,
    selected_features: list,
    categorical_features: list,
    catboost_params: dict,
    label_col: str,
    reference_date_for_validation
    ):
    """
    Treina o modelo final CatBoost com as features selecionadas.

    Args:
        df (pd.DataFrame): DataFrame de entrada com features e target.
        selected_features (list): Lista de nomes das features selecionadas.
        categorical_features (list): Lista de nomes das features categóricas.
        catboost_params (dict): Dicionário de parâmetros para o CatBoostRegressor.
        label_col (str): Nome da coluna target.
        reference_date_for_validation (int): data de referencia que será usada na validação out-of-time.

    Returns:
        CatBoostRegressor: O modelo CatBoost treinado.
    """
    print("\n--- Iniciando Treinamento do Modelo Final ---")

    # Garantir que as features selecionadas e o target estão no DataFrame
    required_cols = selected_features + [label_col]
    if not all(col in df.columns for col in required_cols):
        missing = [col for col in required_cols if col not in df.columns]
        print(f"Erro: As seguintes colunas necessárias não estão no DataFrame: {missing}")
        return None

    # Divide os dados em treino e validação (out-of-time)
    df['reference_date'] = pd.to_datetime(df['reference_date']).dt.date

    df_train = df[df["reference_date"] <= reference_date_for_validation]
    df_val = df[df["reference_date"] > reference_date_for_validation]

    print(f"Tamanho do conjunto de treino: {len(df_train)} linhas")
    print(f"Tamanho do conjunto de validação: {len(df_val)} linhas")

    # Define X e y para treino e validação
    X_train = df_train[selected_features]
    y_train = df_train[label_col]
    X_val = df_val[selected_features]
    y_val = df_val[label_col]

    # Identifica features categóricas presentes no conjunto de features selecionadas
    categorical_features_in_selection = [f for f in selected_features if f in categorical_features]

    # Converte as colunas categóricas para o tipo 'category' no Pandas
    for col_name in categorical_features_in_selection:
        X_train[col_name] = X_train[col_name].astype(str)
        X_val[col_name] = X_val[col_name].astype(str)


    print(f"Features categóricas na seleção: {categorical_features_in_selection}")

    # Inicializa e treina o modelo CatBoost
    model = CatBoostRegressor(**catboost_params)

    print("\nTreinando o modelo...")
    model.fit(X_train, y_train,
              eval_set=(X_val, y_val),
              cat_features=categorical_features_in_selection,
              early_stopping_rounds=catboost_params.get('early_stopping_rounds', 25),
              verbose=catboost_params.get('verbose', 0))

    print("\n--- Treinamento Concluído ---")

    # Avalia o modelo no conjunto de treino
    y_pred_train = model.predict(X_train)
    wmape_train = calculate_wmape(y_train, y_pred_train)
    print(f"WMAPE no conjunto de treino: {wmape_train:.4f}")

    # Avalia o modelo no conjunto de validação
    y_pred_val = model.predict(X_val)
    wmape_val = calculate_wmape(y_val, y_pred_val)
    print(f"WMAPE no conjunto de validação: {wmape_val:.4f}")

    # Calcula WMAPE no dataset completo
    X_full = df[selected_features]
    y_full = df[label_col]
    for col_name in categorical_features_in_selection:
        X_full[col_name] = X_full[col_name].astype(str)
    y_pred_full = model.predict(X_full)
    wmape_full = calculate_wmape(y_full, y_pred_full)
    print(f"WMAPE no dataset completo: {wmape_full:.4f}")

    # Calcula a diferença absoluta entre WMAPE de treino e validação
    wmape_diff_abs = abs(wmape_val - wmape_train)
    print(f"Diferença absoluta entre WMAPE de treino e validação: {wmape_diff_abs:.4f}")


    return model

In [22]:
# Chamar a função de treinamento
final_model = train_final_model(
    df=df_features_read_encoded_pandas_sample,
    selected_features=features_selecionadas,
    categorical_features=CATEGORICAL_FEATURES,
    catboost_params=catboost_params,
    label_col=TARGET,
    reference_date_for_validation=filter_date
)

if final_model:
    print("\nModelo final treinado com sucesso!")
    # Aqui você pode adicionar código para salvar o modelo, se necessário
    # Ex: final_model.save_model("catboost_final_model.cbm")
else:
    print("\nFalha no treinamento do modelo final.")


--- Iniciando Treinamento do Modelo Final ---
Tamanho do conjunto de treino: 900396 linhas
Tamanho do conjunto de validação: 82461 linhas
Features categóricas na seleção: ['marca', 'zipcode', 'week_of_month', 'fabricante', 'premise', 'categoria_pdv', 'label', 'tipos', 'subcategoria']

Treinando o modelo...

--- Treinamento Concluído ---
WMAPE no conjunto de treino: 0.4191
WMAPE no conjunto de validação: 0.3956
WMAPE no dataset completo: 0.4179
Diferença absoluta entre WMAPE de treino e validação: 0.0235

Modelo final treinado com sucesso!


In [23]:
import os

# Define o caminho para salvar o modelo
model_save_path = os.path.join(project_root, "models", "catboost_final_model.cbm")

# Verifica se o diretório de modelos existe, caso contrário, cria-o
model_dir = os.path.dirname(model_save_path)
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
    print(f"Created model directory: {model_dir}")

# Salva o modelo treinado
if 'final_model' in locals() and final_model is not None:
    try:
        final_model.save_model(model_save_path)
        print(f"\nModelo CatBoost final salvo com sucesso em: '{model_save_path}'\n")
    except Exception as e:
        print(f"\nErro ao salvar o modelo: {e}")
else:
    print("\nErro: O modelo final não foi encontrado. Por favor, execute a célula de treinamento do modelo.\n")


Modelo CatBoost final salvo com sucesso em: '/content/Big-Data-forecast/models/catboost_final_model.cbm'



# 6 - Pipeline e Scoragem da Base Completa

Após o processo rigoroso de engenharia e seleção de features, é hora de colocar o modelo para trabalhar na base de dados completa de 2022.

Nesta fase, faremos o seguinte:

**Carregar** o modelo final treinado no passo anterior.

**Preparar** a base de dados completa de 2022, aplicando todas as features que selecionamos.

**Scorar** cada registro da base completa, gerando a previsão para cada semana, produto e pdv de 2022.

**Calcular** o WMAPE para toda a base. Esta métrica nos dará a visão real da performance do nosso modelo e o quão bem ele se generaliza para os dados que ele viu durante o treinamento, o que é um ótimo indicador de estabilidade.

Ao final desta etapa, teremos uma visão clara da performance do nosso modelo e a confiança necessária para fazer as previsões para o futuro de 2023.

In [24]:
# 1. Load the data

import os
from pyspark.sql import SparkSession

# Define o caminho para o DataFrame consolidado
project_root_base = '/content/drive/MyDrive/Hackaton - Big Data'
input_path_consolidada = os.path.join(project_root_base, "data", "processed", "base_consolidada.parquet")

# Inicializa a sessão Spark (se ainda não estiver inicializada)
try:
    spark
except NameError:
    spark = SparkSession.builder.appName("ScoringPipeline").getOrCreate()

# Verifica se o arquivo existe antes de tentar ler
if not os.path.exists(input_path_consolidada):
    print(f"Erro: Arquivo '{input_path_consolidada}' não encontrado.")
    print("Verifique se o 01_load_data.py foi executado e se o caminho está correto.")
else:
    # Carrega o DataFrame consolidado com o Spark
    print(f"\nCarregando o DataFrame consolidado de: {input_path_consolidada}\n")
    df_consolidada_score = spark.read.parquet(input_path_consolidada)
    print("DataFrame consolidado carregado com sucesso.")


Carregando o DataFrame consolidado de: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_consolidada.parquet

DataFrame consolidado carregado com sucesso.


In [25]:
# 2. Feature Engineering (com imports e setup de ambiente explicitos)

import os
import sys
from pyspark.sql import SparkSession

# Define a raiz do projeto
src_path = os.path.join(project_root, 'src')

# Adiciona a pasta 'src' ao caminho de busca do Python
if src_path not in sys.path:
    sys.path.append(src_path)
    print(f"Adicionado '{src_path}' ao sys.path")
else:
    print(f"'{src_path}' já está no sys.path")


# Inicializa a sessão Spark (se ainda não estiver inicializada)
try:
    spark
except NameError:
    spark = SparkSession.builder.appName("ScoringPipeline").getOrCreate()
    print("Sessão Spark iniciada.")
else:
    print("Sessão Spark já inicializada.")


# Adiciona o arquivo Python ao SparkContext (necessário para funções UDF ou modulos em src)
# Verifique se o arquivo _02_feature_engineering.py existe antes de adicionar
feature_engineering_script_path = os.path.join(project_root, 'src', '_02_feature_engineering.py')
if os.path.exists(feature_engineering_script_path):
    spark.sparkContext.addPyFile(feature_engineering_script_path)
    print(f"Adicionado '{feature_engineering_script_path}' ao SparkContext")
else:
    print(f"Erro: Arquivo '{feature_engineering_script_path}' não encontrado. Não foi possível adicionar ao SparkContext.")


# Importa a função feature_engineering_pipeline APÓS adicionar o arquivo ao SparkContext
try:
    from _02_feature_engineering import feature_engineering_pipeline
    print("Função 'feature_engineering_pipeline' importada com sucesso.")
except ImportError as e:
    print(f"Erro ao importar 'feature_engineering_pipeline': {e}")
    print("Verifique se o arquivo '_02_feature_engineering.py' existe na pasta 'src' e se o caminho foi adicionado corretamente.")


# Define os parâmetros de feature engineering
week_windows_to_consider = [1, 3, 6, 9, 12, 15, 18]
value_cols_to_aggregate = ['quantity', 'gross_value', 'net_value', 'gross_profit', 'discount', 'taxes', 'quantidade_pedidos']
aggregation_functions_to_apply = ['count', 'sum', 'mean', 'min', 'max', 'std']


print("\nAplicando feature engineering ao DataFrame consolidado para scoring...")

# Aplica a pipeline de feature engineering
# Certifique-se que df_consolidada_score foi carregado em uma célula anterior
if 'df_consolidada_score' in locals():
    df_scored_features = feature_engineering_pipeline(
        df_consolidada_score,
        week_windows_to_consider,
        value_cols_to_aggregate,
        aggregation_functions_to_apply,
        spark
    )

    print("\nFeature engineering concluída para scoring.")
    # print("Schema do DataFrame com features para scoring:")
    # df_scored_features.printSchema()
else:
    print("Erro: DataFrame 'df_consolidada_score' não encontrado. Por favor, carregue os dados antes de aplicar a feature engineering.")

'/content/Big-Data-forecast/src' já está no sys.path
Sessão Spark já inicializada.
Adicionado '/content/Big-Data-forecast/src/_02_feature_engineering.py' ao SparkContext
Função 'feature_engineering_pipeline' importada com sucesso.

Aplicando feature engineering ao DataFrame consolidado para scoring...
Criando identificador de semana global e rank...
Criando identificador de semana global e rank...
Calculando tempo_ultimo_pedido...
Calculando features defasadas e de janela móvel de forma otimizada...
Calculando contagem distinta de produtos por semana...

Feature engineering concluída para scoring.


In [27]:
# 3. Seleção de Features (Filtragem)

# Define o caminho para o arquivo de features selecionadas
features_file_path = os.path.join(project_root, "models", "features_selecionadas.txt")
identifiers = ['internal_store_id', 'internal_product_id', 'rank']


# Verifica se o arquivo existe e lê as features
if not os.path.exists(features_file_path):
    print(f"Erro: Arquivo '{features_file_path}' não encontrado.")
    print("Por favor, verifique se a seleção de features foi executada e o arquivo foi salvo.")
    features_selecionadas = [] # Inicializa como uma lista vazia para evitar NameError
else:
    with open(features_file_path, "r") as f:
        features_selecionadas = [line.strip() for line in f if line.strip()]

    print(f"\nFeatures selecionadas lidas do arquivo: {len(features_selecionadas)} features")

    # Filtra o DataFrame scored para manter apenas as features selecionadas e a coluna target
    # Assumindo que a coluna target é 'quantity' conforme definido anteriormente
    TARGET = 'quantity' # Garante que TARGET seja definido se não estiver globalmente disponível

    # Adiciona a coluna target à lista de colunas para selecionar
    cols_to_select = identifiers + features_selecionadas + [TARGET]

    # Filtra quaisquer features selecionadas que possam não estar presentes no DataFrame scored
    # Isso pode acontecer se a engenharia de features falhou para algumas features
    available_cols_to_select = [col for col in cols_to_select if col in df_scored_features.columns]
    missing_cols = [col for col in cols_to_select if col not in df_scored_features.columns]

    if missing_cols:
        print(f"Aviso: As seguintes features selecionadas estão faltando no DataFrame scored: {missing_cols}")

    if available_cols_to_select:
        df_scored_features_selected = df_scored_features.select(available_cols_to_select)
        print("\nDataFrame com as features selecionadas criado.")
        print("Schema do DataFrame com as features selecionadas:")
        df_scored_features_selected.printSchema()

        # Define o caminho de saída para o DataFrame filtrado
        project_root_base = '/content/drive/MyDrive/Hackaton - Big Data'
        output_path_filtered = os.path.join(project_root_base, "data", "processed", "base_features_filtered.parquet")

        # Salva o DataFrame Spark filtrado como um arquivo Parquet
        print(f"\nSalvando o DataFrame filtrado em: {output_path_filtered}")
        df_scored_features_selected.write.mode("overwrite").parquet(output_path_filtered)
        print("DataFrame filtrado salvo com sucesso.")

    else:
        print("\nErro: Nenhuma feature selecionada está disponível no DataFrame scored.")
        df_scored_features_selected = None # Define como None se nenhuma feature for selecionada ou estiver disponível


Features selecionadas lidas do arquivo: 46 features

DataFrame with selected features created.
Schema of the DataFrame with selected features:
root
 |-- internal_store_id: string (nullable = true)
 |-- internal_product_id: string (nullable = true)
 |-- rank: integer (nullable = false)
 |-- mean_discount_last_15_weeks: double (nullable = false)
 |-- min_quantity_last_9_weeks: double (nullable = false)
 |-- std_taxes_last_18_weeks: double (nullable = false)
 |-- min_quantity_last_12_weeks: double (nullable = false)
 |-- std_taxes_last_12_weeks: double (nullable = false)
 |-- max_quantity_last_18_weeks: double (nullable = false)
 |-- std_gross_value_last_15_weeks: double (nullable = false)
 |-- min_quantity_last_15_weeks: double (nullable = false)
 |-- Vendas_Anteriores_lag_3: double (nullable = false)
 |-- marca: string (nullable = true)
 |-- Vendas_Anteriores_lag_1: double (nullable = false)
 |-- min_quantity_last_6_weeks: double (nullable = false)
 |-- sum_quantity_last_15_weeks: doub

In [28]:
!pip install catboost



In [29]:
# 4. Load the Model

import os
from catboost import CatBoostRegressor

# Define o caminho para o modelo salvo
model_load_path = os.path.join(project_root, "models", "catboost_final_model.cbm")

# Verifica se o arquivo do modelo existe antes de tentar carregar
if not os.path.exists(model_load_path):
    print(f"Erro: Arquivo do modelo '{model_load_path}' não encontrado.")
    print("Por favor, verifique se o modelo foi treinado e salvo corretamente.")
    loaded_model = None # Initialize as None to avoid NameError
else:
    print(f"\nCarregando o modelo CatBoost de: '{model_load_path}'\n")
    try:
        # Inicializa um modelo CatBoost vazio e carrega os parâmetros do arquivo
        loaded_model = CatBoostRegressor()
        loaded_model.load_model(model_load_path)
        print("Modelo CatBoost carregado com sucesso.")
    except Exception as e:
        print(f"Erro ao carregar o modelo: {e}")
        loaded_model = None # Set to None if loading fails


Carregando o modelo CatBoost de: '/content/Big-Data-forecast/models/catboost_final_model.cbm'

Modelo CatBoost carregado com sucesso.


In [30]:
import pandas as pd

def apply_category_grouping(df: pd.DataFrame, grouped_categories_map: dict) -> pd.DataFrame:
    """
    Aplica o agrupamento de categorias a um DataFrame com base em um mapeamento pré-definido.

    Args:
        df (pd.DataFrame): O DataFrame de entrada para transformar.
        grouped_categories_map (dict): Um dicionário que mapeia nomes de colunas para a lista de
                                       categorias a serem agrupadas em 'Outros'.

    Returns:
        pd.DataFrame: O DataFrame com as categorias agrupadas de acordo com o mapa.
    """
    df_transformed = df.copy()

    for col, categories_to_group in grouped_categories_map.items():
        if col in df_transformed.columns:
            print(f"Aplicando agrupamento à coluna: {col}")
            # Substitui as categorias especificadas por 'Outros'
            df_transformed[col] = df_transformed[col].replace(categories_to_group, 'Outros')
        else:
            print(f"Aviso: Coluna '{col}' não encontrada no DataFrame para a transformação.")

    return df_transformed

In [33]:
import pickle
import os

# Define o caminho para o mapa de categorias agrupadas salvo
grouped_map_save_path = os.path.join(project_root, "models", "grouped_categories_map.pkl")

# Verifica se o arquivo existe antes de tentar carregar
if not os.path.exists(grouped_map_save_path):
    print(f"Erro: Arquivo do mapa de categorias agrupadas '{grouped_map_save_path}' não encontrado.")
    print("Por favor, verifique se o mapa foi salvo com sucesso.")
    loaded_grouped_map = None # Inicializa como None para evitar NameError
else:
    print(f"\nCarregando mapa de categorias agrupadas de: '{grouped_map_save_path}'\n")
    try:
        # Carrega o dicionário usando pickle
        with open(grouped_map_save_path, 'rb') as f:
            loaded_grouped_map = pickle.load(f)
        print("Mapa de categorias agrupadas carregado com sucesso.\n")
        # Exibe o mapa carregado (opcional)
        # print("\nMapa de categorias agrupadas carregado:")
        # print(loaded_grouped_map)
    except Exception as e:
        print(f"\nErro ao carregar o mapa de categorias agrupadas: {e}")
        loaded_grouped_map = None # Define como None se o carregamento falhar


Carregando mapa de categorias agrupadas de: '/content/Big-Data-forecast/models/grouped_categories_map.pkl'

Mapa de categorias agrupadas carregado com sucesso.



In [34]:
# Adiciona o arquivo Python ao SparkContext (necessário para funções UDF ou modulos em src)
# Verifique se o arquivo _04_select_features.py existe antes de adicionar
feature_engineering_script_path = os.path.join(project_root, 'src', '_04_select_features.py')
if os.path.exists(feature_engineering_script_path):
    spark.sparkContext.addPyFile(feature_engineering_script_path)
    print(f"Adicionado '{feature_engineering_script_path}' ao SparkContext")
else:
    print(f"Erro: Arquivo '{feature_engineering_script_path}' não encontrado. Não foi possível adicionar ao SparkContext.")


# Importa a função feature_engineering_pipeline APÓS adicionar o arquivo ao SparkContext
try:
    from _04_select_features import calculate_wmape

    print("Função 'calculate_wmape' importada com sucesso.")
except ImportError as e:
    print(f"Erro ao importar 'calculate_wmape': {e}")
    print("Verifique se o arquivo '_04_select_features.py' existe na pasta 'src' e se o caminho foi adicionado corretamente.")


Adicionado '/content/Big-Data-forecast/src/_04_select_features.py' ao SparkContext
Função 'calculate_wmape' importada com sucesso.


In [35]:

import pandas as pd
import numpy as np
import os

# Define o caminho para a pasta contendo os arquivos parquet filtrados
input_path_filtered_parquet = os.path.join(project_root_base, "data", "processed", "base_features_filtered.parquet")

# Verifica se o diretório parquet filtrado existe antes de tentar ler
if not os.path.exists(input_path_filtered_parquet):
    print(f"Erro: Diretório '{input_path_filtered_parquet}' não encontrado.")
    print("Por favor, verifique se a base filtrada foi salva corretamente.")
    df_scored_features_pandas = None
else:
    print(f"\nLendo base filtrada de: {input_path_filtered_parquet} diretamente com pandas\n")
    try:
        # Lê os arquivos parquet diretamente para um DataFrame Pandas
        # Inclui as colunas de identificação e week_rank
        df_scored_features_pandas = pd.read_parquet(input_path_filtered_parquet, engine='pyarrow')
        df_scored_features_pandas.fillna(-1, inplace = True)
        df_scored_features_pandas = apply_category_grouping(df_scored_features_pandas, loaded_grouped_map)
        print("DataFrame com features filtradas lido com sucesso (Pandas).")

        # Opcional: Mostra schema e algumas linhas para verificação
        print("\nSchema do DataFrame com features filtradas:")
        df_scored_features_pandas.info()
        print("\nPrimeiras 5 linhas do DataFrame com features filtradas:")
        display(df_scored_features_pandas.head())

    except Exception as e:
        print(f"Erro ao ler arquivos parquet com pandas: {e}")
        df_scored_features_pandas = None


# Procede com a scoragem se o DataFrame pandas foi carregado com sucesso e o modelo está disponível
if loaded_model is None:
    print("\nErro: O modelo não foi carregado. Não é possível realizar a scoragem.")
elif df_scored_features_pandas is None:
    print("\nErro: O DataFrame com features filtradas não foi criado. Não é possível realizar a scoragem.")
else:
    print("\nIniciando a scoragem do DataFrame...")

    try:
        CATEGORICAL_FEATURES = [
            'premise',
            'categoria_pdv',
            'zipcode',
            'categoria',
            'tipos',
            'label',
            'subcategoria',
            'marca',
            'fabricante',
            'week_of_month',
            'global_week_id',
        ]
        # Identifica as features categóricas no DataFrame pandas e converte seu tipo
        # Garante que CATEGORICAL_FEATURES esteja definido (deve ser de células anteriores)
        categorical_features_in_scoring = [f for f in features_selecionadas if f in df_scored_features_pandas.columns and f in CATEGORICAL_FEATURES and f not in identifiers]
        for col_name in categorical_features_in_scoring:
            if col_name in df_scored_features_pandas.columns: # Verifica novamente se a coluna existe após a filtragem
                df_scored_features_pandas[col_name] = df_scored_features_pandas[col_name].astype(str)
        print(f"Features categóricas identificadas para scoragem: {categorical_features_in_scoring}")


        # Faz previsões usando o modelo CatBoost carregado
        print("Gerando previsões com o modelo CatBoost...")
        # Garante que as colunas em df_scored_features_pandas estejam na mesma ordem que features_selecionadas
        # Embora o CatBoost lide com a ordem das features internamente, selecionar explicitamente pode evitar problemas
        predictions = loaded_model.predict(df_scored_features_pandas[features_selecionadas]) # Prediz usando as features selecionadas
        print("Previsões geradas.")

        # Adiciona as previsões como uma nova coluna ao DataFrame pandas
        df_scored_features_pandas['prediction'] = predictions
        df_scored_features_pandas['prediction'] = np.where(df_scored_features_pandas['prediction'] < 0, 0, df_scored_features_pandas['prediction'].round().astype(int))

        # 6. Calcular Métricas (WMAPE)
        # Garante que a função calculate_wmape esteja disponível neste escopo
        # (Ela deve estar se a célula que a define foi executada)
        wmape_score = calculate_wmape(df_scored_features_pandas[TARGET], df_scored_features_pandas['prediction'])
        print(f"\nWMAPE na base completa: {wmape_score:.4f}\n")

        print("\nScoragem concluída. DataFrame com previsões criado (Pandas).")

    except Exception as e:
        print(f"Erro durante a scoragem: {e}")


Lendo base filtrada de: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_features_filtered.parquet diretamente com pandas

Applying grouping to column: premise
Applying grouping to column: categoria_pdv
Applying grouping to column: zipcode
Applying grouping to column: tipos
Applying grouping to column: label
Applying grouping to column: subcategoria
Applying grouping to column: marca
Applying grouping to column: fabricante
DataFrame com features filtradas lido com sucesso (Pandas).

Schema do DataFrame com features filtradas:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6454785 entries, 0 to 6454784
Data columns (total 50 columns):
 #   Column                                Dtype  
---  ------                                -----  
 0   internal_store_id                     object 
 1   internal_product_id                   object 
 2   rank                                  int32  
 3   mean_discount_last_15_weeks           float64
 4   min_quantity_last_9_weeks    

Unnamed: 0,internal_store_id,internal_product_id,rank,mean_discount_last_15_weeks,min_quantity_last_9_weeks,std_taxes_last_18_weeks,min_quantity_last_12_weeks,std_taxes_last_12_weeks,max_quantity_last_18_weeks,std_gross_value_last_15_weeks,min_quantity_last_15_weeks,Vendas_Anteriores_lag_3,marca,Vendas_Anteriores_lag_1,min_quantity_last_6_weeks,sum_quantity_last_15_weeks,std_quantity_last_12_weeks,min_taxes_last_6_weeks,max_quantity_last_12_weeks,max_quantity_last_15_weeks,week_rank,sum_quantity_last_18_weeks,mean_quantity_last_12_weeks,mean_quantity_last_15_weeks,sum_quantity_last_12_weeks,max_gross_value_last_9_weeks,sum_quantidade_pedidos_last_18_weeks,zipcode,mean_quantidade_pedidos_last_6_weeks,max_net_value_last_18_weeks,week_of_month,fabricante,distinct_products_last_1_weeks,distinct_products_last_9_weeks,taxes,premise,categoria_pdv,gross_value,net_value,distinct_products_last_3_weeks,distinct_products_last_18_weeks,distinct_products_last_15_weeks,label,tipos,distinct_products_last_12_weeks,distinct_products_last_6_weeks,subcategoria,gross_profit,discount,quantity
0,1000237487041964405,1837429607327399565,6,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,Outros,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,6,-1.0,-1.0,-1.0,-1.0,-1.0,-1,Outros,-1.0,-1.0,1,Outros,-1,-1,2.279758,On Premise,Outros,35.200001,32.920242,-1,-1,-1,Core,Package,-1,-1,IPA,9.420242,0.0,1.0
1,1000237487041964405,1837429607327399565,8,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,Outros,1.0,1.0,-1.0,-1.0,2.279758,-1.0,-1.0,8,-1.0,-1.0,-1.0,-1.0,-1.0,-1,Outros,1.0,-1.0,3,Outros,-1,-1,4.559516,On Premise,Outros,70.400002,65.840485,4,-1,-1,Core,Package,-1,6,IPA,18.840485,0.0,2.0
2,1000237487041964405,1837429607327399565,23,0.0,-1.0,1.612032,-1.0,-1.0,2.0,-1.0,2.0,-1.0,Outros,2.0,-1.0,2.0,-1.0,-1.0,-1.0,2.0,23,3.0,-1.0,2.0,-1.0,-1.0,2,Outros,-1.0,65.840485,4,Outros,-1,5,2.279758,On Premise,Outros,38.400002,36.120243,-1,18,14,Core,Package,9,5,IPA,10.700243,0.0,1.0
3,1000237487041964405,1837429607327399565,28,0.0,1.0,-1.0,1.0,-1.0,1.0,-1.0,1.0,1.0,Outros,1.0,1.0,1.0,-1.0,2.279758,1.0,1.0,28,1.0,1.0,1.0,1.0,38.400002,1,Outros,1.0,36.120243,4,Outros,-1,8,4.559516,On Premise,Outros,76.800003,72.240486,-1,17,17,Core,Package,13,8,IPA,21.400486,0.0,2.0
4,1000237487041964405,1837429607327399565,31,0.0,1.0,1.612032,1.0,1.612032,2.0,27.152901,1.0,2.0,Outros,2.0,2.0,3.0,0.707107,4.559516,2.0,2.0,31,3.0,1.5,1.5,3.0,76.800003,2,Outros,1.0,72.240486,2,Outros,1,14,4.559516,On Premise,Outros,76.800003,72.240486,6,23,19,Core,Package,14,6,IPA,21.400486,0.0,2.0



Iniciando a scoragem do DataFrame...
Features categóricas identificadas para scoragem: ['marca', 'zipcode', 'week_of_month', 'fabricante', 'premise', 'categoria_pdv', 'label', 'tipos', 'subcategoria']
Gerando previsões com o modelo CatBoost...
Previsões geradas.

WMAPE na base completa: 0.5621


Scoragem concluída. DataFrame com previsões criado (Pandas).


In [36]:
import numpy as np

# Add the predictions as a new column to the pandas DataFrame
df_scored_features_pandas['prediction'] = predictions
df_scored_features_pandas['prediction'] = np.where(df_scored_features_pandas['prediction'] < 0, 0, df_scored_features_pandas['prediction'].round().astype(int))

# 6. Calculate Metrics (WMAPE)
# Ensure calculate_wmape function is available in this scope
# (It should be if the cell defining it was executed)
wmape_score = calculate_wmape(df_scored_features_pandas[TARGET], df_scored_features_pandas['prediction'])
print(f"\nWMAPE na base completa: {wmape_score:.4f}\n")

print("\nScoragem concluída. DataFrame com previsões criado (Pandas).")
print("Schema do DataFrame com previsões:")
df_scored_features_pandas.info()
print("\nPrimeiras 5 linhas do DataFrame com previsões:")
display(df_scored_features_pandas.head())


WMAPE na base completa: 0.5621


Scoragem concluída. DataFrame com previsões criado (Pandas).
Schema do DataFrame com previsões:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6454785 entries, 0 to 6454784
Data columns (total 51 columns):
 #   Column                                Dtype  
---  ------                                -----  
 0   internal_store_id                     object 
 1   internal_product_id                   object 
 2   rank                                  int32  
 3   mean_discount_last_15_weeks           float64
 4   min_quantity_last_9_weeks             float64
 5   std_taxes_last_18_weeks               float64
 6   min_quantity_last_12_weeks            float64
 7   std_taxes_last_12_weeks               float64
 8   max_quantity_last_18_weeks            float64
 9   std_gross_value_last_15_weeks         float64
 10  min_quantity_last_15_weeks            float64
 11  Vendas_Anteriores_lag_3               float64
 12  marca                                 

Unnamed: 0,internal_store_id,internal_product_id,rank,mean_discount_last_15_weeks,min_quantity_last_9_weeks,std_taxes_last_18_weeks,min_quantity_last_12_weeks,std_taxes_last_12_weeks,max_quantity_last_18_weeks,std_gross_value_last_15_weeks,min_quantity_last_15_weeks,Vendas_Anteriores_lag_3,marca,Vendas_Anteriores_lag_1,min_quantity_last_6_weeks,sum_quantity_last_15_weeks,std_quantity_last_12_weeks,min_taxes_last_6_weeks,max_quantity_last_12_weeks,max_quantity_last_15_weeks,week_rank,sum_quantity_last_18_weeks,mean_quantity_last_12_weeks,mean_quantity_last_15_weeks,sum_quantity_last_12_weeks,max_gross_value_last_9_weeks,sum_quantidade_pedidos_last_18_weeks,zipcode,mean_quantidade_pedidos_last_6_weeks,max_net_value_last_18_weeks,week_of_month,fabricante,distinct_products_last_1_weeks,distinct_products_last_9_weeks,taxes,premise,categoria_pdv,gross_value,net_value,distinct_products_last_3_weeks,distinct_products_last_18_weeks,distinct_products_last_15_weeks,label,tipos,distinct_products_last_12_weeks,distinct_products_last_6_weeks,subcategoria,gross_profit,discount,quantity,prediction
0,1000237487041964405,1837429607327399565,6,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,Outros,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,6,-1.0,-1.0,-1.0,-1.0,-1.0,-1,Outros,-1.0,-1.0,1,Outros,-1,-1,2.279758,On Premise,Outros,35.200001,32.920242,-1,-1,-1,Core,Package,-1,-1,IPA,9.420242,0.0,1.0,1
1,1000237487041964405,1837429607327399565,8,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,Outros,1.0,1.0,-1.0,-1.0,2.279758,-1.0,-1.0,8,-1.0,-1.0,-1.0,-1.0,-1.0,-1,Outros,1.0,-1.0,3,Outros,-1,-1,4.559516,On Premise,Outros,70.400002,65.840485,4,-1,-1,Core,Package,-1,6,IPA,18.840485,0.0,2.0,2
2,1000237487041964405,1837429607327399565,23,0.0,-1.0,1.612032,-1.0,-1.0,2.0,-1.0,2.0,-1.0,Outros,2.0,-1.0,2.0,-1.0,-1.0,-1.0,2.0,23,3.0,-1.0,2.0,-1.0,-1.0,2,Outros,-1.0,65.840485,4,Outros,-1,5,2.279758,On Premise,Outros,38.400002,36.120243,-1,18,14,Core,Package,9,5,IPA,10.700243,0.0,1.0,0
3,1000237487041964405,1837429607327399565,28,0.0,1.0,-1.0,1.0,-1.0,1.0,-1.0,1.0,1.0,Outros,1.0,1.0,1.0,-1.0,2.279758,1.0,1.0,28,1.0,1.0,1.0,1.0,38.400002,1,Outros,1.0,36.120243,4,Outros,-1,8,4.559516,On Premise,Outros,76.800003,72.240486,-1,17,17,Core,Package,13,8,IPA,21.400486,0.0,2.0,2
4,1000237487041964405,1837429607327399565,31,0.0,1.0,1.612032,1.0,1.612032,2.0,27.152901,1.0,2.0,Outros,2.0,2.0,3.0,0.707107,4.559516,2.0,2.0,31,3.0,1.5,1.5,3.0,76.800003,2,Outros,1.0,72.240486,2,Outros,1,14,4.559516,On Premise,Outros,76.800003,72.240486,6,23,19,Core,Package,14,6,IPA,21.400486,0.0,2.0,2


In [37]:
import os

# Define o caminho de saída para o DataFrame pontuado
output_path_scored = os.path.join(project_root_base, "data", "processed", "base_scored_2022.parquet")

# Garante que o diretório exista
output_dir = os.path.dirname(output_path_scored)
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Diretório criado: {output_dir}")

# Verifica se o DataFrame a ser salvo existe
if 'df_scored_features_pandas' in locals() and df_scored_features_pandas is not None:
    print(f"\nSalvando o DataFrame pontuado em: {output_path_scored}")
    try:
        # Salva o DataFrame do Pandas em um arquivo parquet
        df_scored_features_pandas.to_parquet(output_path_scored, engine='pyarrow', index=False) # index=False para não salvar o índice do Pandas
        print("DataFrame pontuado salvo com sucesso.")
    except Exception as e:
        print(f"Erro ao salvar o DataFrame pontuado: {e}")
else:
    print("\nErro: DataFrame 'df_scored_features_pandas' não encontrado. Por favor, execute a célula de pontuação primeiro.")


Salvando o DataFrame pontuado em: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_scored_2022.parquet
DataFrame pontuado salvo com sucesso.


# Scoragem de Base para Jan/2023

## Criação da Base com datas de Janeiro de 2023

In [5]:
import os
import pandas as pd
import numpy as np
import sys
from pyspark.sql import SparkSession

# Define o caminho para a pasta contendo os arquivos parquet filtrados
input_path_filtered_parquet = os.path.join(project_root_base, "data", "processed", "base_consolidada.parquet")

# Verifica se o diretório parquet filtrado existe antes de tentar ler
if not os.path.exists(input_path_filtered_parquet):
    print(f"Erro: Diretório '{input_path_filtered_parquet}' não encontrado.")
    print("Por favor, verifique se a base filtrada foi salva corretamente.")
    base_consolidada_hist = None
else:
    print(f"\nLendo base filtrada de: {input_path_filtered_parquet} diretamente com pandas\n")
    try:
        # Lê os arquivos parquet diretamente para um DataFrame Pandas
        base_consolidada_hist = pd.read_parquet(input_path_filtered_parquet, engine='pyarrow')
        base_consolidada_hist.fillna(-1, inplace = True)
        print("DataFrame com as features filtradas lido com sucesso (Pandas).")
    except Exception as e:
        print(f"Erro ao ler arquivos parquet com pandas: {e}")
        base_consolidada_hist = None


Lendo base filtrada de: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_consolidada.parquet diretamente com pandas

DataFrame com as features filtradas lido com sucesso (Pandas).


In [6]:
from datetime import date
import numpy as np

# Filtra o DataFrame para uma data de referência específica
df_filtrado = base_consolidada_hist[base_consolidada_hist['reference_date'] == date(2022, 1, 1)].copy() # Use .copy() para evitar SettingWithCopyWarning

# Atribua o valor NaN à coluna 'quantity'
df_filtrado['quantity'] = np.nan

# Atualiza as colunas de data para 2023
# Assumindo que as colunas de data são do tipo datetime.date ou podem ser convertidas
def update_year_to_2023(d):
    if isinstance(d, date):
        try:
            return date(2023, d.month, d.day)
        except ValueError:
            # Lida com casos onde o dia pode ser inválido para o mês em 2023 (ex: 29 de fev em ano não bissexto)
            # Para simplificar, podemos retornar a data original ou lidar especificamente se necessário
            print(f"Aviso: Data inválida após atualizar o ano para 2023: {d}")
            return d # Ou retorna uma data padrão, ou lida de forma diferente
    return d # Retorna o valor original se não for uma data

df_filtrado['transaction_date'] = df_filtrado['transaction_date'].apply(update_year_to_2023)
df_filtrado['reference_date'] = df_filtrado['reference_date'].apply(update_year_to_2023)


# Exibe o DataFrame filtrado e transformado
print(f"DataFrame filtrado para reference_date == 2022-01-01 e transformado para 2023:")
display(df_filtrado.head())
print(f"Número de linhas no DataFrame filtrado e transformado: {len(df_filtrado)}")

DataFrame filtrado para reference_date == 2022-01-01 e transformado para 2023:


Unnamed: 0,pdv,premise,categoria_pdv,zipcode,internal_store_id,internal_product_id,distributor_id,transaction_date,reference_date,quantity,...,discount,taxes,produto,categoria,descricao,tipos,label,subcategoria,marca,fabricante
64,8277069645128751773,Off Premise,Grocery,80424.0,8277069645128751773,2412225681255611361,7,2023-01-12,2023-01-01,,...,0.0,0.380409,2412225681255611361,ABA Spirits,FIREBALL CINNAMON MALT 50ML,Package,Core,Liqueurs & Cordials,Fireball Cinnamon-Malt,Sazerac ABA
65,7794032741516778774,Off Premise,Grocery,30096.0,7794032741516778774,3726736891643803768,4,2023-01-27,2023-01-01,,...,0.0,0.0,3726736891643803768,Non-Alcohol,GHOST ENERGY CITRUS 12/16 CN,Non Alcohol,Core,Energy,Ghost Energy Citrus,AB Anheuser Busch Inc
109,7731233724512594516,Off Premise,Convenience,80229.0,7731233724512594516,500478784353013717,5,2023-01-11,2023-01-01,,...,38.220001,1.575,500478784353013717,Package,BUD LIGHT 2/15/12 CN,Package,Core,Lager,Bud Light,AB Anheuser Busch Inc
143,8737393492013252421,Off Premise,Package/Liquor,80002.0,8737393492013252421,4451855961536069898,5,2023-01-10,2023-01-01,,...,0.0,0.234375,4451855961536069898,Package,BUDWEISER 15/25 CN,Package,Core,Lager,Budweiser,AB Anheuser Busch Inc
202,3444141618594117443,On Premise,Restaurant,80478.0,3444141618594117443,3262679882836704514,7,2023-01-14,2023-01-01,,...,0.0,0.54,3262679882836704514,Package,BUD LIGHT 2/12/12 LN,Package,Core,Lager,Bud Light,AB Anheuser Busch Inc


Número de linhas no DataFrame filtrado e transformado: 383932


In [7]:
base_final = pd.concat([base_consolidada_hist, df_filtrado], ignore_index=True)

# Convert identifier and categorical columns to string to avoid type issues when saving to parquet
cols_to_convert_to_string = [
    'pdv',
    'produto',
    'internal_store_id',
    'internal_product_id',
    'distributor_id',
    'premise',
    'categoria_pdv',
    'zipcode',
    'categoria',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante'
]

for col_name in cols_to_convert_to_string:
    if col_name in base_final.columns:
        # Convert para string, tratando possíveis valores NaN ou outros tipos
        base_final[col_name] = base_final[col_name].astype(str)

In [8]:
import os

# Define o caminho de saída para o DataFrame combinado
output_path_combined = os.path.join(project_root_base, "data", "processed", "base_combined_hist_future.parquet")

# Garante que o diretório existe, caso contrário, cria-o
output_dir = os.path.dirname(output_path_combined)
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Diretório criado: {output_dir}")

# Verifica se o DataFrame a ser salvo existe
if 'base_final' in locals() and base_final is not None:
    print(f"\nSalvando DataFrame combinado em: {output_path_combined}")
    try:
        # Salva o DataFrame pandas em um arquivo parquet
        base_final.to_parquet(output_path_combined, engine='pyarrow', index=False) # index=False para não salvar o índice do pandas
        print("DataFrame combinado salvo com sucesso.")
    except Exception as e:
        print(f"Erro ao salvar DataFrame combinado: {e}")
else:
    print("\nErro: DataFrame 'base_final' não encontrado. Por favor, execute a célula de concatenação primeiro.")


Salvando DataFrame combinado em: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_combined_hist_future.parquet
DataFrame combinado salvo com sucesso.


## Estrutura Para a Scoragem de 2023

In [45]:
import pandas as pd
import numpy as np
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
from pyspark.sql import Window
import tempfile # Import tempfile para criação de diretorios temporarios

# Inicializa a sessão Spark (se ainda não estiver inicializada)
try:
    spark
except NameError:
    spark = SparkSession.builder \
    .appName("FeatureEngineeringNotebook") \
    .config("spark.network.timeout", "600s") \
    .config("spark.sql.broadcastTimeout", "1200s") \
    .getOrCreate()

In [46]:
# 1. Load the data

# Define o caminho para o DataFrame consolidado
input_path_consolidada = os.path.join(project_root_base, "data", "processed", "base_combined_hist_future.parquet")

# Verifica se o arquivo existe antes de tentar ler
if not os.path.exists(input_path_consolidada):
    print(f"Erro: Arquivo '{input_path_consolidada}' não encontrado.")
    print("Verifique se o 01_load_data.py foi executado e se o caminho está correto.")
else:
    # Carrega o DataFrame consolidado com o Spark
    print(f"\nCarregando o DataFrame consolidado de: {input_path_consolidada}\n")
    df_consolidada_score = spark.read.parquet(input_path_consolidada)
    print("DataFrame consolidado carregado com sucesso.")

    # Get distinct internal_store_id and order them
    all_store_ids = df_consolidada_score.select('internal_store_id').distinct().orderBy('internal_store_id').rdd.flatMap(lambda x: x).collect()

    # Split the store IDs into four lists
    quarter_length = len(all_store_ids) // 4
    lista_1 = all_store_ids[:quarter_length]
    lista_2 = all_store_ids[quarter_length:quarter_length*2]
    lista_3 = all_store_ids[quarter_length*2:quarter_length*3]
    lista_4 = all_store_ids[quarter_length*3:]


    print(f"Total distinct internal_store_ids: {len(all_store_ids)}")

    print(f"Number of store IDs in lista_1 (first quarter): {len(lista_1)}")
    print(f"Number of store IDs in lista_2 (second quarter): {len(lista_2)}")
    print(f"Number of store IDs in lista_3 (third quarter): {len(lista_3)}")
    print(f"Number of store IDs in lista_4 (fourth quarter): {len(lista_4)}")


    # Filter the DataFrame to process only the first quarter of the stores
    df_consolidada_score = df_consolidada_score.filter(col('internal_store_id').isin(lista_4))
    lista_ids = 4
    print(f"DataFrame filtered to include only stores from lista_{lista_ids}. Number of rows: {df_consolidada_score.count()}")


Carregando o DataFrame consolidado de: /content/drive/MyDrive/Hackaton - Big Data/data/processed/base_combined_hist_future.parquet

DataFrame consolidado carregado com sucesso.
Total distinct internal_store_ids: 15086
Number of store IDs in lista_1 (first quarter): 3771
Number of store IDs in lista_2 (second quarter): 3771
Number of store IDs in lista_3 (third quarter): 3771
Number of store IDs in lista_4 (fourth quarter): 3773
DataFrame filtered to include only stores from lista_4. Number of rows: 1836662


In [47]:
# 2. Feature Engineering (com imports e setup de ambiente explicitos)

# Define a raiz do projeto
src_path = os.path.join(project_root, 'src')

# Adiciona a pasta 'src' ao caminho de busca do Python
if src_path not in sys.path:
    sys.path.append(src_path)
    print(f"Adicionado '{src_path}' ao sys.path")
else:
    print(f"'{src_path}' já está no sys.path")


# Adiciona o arquivo Python ao SparkContext (necessário para funções UDF ou modulos em src)
# Verifique se o arquivo _02_feature_engineering.py existe antes de adicionar
feature_engineering_script_path = os.path.join(project_root, 'src', '_02_feature_engineering.py')
if os.path.exists(feature_engineering_script_path):
    spark.sparkContext.addPyFile(feature_engineering_script_path)
    print(f"Adicionado '{feature_engineering_script_path}' ao SparkContext")
else:
    print(f"Erro: Arquivo '{feature_engineering_script_path}' não encontrado. Não foi possível adicionar ao SparkContext.")


# Importa a função feature_engineering_pipeline APÓS adicionar o arquivo ao SparkContext
try:

    from _02_feature_engineering import feature_engineering_pipeline, calculate_week_of_month
    from _02_feature_engineering import create_global_week_id_and_rank, calculate_lagged_features_optimized

    print("Funções de '_02_feature_engineering.py' importadas com sucesso.")
except ImportError as e:
    print(f"Erro ao importar '_02_feature_engineering.py': {e}")
    print("Verifique se o arquivo '_02_feature_engineering.py' existe na pasta 'src' e se o caminho foi adicionado corretamente.")

'/content/Big-Data-forecast/src' já está no sys.path
Adicionado '/content/Big-Data-forecast/src/_02_feature_engineering.py' ao SparkContext
Funções de '_02_feature_engineering.py' importadas com sucesso.


In [48]:
!pip install catboost



In [49]:
# 4. Load the Model

import os
from catboost import CatBoostRegressor

# Define o caminho para o modelo salvo
model_load_path = os.path.join(project_root, "models", "catboost_final_model.cbm")

# Verifica se o arquivo do modelo existe antes de tentar carregar
if not os.path.exists(model_load_path):
    print(f"Erro: Arquivo do modelo '{model_load_path}' não encontrado.")
    print("Por favor, verifique se o modelo foi treinado e salvo corretamente.")
    loaded_model = None # Initialize as None to avoid NameError
else:
    print(f"\nCarregando o modelo CatBoost de: '{model_load_path}'\n")
    try:
        # Inicializa um modelo CatBoost vazio e carrega os parâmetros do arquivo
        loaded_model = CatBoostRegressor()
        loaded_model.load_model(model_load_path)
        print("Modelo CatBoost carregado com sucesso.")
    except Exception as e:
        print(f"Erro ao carregar o modelo: {e}")
        loaded_model = None # Set to None if loading fails


Carregando o modelo CatBoost de: '/content/Big-Data-forecast/models/catboost_final_model.cbm'

Modelo CatBoost carregado com sucesso.


In [50]:
features_file_path = os.path.join(project_root, "models", "features_selecionadas.txt")

with open(features_file_path, "r") as f:
  features_selecionadas = [line.strip() for line in f if line.strip()]

print(f"\nFeatures selecionadas lidas do arquivo: {len(features_selecionadas)} features")


Features selecionadas lidas do arquivo: 46 features


In [51]:
import pickle

# Define o caminho para o mapa de categorias agrupadas salvo
grouped_map_save_path = os.path.join(project_root, "models", "grouped_categories_map.pkl")

# Verifica se o arquivo existe antes de tentar carregar
if not os.path.exists(grouped_map_save_path):
    print(f"Erro: Arquivo do mapa de categorias agrupadas '{grouped_map_save_path}' não encontrado.")
    print("Por favor, verifique se o mapa foi salvo com sucesso.")
    loaded_grouped_map = None # Inicializa como None para evitar NameError
else:
    print(f"\nCarregando mapa de categorias agrupadas de: '{grouped_map_save_path}'\n")
    try:
        # Carrega o dicionário usando pickle
        with open(grouped_map_save_path, 'rb') as f:
            loaded_grouped_map = pickle.load(f)
        print("Mapa de categorias agrupadas carregado com sucesso.")
        # Exibe o mapa carregado (opcional)
        # print("\nMapa de categorias agrupadas carregado:")
        # print(loaded_grouped_map)
    except Exception as e:
        print(f"\nErro ao carregar o mapa de categorias agrupadas: {e}")
        loaded_grouped_map = None # Define como None se o carregamento falhar


Carregando mapa de categorias agrupadas de: '/content/Big-Data-forecast/models/grouped_categories_map.pkl'

Mapa de categorias agrupadas carregado com sucesso.


In [52]:
import pandas as pd

def apply_category_grouping(df: pd.DataFrame, grouped_categories_map: dict) -> pd.DataFrame:
    """
    Aplica o agrupamento de categorias a um DataFrame com base em um mapeamento pré-definido.

    Args:
        df (pd.DataFrame): O DataFrame de entrada para transformar.
        grouped_categories_map (dict): Um dicionário que mapeia nomes de colunas para a lista de
                                       categorias a serem agrupadas em 'Outros'.

    Returns:
        pd.DataFrame: O DataFrame com as categorias agrupadas de acordo com o mapa.
    """
    df_transformed = df.copy()

    for col, categories_to_group in grouped_categories_map.items():
        if col in df_transformed.columns:
            print(f"Aplicando agrupamento à coluna: {col}")
            # Substitui as categorias especificadas por 'Outros'
            df_transformed[col] = df_transformed[col].replace(categories_to_group, 'Outros')
        else:
            print(f"Aviso: Coluna '{col}' não encontrada no DataFrame para a transformação.")

    return df_transformed

In [53]:
CATEGORICAL_FEATURES = [
    'premise',
    'categoria_pdv',
    'zipcode',
    'categoria',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante',
    'week_of_month',
    'global_week_id',
]

In [54]:
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, weekofyear, dense_rank, lag, concat, lit, countDistinct, count, mean, sum, avg, min, max, stddev, when, size, collect_set, lpad, cast
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
import os
from pyspark.sql.window import Window
from datetime import datetime

calculate_week_of_month_udf = udf(calculate_week_of_month, IntegerType())

# Aplica as funções para calcular global_week_id, rank e week_of_month em df_consolidada_score

# Garante que df_consolidada_score está disponível
if 'df_consolidada_score' in locals() and df_consolidada_score is not None:
    print("Calculando global_week_id, rank e week_of_month para df_consolidada_score...")


    # Aplica a função para calcular week_of_month
    df_filtered_with_week = df_consolidada_score.withColumn(
        'week_of_month',
        calculate_week_of_month_udf(col('transaction_date'), col('reference_date'))
    )
    print("week_of_month calculado.")

    # Aplica a função para criar global_week_id e rank
    df_filtered_with_rank = create_global_week_id_and_rank(df_filtered_with_week)
    print("global_week_id e rank calculados.")

    # Atualiza df_consolidada_score para ser o DataFrame com as novas colunas
    df_consolidada_score = df_filtered_with_rank

    print("\nDataFrame df_consolidada_score atualizado com global_week_id, rank e week_of_month.")
    # df_filtered.printSchema()
    # df_filtered.show(5)

else:
    print("Erro: DataFrame 'df_consolidada_score' não encontrado. Por favor, carregue e filtre os dados primeiro.")

Calculando global_week_id, rank e week_of_month para df_consolidada_score...
week_of_month calculado.
Criando identificador de semana global e rank...
global_week_id e rank calculados.

DataFrame df_consolidada_score atualizado com global_week_id, rank e week_of_month.


In [55]:
# Step 4.3: Cálculo Iterativo de Features e Scoragem para Semanas Futuras (Refatorado)

import pandas as pd
from pyspark.sql.functions import col, lit, when
from pyspark.sql import Window
import os
import numpy as np # Importa numpy para np.where
import tempfile # Importa tempfile para criar diretórios temporários
from tqdm.auto import tqdm # Importa tqdm para barra de progresso

calculate_week_of_month_udf = udf(calculate_week_of_month, IntegerType())

# Importa as funções de feature engineering e agrupamento de categorias
# Certifique-se de que essas funções estão definidas em outras células ou scripts
# from _02_feature_engineering import feature_engineering_pipeline, calculate_lagged_features_optimized # Funções para cálculo de features
# from your_module import apply_category_grouping # Função para agrupamento de categorias
# from your_module import calculate_wmape # Função para cálculo de WMAPE (se necessário nesta etapa)


def calculate_and_score_week(
    original_cols,
    df_iterative_spark,
    current_rank: int,
    features_selecionadas: list,
    loaded_model: CatBoostRegressor,
    loaded_grouped_map: dict,
    categorical_features: list,
    spark: SparkSession,
    week_windows_to_consider: list,
    value_cols_to_aggregate: list,
    aggregation_functions_to_apply: list,):
    """
    Calcula features para a semana atual, escore e retorna o DataFrame Pandas scorado
    e o DataFrame Spark iterativo atualizado com a previsão.

    Args:
        df_iterative_spark (df.DataFrame): DataFrame Spark combinado (histórico + futuro)
                                            com coluna 'prediction' atualizável.
        current_rank (int): O rank da semana atual para processar.
        features_selecionadas (list): Lista de nomes das features selecionadas para o modelo.
        loaded_model (CatBoostRegressor): O modelo CatBoost treinado.
        loaded_grouped_map (dict): Mapa para agrupar categorias de baixa frequência.
        categorical_features (list): Lista de nomes das features categóricas originais.
        spark (SparkSession): A sessão Spark.
        week_windows_to_consider (list): Janelas de semanas para features defasadas.
        value_cols_to_aggregate (list): Colunas de valor para agregar.
        aggregation_functions_to_apply (list): Funções de agregação.

    Returns:
        tuple[pd.DataFrame, df.DataFrame]: Uma tupla contendo o DataFrame Pandas
                                            com a semana atual scorada e o DataFrame
                                            Spark iterativo atualizado.
        Retorna (None, df_iterative_spark) se ocorrer um erro ou não houver dados para a semana.
    """
    print(f"\n--- Processando Week Rank: {current_rank} ---")

    # Aplica a função week of month
    df_iterative_spark = df_consolidada_score.withColumn(
        'week_of_month',
        calculate_week_of_month_udf(col('transaction_date'), col('reference_date'))
    )

    # Aplica a função para criar global_week_id e rank
    df_iterative_spark = create_global_week_id_and_rank(df_iterative_spark)

    # 1. Identifica os dados para a semana atual e dados anteriores necessários para features
    # Filtra o DataFrame combinado para obter as linhas até a semana futura atual
    df_current_week_data_for_features = df_iterative_spark.filter(col("global_week_id") <= f'2023-01-{current_rank}').select(original_cols)

    if df_current_week_data_for_features.count() == 0:
        print(f"Nenhum dado encontrado para o rank {current_rank} e anteriores para cálculo de features. Pulando.")
        return None, df_iterative_spark

    # --- Cálculo de Features para a semana atual ---
    # Re-aplica a lógica de feature engineering ao subset de dados até a semana atual.
    # Usa feature_engineering_pipeline conforme solicitado.
    # Esta função DEVE ser capaz de usar a coluna 'prediction'
    # quando 'quantity' é nulo para as semanas futuras ao calcular features defasadas/de janela.

    print("\nAplicando feature engineering ao subset de dados para o rank atual...")


    # Aplica a pipeline de feature engineering no subset de dados.
    # Esta chamada assume que feature_engineering_pipeline pode lidar com o DataFrame iterativo
    # e usar a coluna 'prediction' para calcular features para semanas futuras.
    df_current_week_with_features = feature_engineering_pipeline(
            df_current_week_data_for_features, # Passa o subset de dados até a semana atual
            week_windows_to_consider,
            value_cols_to_aggregate, # Passa as colunas de valor relevantes
            aggregation_functions_to_apply,
            spark # Passa a sessão Spark
        )

    # Filtra para obter APENAS as linhas da semana atual com as features calculadas
    df_current_week_to_score_spark = df_current_week_with_features.filter(col("global_week_id") == f'2023-01-{current_rank}')

    if df_current_week_to_score_spark.count() == 0:
        print(f"Nenhuma linha com features calculadas encontrada para o rank {current_rank}. Pulando.")
        return None, df_iterative_spark

    # --- Preparação dos dados para scoragem ---
    # Seleciona as colunas necessárias para scoragem (features + identificadores/semana).
    cols_for_scoring_spark = features_selecionadas + ['internal_store_id', 'internal_product_id', 'rank', 'global_week_id', 'quantity']

    # Filtra colunas que podem não estar em df_current_week_to_score_spark
    available_cols_for_scoring = [c for c in cols_for_scoring_spark if c in df_current_week_to_score_spark.columns]
    if len(available_cols_for_scoring) != len(cols_for_scoring_spark):
        missing = [c for c in cols_for_scoring_spark if c not in df_current_week_to_score_spark.columns]
        print(f"Aviso: Colunas selecionadas para scoragem faltando em df_current_week_to_score_spark: {missing}")
        # Decide como lidar com features faltando - preencher com valor padrão (-1) ou pular a scoragem?
        # Para agora, vamos tentar preencher e avisar.
        # Seleciona APENAS as colunas disponíveis para evitar erro no select
        df_current_week_for_scoring_spark = df_current_week_to_score_spark.select(available_cols_for_scoring)
    else:
         df_current_week_for_scoring_spark = df_current_week_to_score_spark.select(available_cols_for_scoring)


    # Converte para pandas para scoragem via salvamento em parquet e leitura de volta (solução para grandes DataFrames)
    print("Salvando dados do rank atual em parquet temporário para conversão para Pandas...")
    # Cria um diretório temporário
    with tempfile.TemporaryDirectory() as tmp_dir:
        temp_parquet_path = os.path.join(tmp_dir, f"rank_{current_rank}_data.parquet")
        try:
            # Repartition antes de escrever para controlar o número de arquivos de saída
            df_current_week_for_scoring_spark.repartition(100).write.mode("overwrite").parquet(temp_parquet_path)
            print("Salvamento completo. Lendo parquet temporário com Pandas...")
            df_current_week_pandas = pd.read_parquet(temp_parquet_path, engine='pyarrow')
            print("Leitura com Pandas completa.")
        except Exception as e:
            print(f"Erro durante salvamento/leitura do parquet temporário: {e}")
            return None, df_iterative_spark # Retorna None para o DataFrame Pandas e o Spark DF original


    if df_current_week_pandas is not None:
        # Preenche valores nulos com -1 no DataFrame Pandas (para features calculadas que podem resultar em nulos)
        df_current_week_pandas.fillna(-1, inplace = True)
        print("Valores nulos preenchidos no DataFrame Pandas com -1.")

        # Aplica o agrupamento de categorias no DataFrame Pandas
        print("Aplicando agrupamento de categorias ao DataFrame Pandas...")
        df_current_week_pandas = apply_category_grouping(df_current_week_pandas, loaded_grouped_map)
        print("Agrupamento de categorias aplicado.")

        # Identifica features categóricas no DataFrame Pandas e converte seus tipos para string
        # CatBoost espera string ou category para features categóricas
        categorical_features_in_scoring = [f for f in features_selecionadas if f in df_current_week_pandas.columns and f in CATEGORICAL_FEATURES]
        for col_name in categorical_features_in_scoring:
            if col_name in df_current_week_pandas.columns:
                df_current_week_pandas[col_name] = df_current_week_pandas[col_name].astype(str)
        print(f"Features categóricas identificadas e convertidas para string para scoragem: {categorical_features_in_scoring}")


        # --- Geração de Previsões ---
        print("Gerando previsões com o modelo CatBoost...")
        # Garante que as colunas em df_current_week_pandas estão na mesma ordem que features_selecionadas
        # Lida com casos onde colunas de feature podem estar faltando após o cálculo/seleção
        features_present_for_scoring = [f for f in features_selecionadas if f in df_current_week_pandas.columns]
        if len(features_present_for_scoring) != len(features_selecionadas):
            missing_scoring_features = [f for f in features_selecionadas if f not in df_current_week_pandas.columns]
            print(f"Erro: Features selecionadas faltando no DataFrame Pandas para scoragem: {missing_scoring_features}")
            # Se features essenciais estiverem faltando, preve 0 ou pule
            predictions = np.zeros(len(df_current_week_pandas)) # Preve 0 se features estiverem faltando
        else:
            predictions = loaded_model.predict(df_current_week_pandas[features_present_for_scoring])
        print("Previsões geradas.")

        # Adiciona as previsões como uma nova coluna ao DataFrame Pandas
        df_current_week_pandas['prediction'] = predictions
        # Garante que as previsões não são negativas e arredonda para o inteiro mais próximo
        df_current_week_pandas['prediction'] = np.where(df_current_week_pandas['prediction'] < 0, 0, df_current_week_pandas['prediction']).round().astype(int)
        print("Previsões adicionadas ao DataFrame Pandas.")

        # Atualiza a coluna 'quantity' com as previsões
        df_current_week_pandas['quantity'] = np.where(df_current_week_pandas['quantity'] == -1, df_current_week_pandas['prediction'], df_current_week_pandas['quantity'])

        print(f"Scoragem para o rank {current_rank} concluída.")

        # --- Atualiza df_iterative com as previsões da semana atual ---
        # Converte o DataFrame Pandas scorado de volta para Spark
        # Seleciona apenas as colunas necessárias para o join de atualização
        df_current_week_scored_spark = spark.createDataFrame(df_current_week_pandas[['internal_store_id', 'internal_product_id', 'global_week_id', 'prediction']])

        # Atualiza a coluna 'prediction' em df_iterative para a semana atual
        # Isso é necessário para calcular features para semanas futuras subsequentes.

        df_iterative_updated = df_iterative_spark.alias("iter").join(
            df_current_week_scored_spark.alias("scored"),
            (col("iter.internal_store_id") == col("scored.internal_store_id")) &
            (col("iter.internal_product_id") == col("scored.internal_product_id")) &
            (col("iter.global_week_id") == col("scored.global_week_id")), # Join usando a coluna 'rank'
            "left" # Usa left join para manter todas as linhas em df_iterative_spark
        ).withColumn(
            "quantity",
            when(col("iter.rank") == f'2023-01-{current_rank}', col("scored.prediction")).otherwise(col("iter.quantity"))
        ).select(
            "iter.*", # Mantém todas as colunas originais de df_iterative_spark
            col("quantity").alias("quantity") # Seleciona a coluna de previsão atualizada
        )

        df_iterative_updated = df_iterative_updated.select(original_cols)

        print(f"df_iterative atualizado com previsões para o rank {current_rank}.")
        # print(df_iterative_updated.printSchema())

        return df_current_week_pandas, df_iterative_updated # Retorna o Pandas DF scorado e o Spark DF atualizado

    else:
        return None, df_iterative_spark # Retorna None para o DataFrame Pandas e o Spark DF original em caso de erro


In [56]:

# --- Bloco Principal de Execução ---
# Garante que as variáveis necessárias estão definidas e disponíveis
if 'df_consolidada_score' not in locals() or df_consolidada_score is None: # Usando df_consolidada_score como base
    print("Erro: DataFrame 'df_consolidada_score' não encontrado. Por favor, carregue e prepare a base combinada.")
elif 'loaded_model' not in locals() or loaded_model is None:
    print("Erro: Modelo treinado 'loaded_model' não encontrado. Por favor, carregue o modelo.")
elif 'features_selecionadas' not in locals() or not features_selecionadas:
     print("Erro: 'features_selecionadas' não encontrado ou vazio. Por favor, execute a seleção de features.")
elif 'CATEGORICAL_FEATURES' not in locals() or not CATEGORICAL_FEATURES:
     print("Erro: 'CATEGORICAL_FEATURES' não encontrado ou vazio. Por favor, defina as features categóricas.")
elif 'loaded_grouped_map' not in locals() or loaded_grouped_map is None:
     print("Erro: 'loaded_grouped_map' não encontrado. Por favor, carregue o mapa de categorias agrupadas.")
else:
    print("\n--- Iniciando o Loop de Scoragem Iterativa para Semanas Futuras ---")

    # Define os parâmetros de feature engineering (podem ser reutilizados se definidos globalmente)
    week_windows_to_consider = [1, 3, 6, 9, 12, 15, 18]
    value_cols_to_aggregate = ['quantity', 'gross_value', 'net_value', 'gross_profit', 'discount', 'taxes', 'quantidade_pedidos']
    aggregation_functions_to_apply = ['count', 'sum', 'mean', 'min', 'max', 'std']
    TARGET = 'quantity'

    # Obtém a lista de ranks futuros a serem processados diretamente de df_consolidada_score
    # Filtra os ranks que correspondem às semanas futuras (onde quantity é nulo)
    # Assumindo que 'rank' é a coluna de rank contínuo
    future_week_ranks = df_consolidada_score.filter(col(TARGET).isNull()).select('week_of_month').distinct().orderBy('week_of_month').rdd.flatMap(lambda x: x).collect()

    if not future_week_ranks:
        print("Nenhum rank futuro encontrado em df_consolidada_score com quantity nulo.")
    else:
        print(f"Ranks futuros a serem processados: {future_week_ranks}")

        # Inicializa uma lista para armazenar os DataFrames Pandas scorados de cada semana
        scored_future_data_list = []
        original_cols = [c for c in df_consolidada_score.columns]

        # Inicializa o DataFrame Spark iterativo com a coluna de previsão
        # Usa df_consolidada_score como base e adiciona a coluna de previsão
        df_iterative_spark = df_consolidada_score.withColumn("prediction", col(TARGET)) # Inicializa prediction com os valores de quantity (null para futuro)
        df_iterative_spark = df_iterative_spark.withColumn("prediction", col("prediction").cast("double")) # Garante tipo double

        # Loop pelos ranks futuros
        for current_rank in tqdm(future_week_ranks, desc="Scoragem Iterativa Semanal"):
            # Calcula features, escore e atualiza o DataFrame iterativo
            scored_pandas_df, df_iterative_spark = calculate_and_score_week(
                original_cols,
                df_iterative_spark,
                current_rank,
                features_selecionadas,
                loaded_model,
                loaded_grouped_map,
                CATEGORICAL_FEATURES,
                spark,
                week_windows_to_consider,
                value_cols_to_aggregate,
                aggregation_functions_to_apply,
            )
            # display(scored_pandas_df.head())
            # Se a scoragem foi bem-sucedida para a semana, anexa o resultado Pandas
            if scored_pandas_df is not None:
                 scored_future_data_list.append(scored_pandas_df)


        # Após iterar por todas as semanas futuras, combina os DataFrames Pandas scorados
        if scored_future_data_list:
            df_scored_future_final_pandas = pd.concat(scored_future_data_list, ignore_index=True)
            print("\n--- Scoragem Iterativa Completa ---")
            print("DataFrame Futuro Scorado Final (Pandas):")
            print(df_scored_future_final_pandas.info())
            display(df_scored_future_final_pandas.head())

            # Defina o caminho de saída
            output_path_future_predictions = os.path.join(project_root_base, "data", "processed", "jan2023_predictions.parquet")
            df_scored_future_final_pandas.to_parquet(output_path_future_predictions, index=False)
            print(f"\nPrevisões futuras salvas em: {output_path_future_predictions}")

        else:
            print("\nNenhum dado futuro foi scorado.")

    print("\n--- Loop de Scoragem Iterativa Finalizado ---")


--- Iniciando o Loop de Scoragem Iterativa para Semanas Futuras ---
Ranks futuros a serem processados: [1, 2, 3, 4, 5]


Scoragem Iterativa Semanal:   0%|          | 0/5 [00:00<?, ?it/s]


--- Processando Week Rank: 1 ---
Criando identificador de semana global e rank...

Aplicando feature engineering ao subset de dados para o rank atual...
Criando identificador de semana global e rank...
Criando identificador de semana global e rank...
Calculando tempo_ultimo_pedido...
Calculando features defasadas e de janela móvel de forma otimizada...
Calculando contagem distinta de produtos por semana...
Salvando dados do rank atual em parquet temporário para conversão para Pandas...
Salvamento completo. Lendo parquet temporário com Pandas...
Leitura com Pandas completa.
Valores nulos preenchidos no DataFrame Pandas com -1.
Aplicando agrupamento de categorias ao DataFrame Pandas...
Aplicando agrupamento à coluna: premise
Aplicando agrupamento à coluna: categoria_pdv
Aplicando agrupamento à coluna: zipcode
Aviso: Coluna 'categoria' não encontrada no DataFrame para a transformação.
Aplicando agrupamento à coluna: tipos
Aplicando agrupamento à coluna: label
Aplicando agrupamento à colu

Unnamed: 0,mean_discount_last_15_weeks,min_quantity_last_9_weeks,std_taxes_last_18_weeks,min_quantity_last_12_weeks,std_taxes_last_12_weeks,max_quantity_last_18_weeks,std_gross_value_last_15_weeks,min_quantity_last_15_weeks,Vendas_Anteriores_lag_3,marca,...,distinct_products_last_6_weeks,subcategoria,gross_profit,discount,internal_store_id,internal_product_id,rank,global_week_id,quantity,prediction
0,17.85,3.0,0.194035,2.0,0.121843,5.0,43.571063,2.0,2.0,Outros,...,14,Lager,31.055996,20.4,7639839691455801442,3278510150933226765,60,2023-01-1,4.0,4
1,0.0,1.0,0.02693,1.0,0.024141,2.0,0.0,1.0,1.0,Bud Light,...,109,Lager,8.58,0.0,9014410406002033115,1253559410644523416,60,2023-01-1,1.0,1
2,1.2625,1.0,0.0,1.0,0.0,2.0,1.285,1.0,1.0,Yuengling Lager,...,326,Lager,38.851688,16.1,8089063016528764381,1085687032950723886,60,2023-01-1,7.0,7
3,7.4,2.0,4.298753,2.0,5.264875,5.0,35.983356,2.0,5.0,Bud Ice,...,72,Lager,23.956614,9.25,9166701173094509446,4570988632387137438,60,2023-01-1,5.0,5
4,0.0,1.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,Budweiser,...,14,Lager,7.412242,0.0,8184565139377519225,1271951761907094129,60,2023-01-1,2.0,2



Previsões futuras salvas em: /content/drive/MyDrive/Hackaton - Big Data/data/processed/jan2023_predictions.parquet

--- Loop de Scoragem Iterativa Finalizado ---


In [57]:
df_scored_future_final_pandas.shape

(99153, 52)

In [58]:
df_scored_future_final_pandas_grouped = df_scored_future_final_pandas.groupby(['internal_store_id', 'internal_product_id', 'global_week_id']).agg({'quantity': 'sum'}).reset_index()

rename_columns = {
    'internal_store_id': 'pdv',
    'internal_product_id': 'produto',
    'global_week_id': 'semana',
    'quantity': 'quantidade'
}
df_scored_future_final_pandas_grouped = df_scored_future_final_pandas_grouped.rename(columns=rename_columns)

df_scored_future_final_pandas_grouped['semana'] = df_scored_future_final_pandas_grouped['semana'].apply(lambda x: x.split('-')[-1])

# Define o caminho de saída para o arquivo CSV
output_path_future_predictions = os.path.join(project_root_base, "data", "processed", f"jan2023_predictions_{lista_ids}.csv")

# Salva o DataFrame em um arquivo CSV
df_scored_future_final_pandas_grouped[['semana', 'pdv', 'produto', 'quantidade']].to_csv(
    output_path_future_predictions,
    sep=';',
    encoding='utf-8',
    index=False # 'index=False' é importante para não salvar o índice do DataFrame
)

print(f"\nPrevisões futuras salvas em: {output_path_future_predictions}")


Previsões futuras salvas em: /content/drive/MyDrive/Hackaton - Big Data/data/processed/jan2023_predictions_4.csv


In [59]:
import pandas as pd
import os

# Define o caminho base para os arquivos processados
processed_dir = os.path.join(project_root_base, "data", "processed")

# Lista para armazenar os caminhos dos arquivos CSV gerados
csv_files = []
# Assumindo que você processou as listas de 1 a 4
for i in range(1, 5):
    file_path = os.path.join(processed_dir, f"jan2023_predictions_{i}.csv")
    if os.path.exists(file_path):
        csv_files.append(file_path)
    else:
        print(f"Aviso: Arquivo não encontrado: {file_path}")

if not csv_files:
    print("Erro: Nenhum arquivo CSV encontrado para concatenar.")
else:
    print(f"Encontrados {len(csv_files)} arquivos CSV para concatenar.")

    # Lista para armazenar os DataFrames lidos
    dfs_to_concat = []

    # Lê cada arquivo CSV e armazena em uma lista de DataFrames
    print("Lendo arquivos CSV...")
    for file in csv_files:
        try:
            df = pd.read_csv(file, sep=';', encoding='utf-8')
            dfs_to_concat.append(df)
            print(f"Lido {file}")
        except Exception as e:
            print(f"Erro ao ler o arquivo {file}: {e}")

    if dfs_to_concat:
        # Concatena todos os DataFrames
        print("Concatenando DataFrames...")
        combined_df = pd.concat(dfs_to_concat, ignore_index=True)
        print("Concatenação completa.")

        # Define o caminho de saída para o arquivo CSV combinado
        output_path_combined = os.path.join(processed_dir, "jan2023_predictions.csv")

        # Salva o DataFrame combinado em um arquivo CSV
        print(f"\nSalvando DataFrame combinado em: {output_path_combined}")
        try:
            # index=False é importante para não salvar o índice do DataFrame
            combined_df.to_csv(output_path_combined, sep=';', encoding='utf-8', index=False)
            print("DataFrame combinado salvo com sucesso.")
        except Exception as e:
            print(f"Erro ao salvar o DataFrame combinado: {e}")
    else:
        print("Nenhum DataFrame foi lido com sucesso para concatenar.")

Encontrados 4 arquivos CSV para concatenar.
Lendo arquivos CSV...
Lido /content/drive/MyDrive/Hackaton - Big Data/data/processed/jan2023_predictions_1.csv
Lido /content/drive/MyDrive/Hackaton - Big Data/data/processed/jan2023_predictions_2.csv
Lido /content/drive/MyDrive/Hackaton - Big Data/data/processed/jan2023_predictions_3.csv
Lido /content/drive/MyDrive/Hackaton - Big Data/data/processed/jan2023_predictions_4.csv
Concatenando DataFrames...
Concatenação completa.

Salvando DataFrame combinado em: /content/drive/MyDrive/Hackaton - Big Data/data/processed/jan2023_predictions.csv
DataFrame combinado salvo com sucesso.
