In [135]:
import os
import pyodbc
import numpy as np
import pandas as pd
import unicodedata
import holidays
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.io as pio

from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler, OneHotEncoder, RobustScaler
from scipy.stats import skew, kurtosis
from dotenv import load_dotenv
from datetime import datetime
from datetime import date
from IPython.display import display
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import cross_val_score, learning_curve, validation_curve
from sklearn.preprocessing import PowerTransformer
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from catboost import CatBoostRegressor

In [5]:
#Tranformar a data em datetime
df['DataEmissao'] = pd.to_datetime(df['DataEmissao'], errors='coerce', dayfirst=True)

In [6]:
# Add colunas de data, mês, dia, trimeste no dataframe
df['Ano'] = df['DataEmissao'].dt.year
df['Mes'] = df['DataEmissao'].dt.month
df['DiaMes'] = df['DataEmissao'].dt.day

# Criar lista com os nomes dos dias da semana em português
DiaSemana = ['segunda-feira', 'terça-feira', 'quarta-feira', 'quinta-feira', 'sexta-feira', 'sábado', 'domingo']

# Mapear o número para o nome
df['DiaSemana'] = df['DataEmissao'].map(lambda x: DiaSemana[x.weekday()])

# Versão Numérica dos dias da semana
df['DiaSemanaNumero'] = pd.to_datetime(df['DataEmissao']).dt.weekday  # 0=segunda, ..., 6=domingo

In [7]:
#Inclusão de Booleano de Feriados Nacionais e Estaduais por Estado.
# Garantir que a data está em datetime
df['DataEmissao'] = pd.to_datetime(df['DataEmissao'])

# Lista de todos os estados brasileiros
estados = [
    'AC', 'AL', 'AP', 'AM', 'BA', 'CE', 'DF', 'ES', 'GO',
    'MA', 'MT', 'MS', 'MG', 'PA', 'PB', 'PR', 'PE', 'PI',
    'RJ', 'RN', 'RS', 'RO', 'RR', 'SC', 'SP', 'SE', 'TO'
]

# Intervalo de anos com base na data
anos = list(range(2022, date.today().year + 1))

# Feriados nacionais
feriados_nacionais = holidays.Brazil(years=anos)

# Coluna booleana de feriado nacional
df['FeriadoNacional'] = df['DataEmissao'].apply(lambda x: x in feriados_nacionais)

# Carregar feriados estaduais uma única vez
feriados_estaduais = {
    uf: holidays.Brazil(years=anos, state=uf) for uf in estados
}

# Criar colunas booleanas para cada estado
for uf in estados:
    df[f'Feriado_{uf}'] = df['DataEmissao'].apply(lambda x: x in feriados_estaduais[uf])

In [10]:
# Dia útil
# Criar coluna feriado no estado da agência (usa a coluna dinamicamente)
df['FeriadoEstadoAgencia'] = df.apply(
    lambda row: row[f"Feriado_{row['Estado']}"], axis=1
)

# Versão Numérica dos dias da semana
#df['DiaSemanaNumero'] = pd.to_datetime(df['DataEmissao']).dt.weekday  # 0=segunda, ..., 6=domingo

# Criar coluna dia útil
df['dia_util'] = (
    (df['DiaSemanaNumero'] < 5) &
    (~df['FeriadoNacional']) &
    (~df['FeriadoEstadoAgencia'])
)

In [None]:
def diagnostica_vazios(df):
    resultado = []

    for col in df.columns:
        tipo = str(df[col].dtype)
        total_linhas = len(df)
        nulos = df[col].isnull().sum()
        vazios = 0
        espacos = 0
        zeros = 0
        total_vazios = nulos  # vai ser atualizado conforme o tipo

        # Diagnóstico para colunas texto
        if tipo in ['object', 'string']:
            vazios = (df[col] == '').sum()
            espacos = (df[col].astype(str).str.strip() == '').sum()
            total_vazios = nulos + espacos

        # Diagnóstico para colunas numéricas
        elif 'int' in tipo or 'float' in tipo:
            zeros = (df[col] == 0).sum()
            total_vazios = nulos + zeros

        # Diagnóstico para booleanas (somente nulos são relevantes)
        elif 'bool' in tipo:
            pass  # já está contabilizado como total_vazios = nulos

        resultado.append({
            'coluna': col,
            'tipo': tipo,
            'nulos': nulos,
            'vazios': vazios,
            'espacos': espacos,
            'zeros': zeros,
            'total_vazios': total_vazios,
            'percentual_nulos': round(nulos / total_linhas * 100, 2),
            'percentual_zeros': round(zeros / total_linhas * 100, 2) if zeros > 0 else 0
        })

    diagnostico_df = pd.DataFrame(resultado)
    diagnostico_df = diagnostico_df.sort_values(by='total_vazios', ascending=False)

    return diagnostico_df

