In [2]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

class DataCleaner(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        X.rename(columns={
            'data': 'ds',
            'valor': 'y',
        }, inplace=True)
        X.drop(columns=['percentual', 'ano', 'decada', 'var_pct_ano_anterior'], inplace=True)
        X['ds'] = pd.to_datetime(X['ds'])
        X['y'] = X['y']
        X.sort_values(by=['ds'], ascending=True, inplace=True)
        X.set_index('ds', inplace=True)
        return X

class DataPreparer(BaseEstimator, TransformerMixin):
    def __init__(self, train_size=0.8, scaler=None):
        self.train_size = train_size
        self.scaler = scaler if scaler else MinMaxScaler(feature_range=(0, 1))

    def fit(self, X, y=None):
        close = X['y'].to_numpy().reshape(-1,1)
        train_close = int(len(close) * self.train_size)
        self.scaler.fit(close[0: train_close, :])
        return self

    def transform(self, X):
        close = X['y'].to_numpy().reshape(-1,1)
        train_close = int(len(close) * self.train_size)
        scaler_train = self.scaler.transform(close[0: train_close, :])
        scaler_test = self.scaler.transform(close[train_close:,:])

        scaled_data = list(scaler_train.reshape(len(scaler_train))) + list(scaler_test.reshape(len(scaler_test)))
        scaled_data = np.array(scaled_data).reshape(len(scaled_data),1)
        return scaled_data, train_close


class LSTMModel(BaseEstimator):
    def __init__(self, epochs=10, batch_size=50, scaler=None):
        self.epochs = epochs
        self.batch_size = batch_size
        self.model = None
        self.scaler = scaler

    def _create_model(self, input_shape):
        model = Sequential()
        model.add(LSTM(units=50, return_sequences=True, input_shape=input_shape))
        model.add(LSTM(units=50, return_sequences=True))
        model.add(LSTM(units=50))
        model.add(Dense(10))
        model.add(Dense(1))
        model.compile(optimizer='adam', loss='mean_squared_error')
        return model

    def fit(self, X, y=None):
        scaled_data, train_close = X
        train_data = scaled_data[0: train_close,:]

        X_train = []
        y_train = []
        for i in range(30, len(train_data)):
            X_train.append(train_data[i - 30:i, 0])
            y_train.append(train_data[i, 0])

        X_train, y_train = np.array(X_train), np.array(y_train)
        X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)

        self.model = self._create_model((X_train.shape[1], 1))
        self.model.fit(X_train, y_train, epochs=self.epochs, batch_size=self.batch_size)
        return self

    def predict(self, X):
        scaled_data, train_close = X
        test_data = scaled_data[train_close - 30:, :]

        X_test = []
        for i in range(30, len(test_data)):
            X_test.append(test_data[i - 30: i, 0])

        X_test = np.array(X_test)
        X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

        prev_lstm = self.model.predict(X_test)
        prev_lstm = self.scaler.inverse_transform(prev_lstm)
        return prev_lstm

def avaliar_modelo(y_test, prev_lstm):
    mse = mean_squared_error(y_test, prev_lstm)
    mae = mean_absolute_error(y_test, prev_lstm)
    r2 = r2_score(y_test, prev_lstm)
    mape = np.mean(np.abs((y_test - prev_lstm) / y_test)) * 100
    return mse, mae, r2, mape

In [3]:
# Carregar dados
df_ipea = pd.read_csv('/content/drive/MyDrive/PosTech_Analise_de_dados/FASE4/tech_challenge_4/dados/ipeadata_limpo.csv')

# Executar etapas individuais do pipeline
data_cleaner = DataCleaner()
df_ipea = data_cleaner.fit_transform(df_ipea)

data_preparer = DataPreparer(train_size=0.8, scaler=StandardScaler())
scaled_data, train_close = data_preparer.fit_transform(df_ipea)

lstm_model = LSTMModel(epochs=10, batch_size=50, scaler=data_preparer.scaler)
lstm_model.fit((scaled_data, train_close))

# Fazer previsões
y_test = df_ipea['y'].values[train_close:]  # Use the original scale for y_test
prev_lstm = lstm_model.predict((scaled_data, train_close))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [4]:
# Avaliar o modelo
mse, mae, r2, mape = avaliar_modelo(y_test, prev_lstm)

print(f"MSE: {mse:.2f}")
print(f"MAE: {mae:.2f}")
print(f"R²: {r2:.2f}")
print(f"MAPE: {mape:.2f}%")

MSE: 6.78
MAE: 1.79
R²: 0.98
MAPE: 36.44%


In [5]:
# Prever futuro
def prever_futuro(model, close, scaler):
    last_30_days_scaled = scaler.transform(close[-30:].reshape(-1, 1))

    X_test = np.array([last_30_days_scaled])
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    predictions = []
    for i in range(30):
        pred_future = model.predict(X_test)
        predictions.append(pred_future)
        X_test = np.append(X_test[:, 1:, :], pred_future.reshape(1, 1, 1), axis=1)

    predictions = np.array(predictions).reshape(-1, 1)
    predictions = scaler.inverse_transform(predictions)
    return predictions


# Prever futuro
predictions = prever_futuro(lstm_model.model, df_ipea['y'].to_numpy(), data_preparer.scaler)

# Filtrar os últimos 4 anos de dados
ultima_data = df_ipea.index.max()
quatro_anos_atras = ultima_data - pd.DateOffset(years=4)
df_ipea_ultimos_4_anos = df_ipea[df_ipea.index >= quatro_anos_atras]

# Preparar os dados de previsão
proxima_data = ultima_data + pd.Timedelta(days=1)
dates = pd.date_range(start=proxima_data, periods=30, freq='D')
future_predictions_df = pd.DataFrame({'ds': dates, 'previsao': predictions.flatten()})
future_predictions_df.set_index('ds', inplace=True)

# Concatenar os dados reais filtrados com os dados de previsão
combined_df = pd.concat([df_ipea_ultimos_4_anos, future_predictions_df])

# Plotar os resultados
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_ipea_ultimos_4_anos.index, y=df_ipea_ultimos_4_anos['y'], name='Valor Real', line=dict(color='steelblue')))
fig.add_trace(go.Scatter(x=combined_df.index, y=combined_df['previsao'], name='Previsão', line=dict(color='orange')))

fig.update_layout(
    title='Previsão LSTM vs Valores Reais',
    xaxis_title='Data',
    yaxis_title='Valor',
    legend_title='Tipo',
    template='plotly_white'
)
fig.show()



In [13]:
future_predictions_df.to_csv('future_predictions.csv', index_label='ds')

In [None]:
import pickle

model = lstm_model.model
model.save('lstm_model.keras')

with open('lstm_scaler.pkl', 'wb') as file:
    pickle.dump(data_preparer.scaler, file)