1. Encabezado del Notebook

# Notebook para procesamiento e inserción de datos de tasas de cambio en Redshift
Este notebook obtiene datos de tasas de cambio de una API, los procesa y los inserta en una base de datos Redshift.

2. Código para obtener datos (antes data_fetcher.py)

In [1]:
import requests
import pandas as pd
from datetime import datetime

def obtener_datos():
    # URL de la API
    url = 'https://api.exchangerate-api.com/v4/latest/USD'
    
    # Realizar la solicitud GET
    response = requests.get(url)
    
    # Verificar si la solicitud fue exitosa
    if response.status_code != 200:
        raise Exception(f"Error al obtener datos: {response.status_code}")
    
    # Convertir la respuesta JSON a un diccionario
    data = response.json()
    
    # Convertir los datos de tasas de cambio en un DataFrame
    rates_df = pd.DataFrame(list(data['rates'].items()), columns=['Currency', 'Rate'])
    
    base_currency = data['base']
    date = data['date']
    
    # Agregar columna de fecha
    rates_df['Date'] = datetime.strptime(date, '%Y-%m-%d').date()
    
    return rates_df, base_currency, date


3. Código para procesar los datos (antes data_preprocessing.py)

In [2]:
from datetime import datetime
import pandas as pd

def procesar_datos(rates_df, date):
    specific_currencies = {'USD', 'ARS', 'AUD', 'BDT', 'BRL', 'CAD', 'CNY', 'ETB', 'EUR', 'FJD', 'GBP', 'HTG', 'JPY', 'NGN', 'NIO', 'PGK', 'PYG', 'SLL', 'VUV', 'ZAR'}
    rates_df = rates_df[rates_df['Currency'].isin(specific_currencies)].copy()
    
    # Agregar columna temporal
    rates_df['Ingestion_Time'] = datetime.now()
    
    # Diccionario de monedas a datos geográficos
    currency_to_geo = {
        'USD': {'Country': 'United States', 'Region': 'North America', 'Continent': 'America'},
        'CAD': {'Country': 'Canada', 'Region': 'North America', 'Continent': 'America'},
        # (continúa con los datos del diccionario que tienes)
    }
    
    # Agregar datos geográficos
    geo_data = rates_df['Currency'].map(currency_to_geo)
    rates_df['Country'] = geo_data.apply(lambda x: x.get('Country') if isinstance(x, dict) else None)
    rates_df['Region'] = geo_data.apply(lambda x: x.get('Region') if isinstance(x, dict) else None)
    rates_df['Continent'] = geo_data.apply(lambda x: x.get('Continent') if isinstance(x, dict) else None)
    
    # Agregar columna Wealthy
    wealthy_currencies = {'USD', 'CAD', 'BRL', 'ARS', 'AUD', 'FJD', 'JPY', 'CNY', 'EUR', 'GBP', 'ZAR', 'ETB'}
    rates_df['Wealthy'] = rates_df['Currency'].apply(lambda x: 1 if x in wealthy_currencies else 0)
    
    rates_df = rates_df.drop_duplicates(subset=['Currency', 'Date'])
    
    return rates_df


4. Código para conectar y manipular Redshift (antes utils.py)

In [3]:
import psycopg2
import os
from dotenv import load_dotenv

# Cargar variables de entorno desde el archivo .env
load_dotenv()

def conectar_redshift():
    host = os.getenv('REDSHIFT_HOST')
    port = os.getenv('REDSHIFT_PORT')
    dbname = os.getenv('REDSHIFT_DBNAME')
    user = os.getenv('REDSHIFT_USER')
    password = os.getenv('REDSHIFT_PASSWORD')
    
    conn = psycopg2.connect(
        host=host,
        port=port,
        dbname=dbname,
        user=user,
        password=password
    )
    cur = conn.cursor()
    return conn, cur

def eliminar_registros(cur, rates_df):
    delete_query = 'DELETE FROM exchange_rates WHERE currency = %s AND date = %s'
    unique_keys = rates_df[['Currency', 'Date']].drop_duplicates()
    for _, row in unique_keys.iterrows():
        cur.execute(delete_query, (row['Currency'], row['Date']))

def insertar_datos(cur, conn, rates_df, base_currency, date):
    insert_query = '''
    INSERT INTO exchange_rates (base, date, currency, rate, ingestion_time, country, region, continent, wealthy) 
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
    '''
    for _, row in rates_df.iterrows():
        cur.execute(insert_query, (
            base_currency, row['Date'], row['Currency'], row['Rate'], row['Ingestion_Time'],
            row['Country'], row['Region'], row['Continent'], row['Wealthy']
        ))
    conn.commit()

def cerrar_conexion(cur, conn):
    cur.close()
    conn.close()


5. Celda con la función principal (antes main.py)

In [4]:
def main():
    try:
        # Obtener datos desde la API
        rates_df, base_currency, date = obtener_datos()
        
        # Procesar los datos
        rates_df = procesar_datos(rates_df, date)
        
        # Conectar a Redshift
        conn, cur = conectar_redshift()
        
        try:
            # Eliminar registros existentes
            eliminar_registros(cur, rates_df)
            
            # Insertar nuevos datos
            insertar_datos(cur, conn, rates_df, base_currency, date)
            
        except Exception as e:
            print(f"Error al procesar los datos en Redshift: {e}")
            conn.rollback()  # Si algo falla, revertir los cambios
        finally:
            # Cerrar conexión
            cerrar_conexion(cur, conn)
    
    except Exception as e:
        print(f"Error en la función principal: {e}")
    else:
        print("Datos insertados correctamente")

# Ejecutar el proceso
main()


Datos insertados correctamente