# Exibir

diagnostico = diagnostica_vazios(df)
display(diagnostico)

In [13]:
# Remover registros com o 'GRUPO' vazio (nulo ou string vazia)
df = df[~(df['Grupo'].isnull() | (df['Grupo'].astype(str).str.strip() == ''))]

In [14]:
# Resetar index
df = df.reset_index(drop=True)

In [None]:
df['SaldoReal'].describe()

In [None]:
df.describe(include='object') #categórico

In [None]:
#DISTRIBUIÇÃO DA VARIÁVEL
plt.figure(figsize=(10, 5))
plt.hist(df['SaldoReal'], bins=50, edgecolor='black')
plt.title('Distribuição da variável "SaldoReal"')
plt.xlabel('SaldoReal')
plt.ylabel('Frequência')
plt.axvline(0, color='red', linestyle='--', label='Zero')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Zoom na cauda principal da distribuição (excluir extremos para ver melhor o centro)
plt.figure(figsize=(10, 5))
df['SaldoReal'].clip(lower=df['SaldoReal'].quantile(0.01), upper=df['SaldoReal'].quantile(0.99)).hist(bins=50)
plt.title('Distribuição de SaldoReal (sem extremos)')
plt.xlabel('SaldoReal (percentil 1% a 99%)')
plt.ylabel('Frequência')
plt.grid(True)
plt.tight_layout()
plt.show()

In [21]:
def comportamento_saldo(df, data_col='DataEmissao', target_col='SaldoReal'):
    # Converter data
    df = df.copy()
    df[data_col] = pd.to_datetime(df[data_col], errors='coerce', dayfirst=True)
    df = df.dropna(subset=[data_col])  # remove datas inválidas

    # Extrair partes da data
    df['Mes'] = df[data_col].dt.month
    df['DiaMes'] = df[data_col].dt.day
    df['DiaSemana'] = df[data_col].dt.dayofweek  # 0 = segunda
    df['mes_ano'] = df[data_col].dt.to_period('M')

    # 1. Por mês (1-12)
    saldo_mes = df.groupby('Mes')[target_col].sum()

    plt.figure(figsize=(10, 4))
    saldo_mes.plot(kind='bar')
    plt.title('Comportamento do SaldoReal por Mês')
    plt.xlabel('Mês')
    plt.ylabel('SaldoReal')
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # 2. Por dia do mês (1-31)
    saldo_dia = df.groupby('DiaMes')[target_col].sum()

    plt.figure(figsize=(12, 4))
    saldo_dia.plot()
    plt.title('Comportamento do SaldoReal por Dia do Mês')
    plt.xlabel('Dia do Mês')
    plt.ylabel('SaldoReal')
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # 3. Por dia da semana (segunda a domingo)
    dias_semana = ['segunda-feira', 'terça-feira', 'quarta-feira', 'quinta-feira', 'sexta-feira', 'sábado', 'domingo']
    saldo_semana = df.groupby('DiaSemana')[target_col].mean()

    plt.figure(figsize=(8, 4))
    saldo_semana.index = dias_semana
    saldo_semana.plot(kind='bar', color='orange')
    plt.title('Média de SaldoReal por Dia da Semana')
    plt.xlabel('Dia da Semana')
    plt.ylabel('SaldoReal Médio')
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # 4. Por mês-ano (tendência temporal)
    saldo_mes_ano = df.groupby('mes_ano')[target_col].sum()

    plt.figure(figsize=(14, 5))
    saldo_mes_ano.plot(marker='o')
    plt.title('Comportamento do SaldoReal por Mês-Ano')
    plt.xlabel('Mês-Ano')
    plt.ylabel('SaldoReal')
    plt.xticks(rotation=45)
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    return saldo_mes, saldo_dia, saldo_semana, saldo_mes_ano

In [None]:
saldo_mes, saldo_dia, saldo_semana, saldo_mes_ano = comportamento_saldo(df, data_col='DataEmissao', target_col='SaldoReal')

In [None]:
# 1. Base: coluna original
y = df['SaldoReal'].copy()

