## 3.1 -  Tabela de ingestão de dados


In [1]:
import pandas as pd
import requests
from io import StringIO
import numpy as np


In [2]:
# URL do CSV diretamente do GitHub (arquivo bruto)
file_path = "https://raw.githubusercontent.com/rivolela/fiap_tech_challenge_03/ae57bb6cd423c7b38a3208c9b6ea112d168d1344/csv/data_source_tech_challenge_03.csv"

# Baixar o arquivo CSV
response = requests.get(file_path, verify=True)

# Verificar se a requisição foi bem-sucedida
if response.status_code == 200:
    # Ler o CSV usando o delimitador correto
    df = pd.read_csv(StringIO(response.text), delimiter=',', quotechar='"', engine='python', on_bad_lines='skip')

    # Limpar os nomes das colunas removendo espaços em branco antes ou depois dos nomes
    df.columns = df.columns.str.strip()

    # Colunas que precisam ser convertidas para float
    colunas_para_converter = [
        'valor_m2_novo', 'valor_m2_existente', 'taxa_inflacao_nacional',
        'taxa_juros_emprestimo_nacional', 'indice_preco_habitacao_alojamento_novo',
        'indice_preco_habitacao_alojamento_existente', 'taxa_desemprego_16_a_74_anos'
    ]

    # Converter as colunas para float, lidando com erros
    for coluna in colunas_para_converter:
        df[coluna] = pd.to_numeric(df[coluna], errors='coerce')

    print(df.info)        
else:
    print(f"Erro ao baixar o arquivo: {response.status_code}")


## 3.2 - Limpar dados nulos

In [3]:
# Drop rows with null values

# Create a mask to identify rows with null values
null_rows = df[df.isnull().any(axis=1)]

df_clean = df.dropna()

# Print the rows that were removed
print("Rows removed due to null values:")
print(null_rows)

print(df_clean.head)


## 3.3 - Tratamento de colunas numéricas

In [None]:
# Limpar os nomes das colunas removendo espaços em branco antes ou depois dos nomes
df_clean.columns = df_clean.columns.str.strip()

# Lista das colunas que queremos converter para números
colunas_numericas = [
    'valor_m2_novo', 'valor_m2_existente', 'taxa_inflacao_nacional',
    'taxa_juros_emprestimo_nacional', 'indice_preco_habitacao_alojamento_novo',
    'indice_preco_habitacao_alojamento_existente', 'taxa_desemprego_16_a_74_anos'
]

# Substituir vírgulas por pontos e converter para float
for coluna in colunas_numericas:
    # Verifica se a coluna está no DataFrame
    if coluna in df_clean.columns:
        # Verifica se a coluna é do tipo 'object' (string)
        if df_clean[coluna].dtype == 'object':
            df_clean.loc[:, coluna] = df_clean[coluna].str.replace(',', '.').astype(float)
        else:
            df_clean.loc[:, coluna] = df_clean[coluna].astype(float)  # Já pode converter diretamente
    else:
        print(f"Coluna {coluna} não encontrada no DataFrame.")

# Exibir as primeiras linhas para verificar a conversão
print(df_clean[colunas_numericas].head())


## 3.4 - Removendo Outliers

In [None]:
# Função para detectar e tratar outliers usando o método IQR
def tratar_outliers_iqr(df, colunas_numericas):
    for coluna in colunas_numericas:
        if coluna in df.columns:
            # Calcular os quartis
            Q1 = df[coluna].quantile(0.25)
            Q3 = df[coluna].quantile(0.75)
            IQR = Q3 - Q1
            
            # Definir limites inferior e superior
            limite_inferior = Q1 - 1.5 * IQR
            limite_superior = Q3 + 1.5 * IQR
            
            # Criar uma cópia da coluna original para comparação
            original_values = df[coluna].copy()
            
            # Imprimir limites para verificação
            print(f"Coluna: {coluna}, Limite Inferior: {limite_inferior}, Limite Superior: {limite_superior}")
            
            # Contar os outliers antes de tratar
            outliers_count_before = ((original_values < limite_inferior) | (original_values > limite_superior)).sum()
            print(f"Número de outliers antes de tratamento: {outliers_count_before}")

            # Substituir outliers acima do limite superior com o limite superior usando .loc
            df.loc[df[coluna] > limite_superior, coluna] = limite_superior
            
            # Substituir outliers abaixo do limite inferior com o limite inferior usando .loc
            df.loc[df[coluna] < limite_inferior, coluna] = limite_inferior
            
            # Contar os outliers após tratar
            outliers_count_after = ((df[coluna] < limite_inferior) | (df[coluna] > limite_superior)).sum()
            print(f"Número de outliers após tratamento: {outliers_count_after}")

            # Identificar quais valores foram atualizados
            updated_items = df[df[coluna] != original_values]
            
            print(f"Outliers tratados na coluna {coluna}")
            if not updated_items.empty:
                print("Valores atualizados:")
                print(updated_items[[coluna]])  # Mostrar apenas a coluna atualizada
            else:
                print("Nenhum valor foi atualizado na coluna.")
        else:
            print(f"Coluna {coluna} não encontrada no DataFrame.")

    return df



