In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.seasonal import seasonal_decompose
from prophet import Prophet
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_absolute_error, mean_squared_error
from statsmodels.tsa.stattools import adfuller
import numpy as np
import cmdstanpy

In [None]:
df = pd.read_excel('/content/dataset_25.xlsx')
print(df.columns)
df.head()

In [None]:
df.info()

In [None]:
df.describe()

In [None]:
df["date"] = pd.to_datetime(df["date"])

In [None]:
# Combination of attributes
df['vol'] = df['vol'] / 15
# ou, se quiser renomear a coluna depois
df.rename(columns={'vol': 'qtd_vendida'}, inplace=True)

# Ordem cronológica da série
df = df.sort_values('date').reset_index(drop=True)

# View combination
df.head()

In [None]:
# Identificando se existe Datas Faltantes

# Gerando todas as datas do intervalo esperado
datas_completas = pd.date_range(start=df['date'].min(), end=df['date'].max(), freq='D')

# Verificando quais datas estão faltando
datas_faltando  = datas_completas.difference(df['date'])

datas_faltando

In [None]:
df_datas_completas = pd.DataFrame(datas_completas, columns=["date"])


In [None]:
df_vendas_diarias =  df.groupby('date')['qtd_vendida'].sum().reset_index()


df_vendas_diarias

In [None]:
# Mesclado o DataFrame de datas completas com o DataFrame de vendas diárias agregadas. Usamos um left merge para manter todas as datas.
# O fillna(0) preenche os dias que não tinham vendas (datas faltantes) com 0.
# Renomeado as colunas para clareza. df_completo é a série temporal principal que vamos analisar e modelar.

df_completo = df_datas_completas.merge(df_vendas_diarias, on="date", how="left").fillna(0)
# Renomeia as colunas date para data e money para qtd_vendida, já que a coluna money foi agregada por quantidade (count())
df_completo.columns = ["data", "qtd_vendida"]
df_completo

In [None]:
df_zero = df_completo[df_completo['qtd_vendida'] == 0]
df_zero

In [None]:
# transformando a coluna 'data' em índice do DataFrame
df_index_data = df_completo.set_index('data')

# Criando uma série (Series) com apenas os valores da coluna 'qtd_vendida'
serie_vendas = df_index_data['qtd_vendida']

# Calculando Media Movel Para detectar tendências mensais (entender a tendência real de crescimento ou queda)
media_movel = serie_vendas.rolling(window=30).mean()

#Calculando Desvio Padrão Movel
desvio_movel = serie_vendas.rolling(window=30).std()


plt.figure(figsize=(14,6))
plt.plot(serie_vendas, label='Vendas Diárias')
plt.plot(media_movel, label='Média Móvel (30 dias)', linestyle='--')
plt.plot(desvio_movel, label='Desvio Padrão Móvel (30 dias)', linestyle=':')
plt.xlabel('Data')
plt.ylabel('Qtd de Cafés Vendidos')
plt.title('Análise de Vendas')
plt.legend()
plt.grid()
plt.show()

In [None]:
# Decomposição da série temporal
decomposicao = seasonal_decompose(serie_vendas, model='additive', period=7)

# Plotar os componentes

# Gráfico manual com maior tamanho
fig, axs = plt.subplots(4, 1, figsize=(14, 10), sharex=True)

axs[0].plot(decomposicao.observed, label='Original')
axs[0].set_ylabel('Observado')
axs[0].legend()

axs[1].plot(decomposicao.trend, label='Trend', color='orange')
axs[1].set_ylabel('Tendência')
axs[1].legend()

axs[2].plot(decomposicao.seasonal, label='Seasonal', color='green')
axs[2].set_ylabel('Sazonalidade')
axs[2].legend()

axs[3].plot(decomposicao.resid, label='Residual', color='red')
axs[3].set_ylabel('Residual')
axs[3].legend()

plt.suptitle('Decomposição da Série Temporal', fontsize=16)
plt.xlabel('Data')
plt.tight_layout()
plt.show()

In [None]:
# Preparação dos dados para o Prophet
df_prophet = serie_vendas.reset_index()
df_prophet.columns = ['ds', 'y']
df_prophet.head()

In [None]:
# Função treinar_e_prever_prophet()

def treinar_e_prever_prophet(df_treino, df_teste):
    modelo = Prophet()
    modelo.fit(df_treino)
    futuro = modelo.make_future_dataframe(periods=len(df_teste))
    previsao = modelo.predict(futuro)
    datas = df_teste['ds'].tolist()
    previsao_filtrada = previsao[previsao['ds'].isin(datas)]
    previsao_filtrada = previsao_filtrada.set_index('ds').loc[datas].reset_index()
    return previsao_filtrada

