## ETL Banxico

In [1]:
import os
import requests
import pandas as pd
from dotenv import load_dotenv
from datetime import datetime
from dateutil.relativedelta import relativedelta
import boto3
import json

In [2]:
# 📌 Cargar variables de entorno
load_dotenv()

# 📌 Obtener los tokens desde .env
BANXICO_TOKEN = os.getenv("BANXICO_TOKEN")
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")

if not BANXICO_TOKEN:
    raise ValueError("❌ Error: BANXICO_TOKEN no encontrado en .env")

print("✅ Token de Banxico cargado correctamente.")

# 📌 Configurar fechas desde 2020 hasta hoy
initial_date = "2020-01-01"
final_date = datetime.today().strftime("%Y-%m-%d")

# 📌 Configurar cliente de Amazon S3
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)


✅ Token de Banxico cargado correctamente.


In [3]:
def extract_banxico_data(serie_id, initial_date, final_date):
    """
    Extrae datos de la API de Banxico y los devuelve en un DataFrame.
    """
    url = f"https://www.banxico.org.mx/SieAPIRest/service/v1/series/{serie_id}/datos/{initial_date}/{final_date}?token={BANXICO_TOKEN}"
    response = requests.get(url)

    if response.status_code != 200:
        print(f"❌ Error {response.status_code}: {response.text}")
        return None

    data = response.json()
    
    if "bmx" not in data or "series" not in data["bmx"] or not data["bmx"]["series"]:
        print("⚠️ No se encontraron datos en la respuesta de Banxico.")
        return None

    series_data = data["bmx"]["series"][0]["datos"]

    if not series_data:
        print("⚠️ No hay datos disponibles en este rango de fechas.")
        return None

    # 📌 Convertir datos a DataFrame
    df = pd.DataFrame(series_data)
    df.columns = ["timestamp", "value"]

    # 📌 Convertir fechas correctamente
    df["timestamp"] = pd.to_datetime(df["timestamp"], format="%d/%m/%Y", dayfirst=True)

    # 📌 Convertir valores a numérico
    df["value"] = pd.to_numeric(df["value"], errors="coerce")

    return df


In [4]:
# 📌 Extraer Tipo de Cambio
df_tipo_cambio = extract_banxico_data("SF43718", initial_date, final_date)
df_tipo_cambio.to_csv("tipo_de_cambio.csv", index=False)

# 📌 Extraer Tasa de Interés
df_tasa_interes = extract_banxico_data("SF282", initial_date, final_date)
df_tasa_interes.to_csv("tasa_de_interes.csv", index=False)

# 📌 Extraer INPC desde Banxico
df_inpc = extract_banxico_data("SP1", initial_date, final_date)
df_inpc.to_csv("inpc.csv", index=False)

print("✅ Datos guardados localmente en CSV.")


✅ Datos guardados localmente en CSV.


In [8]:
# Toma"default"
s3_client = boto3.client("s3")

# Probar conexión
response = s3_client.list_buckets()
for bucket in response["Buckets"]:
    print(f"✅ Bucket encontrado: {bucket['Name']}")
# 📌 Probar conexión
try:
    response = s3_client.list_objects_v2(Bucket="banxico-hw", Prefix="raw/")
    for obj in response.get("Contents", []):
        print(f"📂 Archivo en S3: {obj['Key']} - Tamaño: {obj['Size']} bytes")
except Exception as e:
    print(f"❌ Error al conectar con S3: {e}")


✅ Bucket encontrado: arquitectura-athena-queries-sofia
✅ Bucket encontrado: banxico-hw
✅ Bucket encontrado: itam-analytics-sofia
✅ Bucket encontrado: sofia-temp-flights
📂 Archivo en S3: raw/ - Tamaño: 0 bytes
📂 Archivo en S3: raw/inpc.csv - Tamaño: 1190 bytes
📂 Archivo en S3: raw/tasa_de_interes.csv - Tamaño: 1029 bytes
📂 Archivo en S3: raw/tipo_de_cambio.csv - Tamaño: 24694 bytes


In [9]:
def upload_to_s3(file_name, bucket, folder):
    """
    Sube un archivo local a Amazon S3 en la carpeta especificada.
    """
    object_name = f"{folder}/{file_name}"
    try:
        s3_client.upload_file(file_name, bucket, object_name)
        print(f"✅ Archivo {file_name} subido a S3 en {object_name}")
    except Exception as e:
        print(f"❌ Error al subir {file_name}: {e}")

# 📌 Subir los archivos a S3
upload_to_s3("tipo_de_cambio.csv", "banxico-hw", "raw")
upload_to_s3("tasa_de_interes.csv", "banxico-hw", "raw")
upload_to_s3("inpc.csv", "banxico-hw", "raw")

print("✅ Todos los archivos han sido subidos a S3 en la carpeta 'raw/'.")


✅ Archivo tipo_de_cambio.csv subido a S3 en raw/tipo_de_cambio.csv
✅ Archivo tasa_de_interes.csv subido a S3 en raw/tasa_de_interes.csv
✅ Archivo inpc.csv subido a S3 en raw/inpc.csv
✅ Todos los archivos han sido subidos a S3 en la carpeta 'raw/'.


---

### Transforming and cleaning raw data

In [None]:
# 📌 Convertir Tipo de Cambio a Mensual (Promedio por mes)
df_exchange["month"] = df_exchange["date"].dt.to_period("M")  # Convertir a periodo mensual
df_exchange_monthly = df_exchange.groupby("month")["value"].mean().reset_index()