# 2. Transformações
logsim_y = np.sign(y) * np.log1p(np.abs(y))  # log com sinal
cbrt_y = np.cbrt(y)                          # raiz cúbica
power = PowerTransformer(method='yeo-johnson')
power_y = power.fit_transform(y.values.reshape(-1, 1))
quantile = QuantileTransformer(output_distribution='normal', random_state=83)
quantile_y = quantile.fit_transform(y.values.reshape(-1, 1))

# 3. Métricas estatísticas
print("📊 Estatísticas de Assimetria (skew) e Curtose (kurtosis):\n")

def estatisticas(nome, vetor):
    print(f"{nome:<25} →  skew: {skew(vetor):>6.2f}  |  kurtosis: {kurtosis(vetor):>6.2f}")

estatisticas("Original", y)
estatisticas("Log com sinal", logsim_y)
estatisticas("Raiz cúbica", cbrt_y)
estatisticas("PowerTransformer (YJ)", power_y.ravel())
estatisticas("QuantileTransformer", quantile_y.ravel())

# 4. Gráficos
fig, axs = plt.subplots(3, 2, figsize=(14, 12))

sns.histplot(y, bins=60, ax=axs[0, 0], kde=True)
axs[0, 0].set_title('Original')

sns.histplot(logsim_y, bins=60, ax=axs[0, 1], kde=True)
axs[0, 1].set_title('Log com sinal (sign*log1p(abs(x)))')

sns.histplot(cbrt_y, bins=60, ax=axs[1, 0], kde=True)
axs[1, 0].set_title('Raiz cúbica (cbrt)')

sns.histplot(power_y.ravel(), bins=60, ax=axs[1, 1], kde=True)
axs[1, 1].set_title('PowerTransformer (Yeo-Johnson)')

sns.histplot(quantile_y.ravel(), bins=60, ax=axs[2, 0], kde=True)
axs[2, 0].set_title('QuantileTransformer (Normal)')

axs[2, 1].axis('off')  # espaço vazio

plt.tight_layout()
plt.show()

In [None]:
df.columns

In [None]:
df.info()

In [None]:
# Selecionar apenas as colunas numéricas
df_numerico = df.select_dtypes(include=['number'])

# Calcular a correlação entre as colunas numéricas
corr_matrix = df_numerico.corr().abs()

# Exibir a matriz de correlação
corr_matrix

In [112]:
target = 'SaldoReal'
features = [
    'Empresa', 
    'GrupoEmpresaServico', 
    'CanalDeVenda', 
    'PontoDeVenda', 
    'Estado', 
    'Cidade', 
    'Grupo', 
    'Ano', 
    'Mes', 
    'DiaMes', 
    'FeriadoNacional',
    'FeriadoEstadoAgencia', 
    'dia_util'
]

In [113]:
# Subconjunto apenas com as variáveis selecionadas
df_model = df[features + [target]].copy()
df_model = df_model.sample(frac=1, random_state=83) #manter a aleatoriedade

In [None]:
# Cardinalidade - valores únicos

cat_features = ['Empresa', 
                'GrupoEmpresaServico', 
                'CanalDeVenda', 
                'PontoDeVenda', 
                'Estado', 
                'Cidade', 
                'Grupo']

print("📊 Cardinalidade das variáveis categóricas:\n")
for col in cat_features:
    unicos = df_model[col].nunique()
    print(f"{col:<20}: {unicos} valores únicos")

In [116]:
# Padronização das features de Texto (minúsculas, remover espaços, acentos)

def limpar_string(texto):
    if pd.isnull(texto):
        return texto
    texto = str(texto).strip().lower()
    texto = unicodedata.normalize('NFKD', texto).encode('ascii', errors='ignore').decode('utf-8')
    texto = ' '.join(texto.split())  # remove espaços extras
    return texto

In [117]:
# Tratar os dados

# Tratar valores e padronizar Strings
cat_features =  ['Empresa', 'GrupoEmpresaServico', 'CanalDeVenda', 'PontoDeVenda', 'Estado', 'Cidade', 'Grupo']
for col in cat_features:
    df_model[col] = df_model[col].astype(str).fillna('desconhecido').apply(limpar_string)

# Salvar colunas originais de alta cardinalidade e reduzir cardinalidade das categóricas
colunas_para_reduzir = ['PontoDeVenda', 'Estado', 'Cidade', 'Grupo']

# Definir o limite de frequência mínima para manter um ponto de venda
limite_frequencia = 50  # Número mínimo de ocorrências para um PontoDeVenda ser mantido como original