In [None]:
# Validação Cruzada Temporal com Métricas

# Parâmetros da validação cruzada
tamanho_inicial_treino = int(len(df_prophet) * 0.6)
horizonte_previsao = int(len(df_prophet) * 0.1)

metricas_prophet = []

# Validação cruzada temporal
for i in range(tamanho_inicial_treino, len(df_prophet) - horizonte_previsao, horizonte_previsao):
    df_treino = df_prophet.iloc[:i]
    df_teste = df_prophet.iloc[i:i + horizonte_previsao]

    previsao = treinar_e_prever_prophet(df_treino, df_teste)

    mae = mean_absolute_error(df_teste['y'], previsao['yhat'])
    rmse = np.sqrt(mean_squared_error(df_teste['y'], previsao['yhat']))

    metricas_prophet.append({'mae': mae, 'rmse': rmse})
    print(f"Iteração {len(metricas_prophet)} - MAE: {mae:.2f}, RMSE: {rmse:.2f}")

mae_medio = np.mean([m['mae'] for m in metricas_prophet])
rmse_medio = np.mean([m['rmse'] for m in metricas_prophet])

print("\nMétricas Médias com Validação Cruzada Temporal para Prophet:")
print(f"MAE Médio: {mae_medio:.2f}")
print(f"RMSE Médio: {rmse_medio:.2f}")

In [None]:
# Visualização das Previsões por Fold

plt.figure(figsize=(14, 6))
plt.plot(df_prophet['ds'], df_prophet['y'], label='Real', color='black')

for i in range(len(metricas_prophet)):
    inicio = tamanho_inicial_treino + i * horizonte_previsao
    fim = inicio + horizonte_previsao
    if fim > len(df_prophet):
        break
    df_treino = df_prophet.iloc[:inicio]
    df_teste = df_prophet.iloc[inicio:fim]

    previsao = treinar_e_prever_prophet(df_treino, df_teste)

    plt.plot(df_teste['ds'], previsao['yhat'], label=f'Previsão Fold {i+1}', linestyle='--')

plt.title('Validação Cruzada Temporal - Previsões Prophet')
plt.xlabel('Data')
plt.ylabel('Receita prevista')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Teste Aumentado de Dickey-Fuller para verificar estacionaridade
resultado_adf = adfuller(serie_vendas)

print(f'Estatística ADF: {resultado_adf[0]}')
print(f'Valor-p: {resultado_adf[1]}')
print('Valores Críticos:')
for chave, valor in resultado_adf[4].items():
    print(f'\t{chave}: {valor}')

if resultado_adf[1] <= 0.05:
    print("\nA série é estacionária (rejeitamos a hipótese nula).")
else:
    print("\nA série NÃO é estacionária (não rejeitamos a hipótese nula).")

In [None]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Plotagem do ACF para a série diferenciada
plt.figure(figsize=(12, 6))
plot_acf(serie_vendas, lags=40, ax=plt.gca())
plt.title('Função de Autocorrelação (ACF) - Série Diferenciada')
plt.xlabel('Defasagem')
plt.ylabel('Autocorrelação')
plt.tight_layout()
plt.show()

# Plotagem do PACF para a série diferenciada
plt.figure(figsize=(12, 6))
plot_pacf(serie_vendas, lags=40, ax=plt.gca())
plt.title('Função de Autocorrelação Parcial (PACF) - Série Diferenciada')
plt.xlabel('Defasagem')
plt.ylabel('Autocorrelação Parcial')
plt.tight_layout()
plt.show()

In [None]:
# Função para treinar e prever com ARIMA

def treinar_e_prever_arima(serie_treino, serie_teste, order):
    # O modelo ARIMA assume que a série de entrada é a série original
    # e o parâmetro 'order' lida com a diferenciação
    modelo = ARIMA(serie_treino, order=order)
    resultado = modelo.fit()

    # O método forecast() prevê para o número especificado de passos à frente
    previsao = resultado.forecast(steps=len(serie_teste))

    # Retornar um DataFrame similar ao do Prophet para facilitar a comparação
    previsao_df = pd.DataFrame({
        'ds': serie_teste.index,
        'yhat': previsao.values # previsao.values contém os valores previstos
    })
    return previsao_df

In [None]:
# Ordem do modelo ARIMA com base na análise de ACF/PACF e ADF
ordem_arima_definida = (1, 1, 1)

# Validação Cruzada Temporal com Métricas para ARIMA

# Parâmetros da validação cruzada
tamanho_inicial_treino = int(len(serie_vendas) * 0.6) # Usando o tamanho da série original
horizonte_previsao = int(len(serie_vendas) * 0.1)   # Usando o tamanho da série original