# 📌 Convertir de Periodo a Timestamp para alinearlo con las otras series
df_exchange_monthly["date"] = df_exchange_monthly["month"].dt.to_timestamp()
df_exchange_monthly.drop(columns=["month"], inplace=True)

# 📌 Guardar los datos convertidos
df_exchange_monthly.to_csv("tipo_de_cambio_mensual.csv", index=False)
print("✅ Datos de Tipo de Cambio convertidos a frecuencia mensual guardados en tipo_de_cambio_mensual.csv")


In [None]:
# 📌 Definir la fecha de fin común
fecha_fin = "2025-02-01"

# 📌 Filtrar los DataFrames para asegurarnos de que terminan en la misma fecha
df_inpc = df_inpc[df_inpc["date"] <= fecha_fin]
df_interest = df_interest[df_interest["date"] <= fecha_fin]
df_exchange_monthly = df_exchange_monthly[df_exchange_monthly["date"] <= fecha_fin]

# 📌 Guardar los archivos corregidos
df_inpc.to_csv("inpc_corr.csv", index=False)
df_interest.to_csv("tasa_de_interes_corr.csv", index=False)
df_exchange_monthly.to_csv("tipo_de_cambio_corr.csv", index=False)

print("✅ Todas las series han sido recortadas a 2025-02-01.")


In [None]:
# 📌 Merge de todas las series usando "date" como clave
df_final = df_inpc.merge(df_interest, on="date", how="outer", suffixes=("_inpc", "_tasa"))
df_final = df_final.merge(df_exchange_monthly, on="date", how="outer")

# 📌 Renombrar columnas para claridad
df_final.rename(columns={"value_inpc": "inpc", "value_tasa": "tasa_de_interes", "value": "tipo_de_cambio"}, inplace=True)

# 📌 Verificar los primeros datos
print("📊 DataFrame Final con Todas las Series:")
print(df_final.head())

# 📌 Guardar el DataFrame Final
df_final.to_csv("datos_completos.csv", index=False)
print("✅ DataFrame Final guardado en datos_completos.csv")


#### INPC - inflación anualizada

In [None]:
# 📌 Cargar los datos completos
df = pd.read_csv("datos_completos.csv", parse_dates=["date"])

# 📌 Ordenar por fecha (por si acaso)
df = df.sort_values(by="date")

# 📌 Ver las primeras filas antes de calcular la inflación
print("📊 Datos antes de calcular la inflación:")
print(df.head())


In [None]:
# 📌 Calcular el INPC rezagado 12 meses
df["inpc_lag_12"] = df["inpc"].shift(12)

# 📌 Calcular la inflación anualizada
df["inflacion"] = 100 * (df["inpc"] / df["inpc_lag_12"] - 1)

# 📌 Ver los primeros valores calculados
print("\n📊 Datos después de calcular la inflación:")
print(df[["date", "inpc", "inpc_lag_12", "inflacion"]].head(15))


In [None]:
# 📌 Guardar los datos con la inflación calculada
df.to_csv("datos_con_inflacion.csv", index=False)
print("✅ Datos con inflación guardados en datos_con_inflacion.csv")


---

## ETL INEGI -- WIP

In [None]:
# Cargar las variables de entorno desde .env
load_dotenv()

# Obtener el token
INEGI_TOKEN = os.getenv("INEGI_TOKEN")

# Verificar que se cargó correctamente
if not INEGI_TOKEN:
    raise ValueError("❌ Error: INEGI_TOKEN no encontrado en .env")

print("✅ Token de INEGI cargado correctamente:", INEGI_TOKEN[:10] + "...")


In [None]:
# ID del INPC Mensual en INEGI
serie_id = "539261"

# Construcción de la URL
url = f"https://www.inegi.org.mx/app/api/indicadores/desarrolladores/jsonxml/INDICATOR/{serie_id}/es/0700/true/BISE/2.0/{INEGI_TOKEN}?type=json"

# Hacer la solicitud a la API
response = requests.get(url)

# Verificar la respuesta
if response.status_code == 200:
    data = response.json()
    print("📡 Código de respuesta:", response.status_code)
    print("🔍 Respuesta de la API:")
    print(json.dumps(data, indent=4)[:1000])  # Muestra los primeros 1000 caracteres
else:
    print("❌ Error en la solicitud:", response.text)


In [None]:
import requests
import json
from dotenv import load_dotenv
import os

# Cargar el token desde .env
load_dotenv()
INEGI_TOKEN = os.getenv("INEGI_TOKEN")

# ID del INPC Mensual en INEGI
serie_id = "539261"

# Construcción de la URL
url = f"https://www.inegi.org.mx/app/api/indicadores/desarrolladores/jsonxml/INDICATOR/{serie_id}/es/0700/true/BISE/2.0/{INEGI_TOKEN}?type=json"

# Hacer la solicitud a la API
response = requests.get(url)

# Verificar la respuesta
if response.status_code == 200:
    data = response.json()
    series_data = data["Series"][0]["OBSERVATIONS"]
    
    print(f"📊 Número de datos devueltos por la API: {len(series_data)}")
    print("📡 Primeros datos:")
    print(json.dumps(series_data[:5], indent=4))  # Muestra solo los primeros 5 valores
else:
    print(f"❌ Error {response.status_code}: {response.text}")
