## A tener en cuenta:

#### Respecto a la BBDD:
- La base de datos se implementa en Amazon RDS, utilizando el motor MySQL.
- La base de datos tiene acceso público por si es necesario validar los datos.
- Los parámetros de conexión (HOST, USER, PASSWORD y DB_NAME) deben definirse a través de variables de entorno o en un archivo .env para evitar su exposición en el código fuente.
- La conexión a la base de datos se realiza utilizando un administrador de contexto, lo que asegura que la conexión se cierre adecuadamente una vez completadas las operaciones.

### Respecto al Pipeline y las validaciones
- Se implementaron generadores (`yiled`) y se usó el método `chunk` de `pandas` para procesar los datos por _micro batches_ y asegurar un uso eficiente de memoria y almacenamiento durante el procesamiento de los archivos.
- El seguimiento a las estadísticas solicitadas se encuentra en los archivo <font color="Khaki">metrics-pipeline.log</font> y <font color="Khaki">metrics-validation.log</font>, respectivamente.
- Las estadísticas se actualizan después de la inserción de cada fila.
- Para calcular las estadísticas acumuladas <font size="1">(recuento total de filas, valor promedio, valor mínimo y valor máximo para el campo “price”)</font> los resultados intermedios se almacenan  en memoria, así se evita hacer “SELECT avg (price)..." a los nuevos registros.
- El archivo <font color="Khaki">logs.log</font> se utiliza para registrar el flujo de ejecución del proceso.

### Know Issues:
- Se debe mejorar la claridad y el nivel de detalle en algunos _docstrings_.
- Se debe agregar pruebas unitarias para validar el comportamiento esperado de las diferentes funciones.
- Se debe mejorar el manejo de excepciones en el código para capturar y manejar específicamente los diferentes tipos de errores que pueden ocurrir en cada etapa del proceso.

### Recomendaciones
- Se debería crear una rama de desarrollo en Git para seguir el flujo de trabajo Gitflow.
- Se debería crear de una clase `BaseModel`  de `Pydantic`  para la validación de los tipos de datos antes de ser insertados en las tablas de la base de datos.
- Se debería implementar un DAG de Airflow para ejecutar automáticamente el proceso, monitorear el flujo de trabajo y, en caso de fallo, reiniciar el pipeline desde el último punto exitoso (evitando la re-ejecución completa del proceso)

In [1]:
import config.config as cfg
from config import LoggerConfig, logger
from etl import get_data_files
from run_data_pipeline import run_data_pipeline
from utils import MySQLDatabase

In [2]:
# Define process constants
TABLE_NAME = "pragma_sales"
CHUNK_SIZE = 5

DB_HOST = cfg.DB_HOST
DB_USER = cfg.DB_USER
DB_PASSWORD = cfg.DB_PASSWORD
DB_NAME = cfg.DB_NAME

# Set up loggers
metrics_logger = LoggerConfig("metrics-pipeline", stream_handler=False).get_logger()

# Set up database connection
db = MySQLDatabase(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)