for col in colunas_para_reduzir:
     if col in df.columns:
         # Salvar a variável original
         df_model[col + '_ORIGINAL'] = df[col]
         
         # Limpar e preencher valores nulos
         df_model[col] = df[col].fillna('desconhecido').apply(limpar_string)
         
         # Contar as ocorrências de cada valor na coluna
         value_counts = df_model[col].value_counts()
         
         # Selecionar os valores que atendem ao limite de frequência (pontos de venda mais frequentes)
         top = value_counts[value_counts >= limite_frequencia].index
         
         # Aplicar a regra de "outros" para pontos de venda que não atendem ao limite de frequência
         df_model[col] = df_model[col].apply(lambda x: x if x in top else 'outros')
         
         # Adicionar a coluna ao conjunto de features (se ainda não estiver presente)
         if col not in features:
             features.append(col)

# Converter booleanas explicitamente para 0/1
bool_features = ['FeriadoNacional', 'FeriadoEstadoAgencia', 'dia_util']
df_model[bool_features] = df_model[bool_features].fillna(False).astype(int)

# Tratar valores nulos nas numéricas (se houver)
num_features = ['Ano', 'Mes', 'DiaMes']
for col in num_features:
    df_model[col] = df_model[col].fillna(df_model[col].median())


In [119]:
#Treino e Teste

# Separar features e target original
X = df_model[features]
y = df_model[[target]].values

# Separar treino e teste
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, random_state=83)

In [120]:
#PipeLine de Transformação

# Pipeline para dados categóricos
cat_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(drop='first', handle_unknown='ignore'))
])

# Pipeline para dados numéricos
num_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler())
])

# Booleanos já estão como 0/1
bool_transformer = 'passthrough'

# Transformador geral
preprocessor = ColumnTransformer(transformers=[
    ('cat', cat_transformer, cat_features),
    ('num', num_transformer, num_features),
    ('bool', bool_transformer, bool_features)
])

In [None]:
# Definir e Testar Modelo

# Modelos a serem avaliados
modelos = {
    'XGBoost': XGBRegressor(random_state=83, verbosity=0),
    'LightGBM': LGBMRegressor(random_state=83)
    'CatBoost': CatBoostRegressor(verbose=0, random_state=83)
}

# Modelos com ajuste pesado de parâmetros (usar RandomizedSearch)
pesados = ['XGBoost', 
           'LightGBM', 
           'CatBoost'
           ]

# Hiperparâmetros
parametros = {
    'XGBoost': {
        'regressor__n_estimators': [100, 300, 600, 800, 1000],
        'regressor__learning_rate': [0.01, 0.03, 0.05, 0.07, 0.1],
        'regressor__max_depth': [3, 5, 7, 9],
        'regressor__subsample': [0.6, 0.8, 1.0],
        'regressor__colsample_bytree': [0.5, 0.7, 0.8, 1.0]
    },
    
    'LightGBM': {
        'regressor__n_estimators': [100, 300, 600, 800],
        'regressor__learning_rate': [0.01, 0.03, 0.05, 0.07, 0.1],
        'regressor__num_leaves': [15, 31, 50, 63, 100],
        'regressor__subsample': [0.6, 0.8, 1.0],
        'regressor__colsample_bytree': [0.5, 0.7, 0.8, 1.0]
    },
    
    'CatBoost': {
        'regressor__iterations': [300, 600, 800, 1000],
        'regressor__learning_rate': [0.01, 0.03, 0.05, 0.1],
        'regressor__depth': [4, 6, 8, 10]
    }
}

# Rodar os modelos
resultados = []
modelos_com_erros = []

