# Extracción de datos
## Bases de datos relacionales

A continuación vamos a ver una demostración con Python sobre como realizar extracción de de datos, específicamente de bases de datos relacionales, aplicando las técnicas de:
- extracción **full**
- extracción **incremental**
donde sea oportuno.

Vamos a trabajar con dos tablas:
- `customers` la cual contiene datos sobre clientes de una empresa. ***Supongamos*** que esa tabla no posee muchos registros y no se actualiza con mucha frecuencia. Por ende, conviene aplicar una extracción de tipo **full**.
- `payments`, posee registros sobre pagos registrados. Esta tabla posee una gran cantidad de registros, acumula un histórico enorme de pagos y se actualiza diariamente a partir de las nuevas operaciones realizadas. Dado este contexto, es oportuno aplicar una extracción **incremental**.

Toda la lógica requerida para aplicar estas técnicas se encuentran en los scripts `utils_db.py` y `utils_state.py`.

`utils_state.py` va a intervenir en la extracción incremental. Este tipo de extracción podemos considerarla como **stateful (con estado)** ya que debe mantener un registro de la última extracción realizada. Este registro debe contener algún valor referido a los datos de la última extracción. De esa forma, podrá determinar qué datos han cambiado desde la última extracción y obtener solo esos datos específicos. En este caso, el programa de extracción *recuerda* la última ejecución realizada.

Ese estado lo vamos a gestionar por medio de un archivo `.json` que tiene esta estructura
```json
{
    "table_name": {
        "incremental_column": "column_name",
        "last_value": "last_value"
    }
}
```
El archivo se llama `metadata_ingestion.json` y está en la carpeta `metadata/`.

Para este tipo de extracción, la tabla origen debe ofrecer una columna de tipo `date` o `datetime` que permita identificar registros nuevos.

`last_value` inicializará con un valor anómalo como 1900-01-01, de modo que la primera ejecución pueda capturar todos los registros. En las siguientes ejecuciones, `last_value` será actualizado con el máximo valor obtenido en la última ejecución.

La creación del archivo JSON y su inicialización lo haremos de forma manual. Mientras que la actualización del archivo será gestionada por el script `utils_state.py`.

*Cabe aclarar que una extracción incremental podría ser tipo stateless (sin estado).
En vez de almacenar el último valor registrado de la extracción, podríamos aplicar filtros de forma dinámica basándonos en la fecha de ejecución actual y obtener registros vinculados a esa fecha. Sin embargo, si el programa falla y no se ejecuta durante varios días, habría un esfuerzo adicional para recuperar los datos faltantes. A diferencia de la extracción stateful, que puede continuar desde el último punto de extracción.*
*

Manos a la obra.
Vamos a importar todas las funciones de `utils_db.py`

In [1]:
from utils_db import *

En primer lugar, vamos a establecer conexión con la base de datos e instanciaremos un objeto `engine` que nos permitirá interactuar con la base de datos.
La función `connect_to_db` se encarga de realizar esta tarea. Espera tres parámetros *(ante cualquier duda, ver el anexo)*:
- La ruta a un archivo de configuración que contiene los datos de conexión a la base de datos.
- La sección del archivo de configuración que contiene los datos de conexión.
- El nombre del driver que se utilizará para la conexión.

In [2]:
engine = connect_to_db(
    'pipeline.conf', 'mysql', 'mysql+pymysql'
    )

Una vez establecida la conexión, procedemos a obtener metadatos de todas las tablas de la base de datos. Esto lo hacemos con la función `get_metadata` que recibe como parámetro el objeto `engine` y genera un archivo JSON con la metadata en la carpeta `metadata/`.

Esta metadata contiene datos sobre las columnas de cada tabla, como el nombre, tipo de dato, si es clave primaria, si es clave foránea, si admite nulos, entre otros. Esta metadata es relevante para  de **comprender y documentar la estructura de la fuente de datos a consultar.**

In [3]:
get_metadata_db(engine)

### Extracción full
La función `extract_full_data` será la responsable de esta técnica.

Recibe como parámetro el objeto `engine` y el nombre de la tabla a extraer.

Se encarga de realizar una consulta SQL a la base de datos, utiliza la metadata vista anteriormente para obtener el nombre de las columnas y utilizarlas durante la ejecución de la consulta.

La función retorna un DataFrame con los datos extraídos.

In [11]:
df_customers = extract_full_data(engine, 'customers')

In [13]:
# Veamos los primeros registros de los datos
df_customers.head()

Unnamed: 0,customerNumber,customerName,phone,addressLine1,addressLine2,city,province,postalCode,country,createdDate,updatedDate
0,1,Mario Santos,+54 9 11 1234 5678,Calle Falsa 123,,Buenos Aires,,,Argentina,2025-03-15 14:58:59,2025-03-15 14:58:59
1,2,Emilio Ravenna,+54 9 11 8765 4321,Avenida Simulación 456,,Córdoba,,,Argentina,2025-03-15 14:58:59,2025-03-15 14:58:59
2,3,Pablo Lamponne,+54 9 11 2468 1357,Carrera Simulada 789,,Rosario,,,Argentina,2025-03-15 14:58:59,2025-03-15 14:58:59
3,4,Gabriel Medina,+54 9 11 7531 8642,Pasaje Fingido 987,,Mendoza,,,Argentina,2025-03-15 14:58:59,2025-03-15 14:58:59
4,5,Franco Milazzo,+54 9 11 9999 9999,Calle Actualizada 789,,Salta,,,Argentina,2025-03-15 14:58:59,2025-03-15 18:50:55


In [12]:
# Consultemos la cantidad de filas obtenidas
print(f"La cantidad de registros obtenidos es: {df_customers.shape[0]}")

La cantidad de registros obtenidos es: 11


Este tipo de extracción escanea la fuente de datos en su totalidad. Si ejecutamos esta función indefinidamente, obtendremos siempre los mismos datos, salvo que la tabla sea modificada.

### Extracción incremental
La función `extract_incremental_data` será la responsable de esta técnica.

Recibe como parámetro el objeto `engine`, el nombre de la tabla a extraer y la ruta al archivo JSON que contiene el estado de la última extracción *(recuerda lo que vimos mas arriba sobre **stateful**)*

Esta función ejecuta una consulta SQL a la base de datos,
- utilizando la metadata para obtener el nombre de las columnas
- y el archivo JSON con el estado de la última extración para poder obtener para poder filtrar solo los registros nuevos.

In [17]:
# Esta primera ejecución obtiene todos los datos
# porque en el archivo JSON inicializamos el valor 1900-01-01

df_payments = extract_incremental_data(
    engine, 'payments', 'metadata/metadata_ingestion.json'
    )

In [18]:
# Veamos los primeros registros de los datos
df_payments.head()

Unnamed: 0,customerNumber,checkNumber,paymentDate,amount


In [19]:
print(f"La cantidad de registros obtenidos es: {df_payments.shape[0]}")

La cantidad de registros obtenidos es: 0


In [21]:
# Vamos a ejecutar otra vez la extracción incremental
# solo para demostrar que no se obtendrán datos nuevos

df_customers = extract_incremental_data(
    engine, 'payments', 'metadata/metadata_ingestion.json'
    )
print(f"La cantidad de registros obtenidos es: {df_customers.shape[0]}")

df_customers.head()

La cantidad de registros obtenidos es: 3


Unnamed: 0,customerNumber,checkNumber,paymentDate,amount
0,1,3001,2023-07-26,10000.5
1,2,3002,2023-07-26,7500.2
2,3,3003,2023-07-26,5075.0