# Aplicar a função ao DataFrame
df_tratado = tratar_outliers_iqr(df_clean, colunas_numericas)

# Exibir o resultado
#print(df_tratado[colunas_numericas].describe())


## 3.5 - Normalização e Padronização


In [None]:
from sklearn.preprocessing import StandardScaler

# Defina as colunas que você deseja normalizar
colunas_a_normalizar = [
    'taxa_inflacao_nacional',
    'taxa_juros_emprestimo_nacional',
    'taxa_desemprego_16_a_74_anos'
]

# Inicializar o escalador
scaler = StandardScaler()

# Use .loc para garantir que estamos alterando o DataFrame corretamente
df_normalizado = df_tratado.copy()  # Melhor prática: faça uma cópia do DataFrame original
df_normalizado.loc[:, colunas_a_normalizar] = scaler.fit_transform(df_tratado[colunas_a_normalizar])

# Exibir o DataFrame normalizado
print(df_normalizado[colunas_a_normalizar].head())
print(df_normalizado)


## 3.6 - Codificação de Dados Categóricos


In [None]:
from sklearn.preprocessing import LabelEncoder

# 1. Codificação do Periodo usando Label Encoding
label_encoder = LabelEncoder()
df_normalizado['Periodo_encoded'] = label_encoder.fit_transform(df_normalizado['Periodo'])

# Verificar se a coluna 'Bairro' existe antes de aplicar One-Hot Encoding
if 'Bairro' in df_normalizado.columns:
    # 2. Codificação do Bairro usando One-Hot Encoding
    df_normalizado = pd.get_dummies(df_normalizado, columns=['Bairro'], prefix='Bairro')
else:
    print("Coluna 'Bairro' não encontrada no DataFrame")

# 3. Convertendo colunas booleanas resultantes para inteiros (0 e 1)
# Converter apenas as colunas que podem ser convertidas para inteiros
df_normalizado.loc[:, df_normalizado.columns.str.startswith('Bairro')] = df_normalizado.loc[:, df_normalizado.columns.str.startswith('Bairro')].astype(int)

# Exibir o DataFrame com as colunas codificadas
print(df_normalizado)

# Validação: Conferir as colunas codificadas
print("Colunas após codificação:")
print(df_normalizado.columns)

# Validação: Mostrar os primeiros registros
print("Primeiros registros do DataFrame:")
print(df_normalizado.head())


## 3.7 - Engenharia de features


In [None]:
# Criar uma nova coluna 'media_valor_m2' que é a média entre 'valor_m2_novo' e 'valor_m2_existente'
df_normalizado['media_valor_m2'] = (df_normalizado['valor_m2_novo'] + df_normalizado['valor_m2_existente']) / 2

# Exibir o DataFrame com a nova feature
print(df_normalizado)

### 3.7.2 - Criação de variáveis lag (trimestral) para o cálculo temporal

In [9]:
# Criar variáveis de lag de 1 a 8 trimestres
for lag in range(1, 8):
    df_normalizado[f'media_valor_m2_lag_{lag}'] = df_normalizado['media_valor_m2'].shift(lag)


# Remover linhas com valores ausentes que foram gerados pela criação de lags
df_normalizado.dropna(inplace=True)

# 4 - Divisão dos Dados e Seleção das Características


In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# Divisão manual dos dados de treino e teste
# Definindo o tamanho do conjunto de treino para 80% dos dados, 
# assim as últimas 4 linhas serão usadas para teste.
train_size = int(len(df_normalizado) * 0.8)
train, test = df_normalizado.iloc[:train_size], df_normalizado.iloc[train_size:]
print(f"train_size: {train_size}")

# Definindo as variáveis dependentes e independentes para o conjunto de treino
X_train = train[['taxa_inflacao_nacional', 
                  'taxa_juros_emprestimo_nacional', 
                  'indice_preco_habitacao_alojamento_novo', 
                  'indice_preco_habitacao_alojamento_existente', 
                  'taxa_desemprego_16_a_74_anos'] +
                 [f'media_valor_m2_lag_{lag}' for lag in range(1, 8)]]