metricas_arima = [] # Lista para as métricas do ARIMA

# Validação cruzada temporal
# Loop sobre os índices da série original
for i in range(tamanho_inicial_treino, len(serie_vendas) - horizonte_previsao, horizonte_previsao):

    # Para o ARIMA, precisamos da série temporal na escala original (objeto Series do pandas)
    # O objeto 'serie' já está com o índice de data e valores agregados diariamente
    serie_treino_arima = serie_vendas.iloc[:i]
    serie_teste_arima = serie_vendas.iloc[i:i + horizonte_previsao]

    # Previsão com ARIMA
    try:
        previsao_arima = treinar_e_prever_arima(serie_treino_arima, serie_teste_arima, ordem_arima_definida)
        # Comparar com os valores reais da série_teste_arima (objeto Series)
        mae_arima = mean_absolute_error(serie_teste_arima.values, previsao_arima['yhat'])
        rmse_arima = np.sqrt(mean_squared_error(serie_teste_arima.values, previsao_arima['yhat']))
        metricas_arima.append({'mae': mae_arima, 'rmse': rmse_arima})
        print(f"Iteração {len(metricas_arima)} - ARIMA (MAE: {mae_arima:.2f}, RMSE: {rmse_arima:.2f})")
    except Exception as e:
        # Tratar possíveis erros durante o treinamento do ARIMA (pode acontecer com certas ordens ou dados)
        print(f"Erro ao treinar ou prever com ARIMA na iteração {len(metricas_arima) + 1}: {e}")
        metricas_arima.append({'mae': np.nan, 'rmse': np.nan}) # Adicionar NaN para não quebrar o cálculo da média

# Cálculo das métricas médias
mae_medio_arima = np.nanmean([m['mae'] for m in metricas_arima]) # Usar nanmean para ignorar NaNs se houver erros
rmse_medio_arima = np.nanmean([m['rmse'] for m in metricas_arima])

print("\nMétricas Médias com Validação Cruzada Temporal para ARIMA:")
print(f"MAE Médio: {mae_medio_arima:.2f}")
print(f"RMSE Médio: {rmse_medio_arima:.2f}")

In [None]:
# Visualização das Previsões por Fold para ARIMA

plt.figure(figsize=(14, 6))
# Plotar a série real completa para referência
plt.plot(serie_vendas.index, serie_vendas.values, label='Real', color='black')

# Parâmetros da validação cruzada (certifique-se de que são os mesmos usados na avaliação)
tamanho_inicial_treino = int(len(serie_vendas) * 0.6)
horizonte_previsao = int(len(serie_vendas) * 0.1)
ordem_arima_definida = (1, 1, 1) # Use a ordem que você definiu

# Loop para gerar previsões em cada fold e plotar
# O loop itera sobre os índices da série original
fold_count = 0 # Contador para o número do fold
for i in range(tamanho_inicial_treino, len(serie_vendas) - horizonte_previsao, horizonte_previsao):
    fold_count += 1
    inicio = i
    fim = i + horizonte_previsao

    # Certificar-se de que o fold de teste não ultrapassa o tamanho da série
    if fim > len(serie_vendas):
        break

    # Extrair dados de treino e teste para o fold atual
    serie_treino_arima = serie_vendas.iloc[:inicio]
    serie_teste_arima = serie_vendas.iloc[inicio:fim]

    # Gerar previsões para o fold de teste
    try:
        previsao_arima = treinar_e_prever_arima(serie_treino_arima, serie_teste_arima, ordem_arima_definida)

        # Plotar as previsões do ARIMA para o fold atual
        # Use um rótulo único para cada fold
        plt.plot(previsao_arima['ds'], previsao_arima['yhat'], label=f'Previsão ARIMA Fold {fold_count}', linestyle='--')

    except Exception as e:
        print(f"Erro ao gerar previsão para plotagem do ARIMA no Fold {fold_count}: {e}")
        # Se ocorrer um erro, não plotamos este fold

plt.title('Validação Cruzada Temporal - Previsões ARIMA')
plt.xlabel('Data')
plt.ylabel('Receita prevista')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Comparação de Desempenho - Previsão de 1 Passo

# Definir o tamanho inicial do treino (pode ser o mesmo da validação cruzada ou um valor fixo)
tamanho_inicial_treino_1_passo = int(len(serie_vendas) * 0.6) # Ou outro valor apropriado

previsoes_reais_1_passo = []
previsoes_prophet_1_passo = []
previsoes_arima_1_passo = []