In [3]:
# Run the ETL process 
logger.info("Starting ETL process")
with db:
    # Create the table 'pragma_sales' in the database
    logger.info(f"Creating table '{TABLE_NAME}'...")
    db.drop_table(f"DROP TABLE IF EXISTS {TABLE_NAME}")
    db.create_table(
        f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            pk INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME DEFAULT NULL,
            day INT DEFAULT NULL,
            month INT DEFAULT NULL,
            year INT DEFAULT NULL,
            price FLOAT DEFAULT NULL,
            user_id INT DEFAULT NULL,
            processed_date DATETIME DEFAULT NULL
        )
        """
    )

    # Get files to process, excluding validation file
    files = get_data_files(exclude_validation=True)

    # Process the data files and load it into the database
    run_data_pipeline(
        files,
        db,
        table_name=TABLE_NAME,
        chunk_size=CHUNK_SIZE,
        metrics_logger=metrics_logger,
    )
    logger.info("ETL process finished successfully")

2024-11-18 14:48:16 - INFO - Starting ETL process
2024-11-18 14:48:17 - INFO - Connected successfully to the RDS 'database-pragma.chowsgwustl6.us-east-1.rds.amazonaws.com'
2024-11-18 14:48:17 - INFO - Creating table 'pragma_sales'...
2024-11-18 14:48:17 - DEBUG - Executing SQL statement 'DROP TABLE IF EXISTS pragma_sales'...
2024-11-18 14:48:17 - DEBUG - SQL statement executed successfully
2024-11-18 14:48:17 - DEBUG - Executing SQL statement 'CREATE TABLE IF NOT EXISTS pragma_sales ( pk INT NOT NULL AUTO_INCREMENT PRIMARY KEY, timestamp DATETIME DEFAULT NULL, day INT DEFAULT NULL, month INT DEFAULT NULL, year INT DEFAULT NULL, price FLOAT '...
2024-11-18 14:48:17 - DEBUG - SQL statement executed successfully
2024-11-18 14:48:17 - INFO - Processing file './data/2012-1.csv'...
2024-11-18 14:48:17 - DEBUG - Processing microbatch 1...
2024-11-18 14:48:17 - DEBUG - Executing SQL statement 'INSERT INTO pragma_sales(timestamp, day, month, year, price, user_id, processed_date) VALUES (2012-01

In [4]:
logger.info("Validating data...")
with db:
    # Select
    result = db.select(
        f"""
        SELECT COUNT(*), AVG(price), MIN(price), MAX(price)        
        FROM {TABLE_NAME}
        """,
    )
    if result:
        logger.info(f"Number of rows: {result[0][0]}")
        logger.info(f"Average price: {result[0][1]}")
        logger.info(f"Min price: {result[0][2]}")
        logger.info(f"Max price: {result[0][3]}")

2024-11-18 14:48:45 - INFO - Validating data...
2024-11-18 14:48:46 - INFO - Connected successfully to the RDS 'database-pragma.chowsgwustl6.us-east-1.rds.amazonaws.com'
2024-11-18 14:48:46 - DEBUG - Executing SQL statement 'SELECT COUNT(*), AVG(price), MIN(price), MAX(price) FROM pragma_sales'...
2024-11-18 14:48:46 - DEBUG - SQL statement executed successfully
2024-11-18 14:48:46 - INFO - Number of rows: 143
2024-11-18 14:48:46 - INFO - Average price: 57.884892086330936
2024-11-18 14:48:46 - INFO - Min price: 10.0
2024-11-18 14:48:46 - INFO - Max price: 100.0
2024-11-18 14:48:46 - INFO - Connection to the database 'database-pragma.chowsgwustl6.us-east-1.rds.amazonaws.com' has been closed


In [5]:
# Set up loggers
metrics_logger = LoggerConfig("metrics-validation", stream_handler=False).get_logger()

# Run the validation process
logger.info("Starting validation process")
with db:
    # Get files to process, only validation file
    files = get_data_files(only_validation=True)

    # Process the data files and load it into the database
    run_data_pipeline(
        files,
        db,
        table_name=TABLE_NAME,
        chunk_size=CHUNK_SIZE,
        metrics_logger=metrics_logger,
    )
    logger.info("Validation process finished successfully")

2024-11-18 14:48:51 - INFO - Starting validation process
2024-11-18 14:48:52 - INFO - Connected successfully to the RDS 'database-pragma.chowsgwustl6.us-east-1.rds.amazonaws.com'
2024-11-18 14:48:52 - INFO - Processing file './data/validation.csv'...
2024-11-18 14:48:52 - DEBUG - Processing microbatch 1...
2024-11-18 14:48:52 - DEBUG - Executing SQL statement 'INSERT INTO pragma_sales(timestamp, day, month, year, price, user_id, processed_date) VALUES (2012-06-01 00:00:00, 1, 6, 2012, 26, 10, 2024-11-18 14:48:52.290710)'...
2024-11-18 14:48:52 - DEBUG - SQL statement executed successfully
2024-11-18 14:48:52 - DEBUG - Executing SQL statement 'INSERT INTO pragma_sales(timestamp, day, month, year, price, user_id, processed_date) VALUES (2012-06-02 00:00:00, 2, 6, 2012, 11, 8, 2024-11-18 14:48:52.290710)'...
2024-11-18 14:48:52 - DEBUG - SQL statement executed successfully
2024-11-18 14:48:52 - DEBUG - Executing SQL statement 'INSERT INTO pragma_sales(timestamp, day, month, year, price, u

In [6]:
logger.info("Validating data with validation file...")
with db:
    # Select
    result = db.select(
        f"""
        SELECT COUNT(*), AVG(price), MIN(price), MAX(price)        
        FROM {TABLE_NAME}
        """,
    )
    if result:
        logger.info(f"Number of rows: {result[0][0]}")
        logger.info(f"Average price: {result[0][1]}")
        logger.info(f"Min price: {result[0][2]}")
        logger.info(f"Max price: {result[0][3]}")

2024-11-18 14:48:55 - INFO - Validating data with validation file...
2024-11-18 14:48:56 - INFO - Connected successfully to the RDS 'database-pragma.chowsgwustl6.us-east-1.rds.amazonaws.com'
2024-11-18 14:48:56 - DEBUG - Executing SQL statement 'SELECT COUNT(*), AVG(price), MIN(price), MAX(price) FROM pragma_sales'...
2024-11-18 14:48:56 - DEBUG - SQL statement executed successfully
2024-11-18 14:48:56 - INFO - Number of rows: 151
2024-11-18 14:48:56 - INFO - Average price: 57.006802721088434
2024-11-18 14:48:56 - INFO - Min price: 10.0
2024-11-18 14:48:56 - INFO - Max price: 100.0
2024-11-18 14:48:56 - INFO - Connection to the database 'database-pragma.chowsgwustl6.us-east-1.rds.amazonaws.com' has been closed