y_train = train['media_valor_m2']

X_test = test[['taxa_inflacao_nacional', 
                'taxa_juros_emprestimo_nacional', 
                'indice_preco_habitacao_alojamento_novo', 
                'indice_preco_habitacao_alojamento_existente', 
                'taxa_desemprego_16_a_74_anos'] +
               [f'media_valor_m2_lag_{lag}' for lag in range(1, 8)]]

y_test = test['media_valor_m2']

print(f"Número de predições em testes: {len(X_test)}")

len

# 5 - Treino do modelo


## 5.1 - Modelo Escolhido

In [None]:
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error

# Criação do pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),  # Normalização dos dados
    ('model', GradientBoostingRegressor())  # Modelo de regressão Gradient Boosting
])

# Definindo os hiperparâmetros a serem ajustados
param_grid = {
    'model__learning_rate': [0.07],
    'model__max_depth': [15],
    'model__min_samples_leaf': [3],
    'model__min_samples_split': [10],
    'model__n_estimators': [250]
}

# Configurando o GridSearchCV para procurar os melhores parâmetros
grid_search = GridSearchCV(estimator=pipeline, param_grid=param_grid, cv=5, scoring='neg_mean_squared_error')

# Ajustando o modelo com os dados de treino
grid_search.fit(X_train, y_train)

# Fazendo previsões com os dados de teste
y_pred = grid_search.predict(X_test)

# Cálculo do erro quadrático médio
mse = mean_squared_error(y_test, y_pred)  # Use y_test em vez de test['media_valor_m2']
print(f"Mean Squared Error: {mse:.2f}")

# Mostrando os melhores hiperparâmetros
print("Melhores Hiperparâmetros:", grid_search.best_params_)


# 6 - Plot modelo


# 8 - Previsão Futura


In [None]:
import matplotlib.pyplot as plt
import numpy as np


# Número de previsões futuras
n_futuros = 10

# Previsões futuras (usando o código que você tem para previsões futuras)
future_predictions = []  # Lista para armazenar as previsões futuras
current_data = X_test.iloc[-1].values.reshape(1, -1)  # Obtendo os dados da última observação

# Supondo que você tenha as colunas de lag definidas
lag_columns = [f'media_valor_m2_lag_{i}' for i in range(1, 8)]  # Ajuste conforme suas colunas de lag

for t in range(n_futuros):
    future_pred = grid_search.predict(current_data)  # Fazendo a previsão
    future_predictions.append(future_pred[0])  # Armazenando a previsão
    
    # Atualizando current_data com o novo valor previsto
    current_data[0, -1] = future_pred[0]  # Supondo que a última coluna é a que estamos prevendo
    
    # Atualizando os lag para o próximo loop
    for i in range(len(lag_columns) - 1):
        current_data[0, i] = current_data[0, i + 1]  # Deslocando os valores de lag
    current_data[0, len(lag_columns) - 1] = future_pred[0]  # O novo valor previsto se torna o último lag

# Visualização das previsões para toda a amostragem de testes
plt.figure(figsize=(12, 6))

# Obtendo todos os valores reais e previstos
real_values = test['media_valor_m2'].values
predicted_values = y_pred

# Plotando os valores reais e previstos
plt.plot(real_values, label='Valor Real', marker='o', linestyle='-', color='blue')
plt.plot(predicted_values, label='Previsão', marker='x', linestyle='--', color='orange')

# Adicionando a linha da média real
plt.axhline(y=np.mean(real_values), color='r', linestyle='--', label='Média Real')

# Concatenando as previsões futuras
predicted_values_extended = np.concatenate((predicted_values, future_predictions))


# Garantindo que os rótulos do eixo x correspondam aos dados
trimestres = test.index  # Obtendo todos os índices dos trimestres

# Usando todos os trimestres como rótulos e adicionando rótulos para os futuros
future_labels = ['Futuro {}'.format(i) for i in range(1, n_futuros + 1)]
all_labels = np.concatenate((trimestres, future_labels))

plt.xticks(ticks=np.arange(len(all_labels)), labels=all_labels, rotation=45)

plt.xlabel('Observações no DF de teste')
plt.ylabel('Valor Médio do M²')
plt.plot(np.arange(len(real_values), len(real_values) + n_futuros), future_predictions, label='Previsões Futuras', marker='s', linestyle='--', color='green')
plt.legend()
plt.grid()
plt.tight_layout()  # Ajusta o layout para não cortar os rótulos
plt.title('Previsão de Valor Médio do M² - Testes e Previsões Futuras')
plt.show()
