<a href="https://colab.research.google.com/github/lucianoalessi/data-engineering/blob/main/CEL_Data_Eng_Extraccion_de_bases_de_datos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Extracción de datos
## Bases de datos relacionales

### Preparación de entorno

In [None]:
# Instalar librerías
!pip install sqlalchemy
!pip install pymysql

Collecting pymysql
  Downloading PyMySQL-1.1.0-py3-none-any.whl (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.8/44.8 kB[0m [31m972.3 kB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pymysql
Successfully installed pymysql-1.1.0


Antes de continuar:
1.   Subir el archivo `pipeline.conf`y editarlo ingresando los datos que correspondan.
2.   Crear una carpeta metadata y allí dentro subir el archivo `metadata_ingestion.json`.



### Funciones

In [None]:
# Antes de definir funciones importamos librerías
import json
import os
from datetime import datetime, date
from configparser import ConfigParser

import pandas as pd
from sqlalchemy import MetaData, create_engine, text

In [None]:
# utils_state.py

def read_state_from_json(file_path):
    """
    Lee un archivo JSON que contiene el ultimo valor incremental extraído
    de cada tabla de la base de datos.

    Parámetros:
        file_path (str): Ruta del archivo JSON

    Retorna:
        Diccionario con el contenido del archivo JSON
    """
    try:
        with open(file_path, 'r') as file:
            state = json.load(file)
            return state
    except FileNotFoundError:
        raise FileNotFoundError(f"El archivo JSON en la ruta {file_path} no existe.")
    except json.JSONDecodeError:
        raise json.JSONDecodeError(f"El archivo JSON en la ruta {file_path} no es válido.")

def write_state_to_json(file_path, state):
    """
    Escribe el estado de la replicación en un archivo JSON

    Parámetros:
        file_path (str): Ruta del archivo JSON
        state (dict): Objeto con el estado de la replicación
    """
    try:
        with open(file_path, 'w') as file:
            json.dump(state, file, default=str, indent=4)
    except FileNotFoundError:
        raise FileNotFoundError(f"El archivo JSON en la ruta {file_path} no existe.")
    except json.JSONDecodeError:
        raise json.JSONDecodeError(f"El archivo JSON en la ruta {file_path} no es válido.")

def get_last_incremental_value(state, table_name):
    """
    Obtiene el último valor incremental de una tabla

    Parámetros:
        state (dict): Objeto con el estado de la replicación
        table_name (str): Nombre de la tabla

    Retorna:
        Ultimo valor incremental de la tabla. Debe ser date, datetime, timestamp.
    """
    try:
        return state[table_name]['last_value']
    except KeyError:
        raise KeyError(f"La tabla {table_name} no existe en el archivo JSON.")

def update_incremental_value(state, file_path, table_name, new_value):
    """
    Actualiza el valor incremental de una tabla en el estado de la replicación

    Parámetros:
        state (dict): Objeto con el estado de la replicación
        file_path (str): Ruta donde guardar el archivo JSON
        table_name (str): Nombre de la tabla
        new_value: Nuevo valor incremental. Puede ser date, datetime, timestamp.
    """

    last_incremental_value = get_last_incremental_value(state, table_name)

    # Chequeamos el tipo de datos
    if isinstance(new_value, date):
        last_incremental_value = datetime.fromisoformat(last_incremental_value).date()
    elif isinstance(new_value, datetime):
        last_incremental_value = datetime.fromisoformat(last_incremental_value)
    else:
        raise TypeError(f"El tipo de dato {type(new_value)} no está soportado. Debe ser date, datetime o timestamp.")

    if new_value < last_incremental_value:
        raise ValueError(f"El nuevo valor incremental {new_value} es menor al valor anterior {last_incremental_value}.")
    elif new_value is None:
        raise ValueError(f"El nuevo valor incremental {new_value} no puede ser nulo.")

    # Actualizamos el valor incremental
    state[table_name]['last_value'] = new_value
    write_state_to_json(file_path, state)

In [None]:
# utils_db.py

def connect_to_db(config_file, section, driverdb):
    """
    Crea una conexión a la base de datos especificada en el archivo de configuración.

    Parámetros:
    config_file (str): La ruta del archivo de configuración.
    section (str): La sección del archivo de configuración que contiene los datos de la base de datos.
    driverdb (str): El driver de la base de datos a la que se conectará.

    Retorna:
    Un objeto de conexión a la base de datos.
    """
    try:
        # Lectura del archivo de configuración
        parser = ConfigParser()
        parser.read(config_file)

        # Creación de un diccionario
        # donde cargaremos los parámetros de la base de datos
        db = {}
        if parser.has_section(section):
            params = parser.items(section)
            db = {param[0]: param[1] for param in params}

            # Creación de la conexión a la base de datos
            engine = create_engine(
                f"{driverdb}://{db['user']}:{db['pwd']}@{db['host']}:{db['port']}/{db['dbname']}"
            )
            return engine

        else:
            print(
                f"Sección {section} no encontrada en el archivo de configuración.")
            return None
    except Exception as e:
        print(f"Error al conectarse a la base de datos: {e}")
        return None


def get_metadata_db(sqlalchemy_engine):
    """
    Genera un archivo JSON con la metadata de la base de datos,
    con el formato
    {
        table_name: {
            column_name: {column_metadata}
                    }
    }

    Parámetros:
        sqlalchemy_engine: Objeto de conexión de SQLAlchemy

    Retorna:
        None
    """

    try:
        # Establecer conexión con la metadata de la base de datos
        metadata = MetaData()
        metadata.reflect(bind=sqlalchemy_engine)
    except Exception as e:
        print(f"Error al conectar con la base de datos: {e}")
        return

    # Crear un diccionario donde se almacenará la metadata
    metadata_dict = {}
    for tbl in metadata.tables.values():
        table_dict = {}
        for column in tbl.c:
            col_dict = {
                'type': str(column.type),
                'nullable': column.nullable,
                'default': column.default,
                'primary_key': column.primary_key,
                'references': ''.join([str(fk.column) for fk in column.foreign_keys])
            }
            table_dict[column.name] = col_dict
        metadata_dict[tbl.name] = table_dict

    try:
        # Guardar el diccionario en un archivo JSON
        os.makedirs('metadata', exist_ok=True)
        metadata_obj = json.dumps(metadata_dict, indent=4)
        with open('metadata/metadata_tables.json', 'w') as file:
            file.write(metadata_obj)
    except Exception as e:
        print(f"Error al guardar el archivo JSON: {e}")


def get_columns_from_table(table_name):
    """
    Obtiene las columnas de una tabla a partir del archivo de metadata.

    Parámetros:
    - table_name (str): Nombre de la tabla.

    Retorna:
    - Un string con los nombres de las columnas separados por comas.
    """

    try:
        with open('metadata/metadata_tables.json', 'r') as file:
            metadata = json.load(file)
    except Exception as e:
        print(f"Error al cargar el archivo JSON: {e}")
        return

    columns = list(metadata.get(table_name, {}).keys())
    if not columns:
        print(
            f"No se encontraron columnas para la tabla {table_name} en el archivo JSON.")
        return
    columns_str = ', '.join(columns)
    return columns_str


def extract_full_data(sqlalchemy_engine, table_name):
    """
    Extracción FULL de datos desde una tabla de una base de datos SQL.

    Parámetros:
    - sqlalchemy_engine: Objeto de conexión de SQLAlchemy.
    - table_name (str): Nombre de la tabla desde la cual extraer los datos.

    Retorna:
    - Un DataFrame con todos los datos de la tabla.
    """

    # Obtener las columnas de la tabla a partir del archivo de metadata
    columns = get_columns_from_table(table_name)
    query = text(f"SELECT {columns} FROM {table_name}")
    with engine.connect() as conn:
      df = pd.read_sql_query(query, conn)
    return df


def extract_incremental_data(sqlalchemy_engine, table_name, state_file_path):
    """
    Extracción INCREMENTAL de datos desde una tabla de una base de datos SQL
    utilizando un archivo JSON para gestionar el ultimo valor incremental extraído.

    Parámetros:
    - sqlalchemy_engine: Objeto de conexión de SQLAlchemy.
    - table_name (str): Nombre de la tabla desde la cual extraer los datos.
    - state_file_path (str): Ruta del archivo JSON que contiene el estado de la replicación.

    Retorna:
    - Un DataFrame con los datos incrementales de la tabla.
    """
    # Obtener las columnas de la tabla a partir del archivo de metadata
    columns = get_columns_from_table(table_name)

    # Obtener la columna incremental y su último valor
    state = read_state_from_json(state_file_path)
    last_value = get_last_incremental_value(state, table_name)
    incremental_column = state[table_name]["incremental_column"]

    # Obtener los datos nuevos de la tabla
    query = text(f"SELECT {columns} FROM {table_name} WHERE {incremental_column} > '{last_value}'")
    with engine.connect() as conn:
      df = pd.read_sql_query(query, conn)

    if not df.empty:
        new_value = df[incremental_column].max()
        update_incremental_value(
            state, state_file_path, table_name, new_value)

    return df

### Hands-on!

A continuación vamos a ver una demostración con Python sobre como realizar extracción de de datos, específicamente de bases de datos relacionales, aplicando las técnicas de:
- extracción **full**
- extracción **incremental**
donde sea oportuno.

Vamos a trabajar con dos tablas:
- `customers` la cual contiene datos sobre clientes de una empresa. ***Supongamos*** que esa tabla no posee muchos registros y no se actualiza con mucha frecuencia. Por ende, conviene aplicar una extracción de tipo **full**.
- `payments`, posee registros sobre pagos registrados. Esta tabla posee una gran cantidad de registros, acumula un histórico enorme de pagos y se actualiza diariamente a partir de las nuevas operaciones realizadas. Dado este contexto, es oportuno aplicar una extracción **incremental**.

Toda la lógica requerida para aplicar estas técnicas se encuentran en los scripts `utils_db.py` y `utils_state.py`.

`utils_state.py` va a intervenir en la extracción incremental. Este tipo de extracción podemos considerarla como **stateful (con estado)** ya que debe mantener un registro de la última extracción realizada. Este registro debe contener algún valor referido a los datos de la última extracción. De esa forma, podrá determinar qué datos han cambiado desde la última extracción y obtener solo esos datos específicos. En este caso, el programa de extracción *recuerda* la última ejecución realizada.

Ese estado lo vamos a gestionar por medio de un archivo `.json` que tiene esta estructura
```json
{
    "table_name": {
        "incremental_column": "column_name",
        "last_value": "last_value"
    }
}
```
El archivo se llama `metadata_ingestion.json` y está en la carpeta `metadata/`.

Para este tipo de extracción, la tabla origen debe ofrecer una columna de tipo `date` o `datetime` que permita identificar registros nuevos.

`last_value` inicializará con un valor anómalo como 1900-01-01, de modo que la primera ejecución pueda capturar todos los registros. En las siguientes ejecuciones, `last_value` será actualizado con el máximo valor obtenido en la última ejecución.

La creación del archivo JSON y su inicialización lo haremos de forma manual. Mientras que la actualización del archivo será gestionada por el script `utils_state.py`.

*Cabe aclarar que una extracción incremental podría ser tipo stateless (sin estado).
En vez de almacenar el último valor registrado de la extracción, podríamos aplicar filtros de forma dinámica basándonos en la fecha de ejecución actual y obtener registros vinculados a esa fecha. Sin embargo, si el programa falla y no se ejecuta durante varios días, habría un esfuerzo adicional para recuperar los datos faltantes. A diferencia de la extracción stateful, que puede continuar desde el último punto de extracción.*
*

En primer lugar, vamos a establecer conexión con la base de datos e instanciaremos un objeto `engine` que nos permitirá interactuar con la base de datos.
La función `connect_to_db` se encarga de realizar esta tarea. Espera tres parámetros *(ante cualquier duda, ver el anexo)*:
- La ruta a un archivo de configuración que contiene los datos de conexión a la base de datos.
- La sección del archivo de configuración que contiene los datos de conexión.
- El nombre del driver que se utilizará para la conexión.

In [None]:
engine = connect_to_db(
    'pipeline.conf', 'mysql', 'mysql+pymysql'
    )

Una vez establecida la conexión, procedemos a obtener metadatos de todas las tablas de la base de datos. Esto lo hacemos con la función `get_metadata` que recibe como parámetro el objeto `engine` y genera un archivo JSON con la metadata en la carpeta `metadata/`.

Esta metadata contiene datos sobre las columnas de cada tabla, como el nombre, tipo de dato, si es clave primaria, si es clave foránea, si admite nulos, entre otros. Esta metadata es relevante para  de **comprender y documentar la estructura de la fuente de datos a consultar.**

In [None]:
get_metadata_db(engine)

### Extracción full
La función `extract_full_data` será la responsable de esta técnica.

Recibe como parámetro el objeto `engine` y el nombre de la tabla a extraer.

Se encarga de realizar una consulta SQL a la base de datos, utiliza la metadata vista anteriormente para obtener el nombre de las columnas y utilizarlas durante la ejecución de la consulta.

La función retorna un DataFrame con los datos extraídos.

In [None]:
df_customers = extract_full_data(engine, 'customers')

In [None]:
# Veamos los primeros registros de los datos
df_customers.head()

Unnamed: 0,customerNumber,customerName,phone,addressLine1,addressLine2,city,province,postalCode,country,createdDate,updatedDate
0,1,Mario Santos,+54 9 11 1234 5678,Calle Falsa 123,,Buenos Aires,,,Argentina,2024-02-29 02:10:10,2024-02-29 02:10:10
1,2,Emilio Ravenna,+54 9 11 8765 4321,Avenida Simulación 456,,Córdoba,,,Argentina,2024-02-29 02:10:10,2024-02-29 02:10:10
2,3,Pablo Lamponne,+54 9 11 2468 1357,Carrera Simulada 789,,Rosario,,,Argentina,2024-02-29 02:10:10,2024-02-29 02:10:10
3,4,Gabriel Medina,+54 9 11 7531 8642,Pasaje Fingido 987,,Mendoza,,,Argentina,2024-02-29 02:10:10,2024-02-29 02:10:10
4,5,Franco Milazzo,+54 9 11 9999 9999,Calle Actualizada 789,,Salta,,,Argentina,2024-02-29 02:10:10,2024-02-29 11:37:00


In [None]:
# Consultemos la cantidad de filas obtenidas
print(f"La cantidad de registros obtenidos es: {df_customers.shape[0]}")

La cantidad de registros obtenidos es: 11


Este tipo de extracción escanea la fuente de datos en su totalidad. Si ejecutamos esta función indefinidamente, obtendremos siempre los mismos datos, salvo que la tabla sea modificada.

### Extracción incremental
La función `extract_incremental_data` será la responsable de esta técnica.

Recibe como parámetro el objeto `engine`, el nombre de la tabla a extraer y la ruta al archivo JSON que contiene el estado de la última extracción *(recuerda lo que vimos mas arriba sobre **stateful**)*

Esta función ejecuta una consulta SQL a la base de datos,
- utilizando la metadata para obtener el nombre de las columnas
- y el archivo JSON con el estado de la última extración para poder obtener para poder filtrar solo los registros nuevos.

In [None]:
# Esta primera ejecución obtiene todos los datos
# porque en el archivo JSON inicializamos el valor 1900-01-01

df_payments = extract_incremental_data(
    engine, 'payments', 'metadata/metadata_ingestion.json'
    )

In [None]:
# Veamos los primeros registros de los datos
df_payments.head()

Unnamed: 0,customerNumber,checkNumber,paymentDate,amount
0,1,1001,2023-07-01,348.5
1,1,1002,2023-07-05,953.25
2,1,11001,2023-07-11,590.7
3,1,11002,2023-07-15,445.9
4,1,3001,2023-07-26,10000.5


In [None]:
print(f"La cantidad de registros obtenidos es: {df_payments.shape[0]}")

La cantidad de registros obtenidos es: 33


In [None]:
# Vamos a ejecutar otra vez la extracción incremental
# solo para demostrar que no se obtendrán datos nuevos

df_customers = extract_incremental_data(
    engine, 'payments', 'metadata/metadata_ingestion.json'
    )
print(f"La cantidad de registros obtenidos es: {df_customers.shape[0]}")

df_customers.head()

La cantidad de registros obtenidos es: 0


Unnamed: 0,customerNumber,checkNumber,paymentDate,amount