# Loop para previsão de 1 passo
# Começa onde termina o tamanho inicial do treino
for i in range(tamanho_inicial_treino_1_passo, len(serie_vendas)):

    # Obter dados de treino para o passo atual
    df_treino_prophet = df_prophet.iloc[:i]
    serie_treino_arima = serie_vendas.iloc[:i]

    # Obter o valor real do próximo passo
    if i < len(serie_vendas):
        valor_real_proximo_passo = serie_vendas.iloc[i]
        data_proximo_passo = serie_vendas.index[i]
    else:
        # Isso só aconteceria se o loop chegasse ao fim da série
        break # Sair do loop se não houver mais dados reais para comparar

    # Armazenar o valor real
    previsoes_reais_1_passo.append({'ds': data_proximo_passo, 'y': valor_real_proximo_passo})

    # Prever 1 passo à frente com Prophet
    try:
        # Prophet precisa de um DataFrame de futuro com a data do próximo passo
        futuro_prophet_1_passo = pd.DataFrame({'ds': [data_proximo_passo]})
        previsao_prophet = Prophet().fit(df_treino_prophet).predict(futuro_prophet_1_passo)
        previsoes_prophet_1_passo.append({'ds': data_proximo_passo, 'yhat': previsao_prophet['yhat'].iloc[0]})
    except Exception as e:
        print(f"Erro ao prever 1 passo com Prophet na data {data_proximo_passo}: {e}")
        previsoes_prophet_1_passo.append({'ds': data_proximo_passo, 'yhat': np.nan})


    # Prever 1 passo à frente com ARIMA
    try:
        # ARIMA prevê o número especificado de passos à frente (aqui 1)
        modelo_arima_1_passo = ARIMA(serie_treino_arima, order=ordem_arima_definida)
        resultado_arima_1_passo = modelo_arima_1_passo.fit()
        previsao_arima = resultado_arima_1_passo.forecast(steps=1)
        previsoes_arima_1_passo.append({'ds': data_proximo_passo, 'yhat': previsao_arima.iloc[0]}) # ARIMA forecast retorna uma Series ou array
    except Exception as e:
        print(f"Erro ao prever 1 passo com ARIMA na data {data_proximo_passo}: {e}")
        previsoes_arima_1_passo.append({'ds': data_proximo_passo, 'yhat': np.nan})


# Converter listas de previsões para DataFrames para facilitar o cálculo de métricas
df_reais_1_passo = pd.DataFrame(previsoes_reais_1_passo).set_index('ds')
df_prophet_1_passo = pd.DataFrame(previsoes_prophet_1_passo).set_index('ds')
df_arima_1_passo = pd.DataFrame(previsoes_arima_1_passo).set_index('ds')

# Juntar os DataFrames para garantir que as comparações sejam feitas nas mesmas datas
df_comparacao_1_passo = df_reais_1_passo.join(df_prophet_1_passo, rsuffix='_prophet').join(df_arima_1_passo, rsuffix='_arima').dropna()


# Calcular métricas de avaliação para previsão de 1 passo
mae_prophet_1_passo = mean_absolute_error(df_comparacao_1_passo['y'], df_comparacao_1_passo['yhat'])
rmse_prophet_1_passo = np.sqrt(mean_squared_error(df_comparacao_1_passo['y'], df_comparacao_1_passo['yhat']))

mae_arima_1_passo = mean_absolute_error(df_comparacao_1_passo['y'], df_comparacao_1_passo['yhat_arima']) # Usar 'yhat_arima' após o join
rmse_arima_1_passo = np.sqrt(mean_squared_error(df_comparacao_1_passo['y'], df_comparacao_1_passo['yhat_arima']))


print("\nMétricas de Desempenho - Previsão de 1 Passo:")
print("Prophet:")
print(f"MAE: {mae_prophet_1_passo:.2f}")
print(f"RMSE: {rmse_prophet_1_passo:.2f}")
print("\nARIMA:")
print(f"MAE: {mae_arima_1_passo:.2f}")
print(f"RMSE: {rmse_arima_1_passo:.2f}")

# Opcional: Visualizar as previsões de 1 passo
plt.figure(figsize=(14, 6))
plt.plot(df_comparacao_1_passo.index, df_comparacao_1_passo['y'], label='Real', color='black')
plt.plot(df_comparacao_1_passo.index, df_comparacao_1_passo['yhat'], label='Previsão Prophet (1 passo)', linestyle='--')
plt.plot(df_comparacao_1_passo.index, df_comparacao_1_passo['yhat_arima'], label='Previsão ARIMA (1 passo)', linestyle='--')
plt.title('Comparação de Previsões (1 Passo)')
plt.xlabel('Data')
plt.ylabel('Qtd')
plt.legend()
plt.tight_layout()
plt.show()