for nome, modelo in modelos.items():
    print(f"\n🚀 Treinando: {nome}...")

    try:
        pipe = Pipeline(steps=[
            ('preprocessamento', preprocessor),
            ('regressor', modelo)
        ])

        if nome in parametros:
            if nome in pesados:
                search = RandomizedSearchCV(
                    estimator=pipe,
                    param_distributions=parametros[nome],
                    scoring='neg_root_mean_squared_error',
                    cv=2,                        # redução temporária
                    n_jobs=1 if nome in n_jobs_1_modelos else -1,
                    n_iter=5,
                    verbose=2,
                    random_state=83,
                    error_score='raise' 
        )
            else:
                search = GridSearchCV(
                    estimator=pipe,
                    param_grid=parametros[nome],
                    scoring='neg_root_mean_squared_error',
                    cv=3,
                    n_jobs=1 if nome in n_jobs_1_modelos else -1,
                    verbose=1
        )

            search.fit(X_train, y_train)
            best_model = search.best_estimator_
            y_pred = best_model.predict(X_test)
            print(f"✅ Melhores parâmetros para {nome}: {search.best_params_}")

        else:
            pipe.fit(X_train, y_train)
            best_model = pipe
            y_pred = pipe.predict(X_test)

        y_real = y_test.ravel()
        y_pred = y_pred.ravel()
        y_train_pred = best_model.predict(X_train).ravel()
        y_train_real = y_train.ravel()

        resultados.append({
            'modelo': nome,
            'MAE': mean_absolute_error(y_real, y_pred),
            'RMSE': np.sqrt(mean_squared_error(y_real, y_pred)),
            'R2_Teste': r2_score(y_real, y_pred),
            'R2_Treino': r2_score(y_train_real, y_train_pred)
        })

        print(f"✅ Concluído: {nome}")

    except Exception as e:
        print(f"❌ Erro ao treinar {nome}: {e}")
        modelos_com_erros.append((nome, str(e)))

# Exibir ranking
df_resultados = pd.DataFrame(resultados).sort_values(by='RMSE')
print("\n📊 Ranking de Modelos:")
display(df_resultados)

if modelos_com_erros:
    print("\n⚠️ Modelos com erro:")
    for nome, erro in modelos_com_erros:
        print(f"- {nome}: {erro}")


In [128]:
#Recuperação do modelo

melhores_parametros_lgbm = {
    'n_estimators': 600,
    'learning_rate': 0.03,
    'num_leaves': 63,
    'subsample': 0.8,
    'colsample_bytree': 1.0
}

# Recriar modelo com melhores parâmetros
modelo_lgbm = LGBMRegressor(
    n_estimators=600,
    learning_rate=0.03,
    num_leaves=63,
    subsample=0.8,
    colsample_bytree=1.0,
    random_state=83
)

# Pipeline com o mesmo preprocessor
modelo_final = Pipeline(steps=[
    ('preprocessamento', preprocessor),
    ('regressor', modelo_lgbm)
])

# Treinar com todos os dados disponíveis
modelo_final.fit(X, y)

  y = column_or_1d(y, warn=True)


[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.120380 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 5892
[LightGBM] [Info] Number of data points in the train set: 4152386, number of used features: 2924
[LightGBM] [Info] Start training from score 990.810422


In [None]:
y_pred = modelo_final.predict(X)
y_true = y.ravel()

mae = mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(mean_squared_error(y_true, y_pred))
r2 = r2_score(y_true, y_pred)

print("\n📊 Avaliação final (conjunto completo):")
print(f"MAE  : {mae:.2f}")
print(f"RMSE : {rmse:.2f}")
print(f"R²   : {r2:.4f}")

In [130]:
# Dicionário para salvar os valores mantidos por coluna
top_categorias = {}

# Redução de cardinalidade
limite_frequencia = 50
colunas_para_reduzir = ['PontoDeVenda', 'Estado', 'Cidade', 'Grupo']

for col in colunas_para_reduzir:
    if col in df.columns:
        df_model[col + '_ORIGINAL'] = df[col]
        df_model[col] = df[col].fillna('desconhecido').apply(limpar_string)

        value_counts = df_model[col].value_counts()
        top = value_counts[value_counts >= limite_frequencia].index

        # SALVA os top valores dessa coluna para uso futuro
        top_categorias[col] = list(top)

        df_model[col] = df_model[col].apply(lambda x: x if x in top else 'outros')

        if col not in features:
            features.append(col)

In [None]:
#Salvando o Modelo

# Importar dependências
import joblib
from datetime import datetime

# Definir nome do modelo com data
hoje = datetime.today().strftime('%Y-%m-%d')

nome_arquivo_modelo = f"modelo_LightGBM_{hoje}.pkl"
nome_arquivo_features = f"features_{hoje}.pkl"
nome_arquivo_topcategorias = f"top_categorias_{hoje}.pkl"

# Salvar modelo final treinado
joblib.dump(modelo_final, nome_arquivo_modelo)

# Salvar lista de features usadas (na ordem)
joblib.dump(features, nome_arquivo_features)

# Salvar top grupos reduzidos (usado na previsão futura)
joblib.dump(top_categorias, nome_arquivo_topcategorias)

print("\u2705 Modelo LightGBM e dependências salvos com sucesso:")
print("Modelo   :", nome_arquivo_modelo)
print("Features :", nome_arquivo_features)
print("Top Categorias:", nome_arquivo_topcategorias)