# **Pipeline** - **Parquet** *(from HTTP endpoint)* **to MinIO Bucket**

## **Librerías**

- **`dlt`**
 → Framework para definir pipelines de extracción, transformación y carga *(ETL/ELT)*.

- **`pyarrow.parquet`** (`pq`)
→ Módulo de Apache Arrow usado para leer el archivo Parquet como tabla Arrow.

- **`fsspec`**
→ Esta librería unifica el acceso a sistemas de archivos con una sola API, abstrae backends como:
    - **`s3fs`** *(para AWS S3 y compatibles como MinIO)*,
    - **`adlfs`** *(para Azure Data Lake / Blob Storage)*,
    - **`gcsfs`** *(para Google Cloud Storage)*,
    - y también acceso a local filesystem, HTTP/FTP, HDFS, entre otros.

In [None]:
import dlt
import pyarrow.parquet as pq
import fsspec

## **Extrayendo el Parquet del endpoint HTTP**

Para extraer el archivo se empieza definiendo un recurso de dlt (`@dlt.resource`) mediante [un decorador propio de la librería](https://dlthub.com/docs/general-usage/resource): este recurso permite construir un flujo de datos a partir de los datos que le entreguemos.

* dlt permite flujos de datos con una [carga **incremental**](https://dlthub.com/docs/general-usage/incremental-loading). Su aplicación en dlt se observa desde cuando hacemos yield en una función para retornar los datos iterativamente.

    * `yield` convierte a las funciones en un funciones generadoras: resulta necesario iterar *(mediante la palabra clave `next()`, aunque esto es lo que posiblemente se hace detrás)*.

    * En este caso no se aplica en su máximo esplendor *→ Esta carga de datos es un **Full load***.

* [ `fsspec.open()`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.open) es usado aquí para poder extraer el Parquet del endpoint. Este archivo es leído como binario (`mode="rb"`), dada la naturaleza de este formato.

* Para transformar estos binarios en un formato que sea eficiente (y que no mate al computador) usamos la librería PyArrow con su módulo pyarrow.parquet para que este pueda leer archivos Parquet y, posteriormente, leer este mismo como una tabla Arrow.

    *  Estas tablas [son parecidas a los dataframes de Pandas](https://medium.com/data-engineering-with-dremio/getting-started-with-data-analytics-using-pyarrow-in-python-ac7a100bc569), pero están diseñadas específicamente para soportar el formato columnar que maneja Apache Arrow.

    * Pueden ser particionadas y procesadas en paralelo para mejor el rendimiento en datasets más grandes.

* Si bien desde aquí dlt ya podría realizar la carga como Parquet al bucket, es valioso añadir que en caso de que quisiéramos transformar este formato a una dataframe de Pandas basta solo con usar el método `to_pandas()`.

In [None]:
# Define un recurso que lee datos de un parquet remoto
@dlt.resource(table_name="df_data")
def my_df():
    parquet_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"
    with fsspec.open(parquet_url, mode="rb") as f:
        table = pq.read_table(f)
        yield table

## **Instanciando el pipeline**
Instanciar un pipeline nos permite mover los datos que extraemos a un destino. Este método acepta tanto [*sources*](https://dlthub.com/docs/general-usage/source) como [*resources*](https://dlthub.com/docs/general-usage/resource) de dlt.

- Para mandarlo a un bucket *(ya sea de Azure BS, Amazon S3 o MinIO)* usamos siempre  `destination="filesystem"`.

- `dataset_name` indica el *namespace* que se va a usar para los datos que se van a guardar.

- No es necesario declarar ninguna credencial dentro de este método ni en ningún otro que vayamos a usar; ya dlt las lee desde el archivo `secrets.toml`.

In [None]:
pipeline = dlt.pipeline(
    pipeline_name="parquet_to_minio",
    destination="filesystem",
    dataset_name="taxis_parquet",
)

## **Ejecutando el pipeline**

Con este método podemos ejecutar el resource que declaramos anteriormente (función `my_df`). 

- Declaramos que el formato en que vamos a cargar el archivo en MinIO va a ser Parquet con `loader_file_format="parquet"`.

- En caso de que la tabla ya exista, declaramos `write_disposition="replace"` para que se sobrescriba.

In [None]:
load_info = pipeline.run(
    my_df,
    loader_file_format="parquet",
    write_disposition="replace"
)
print(load_info)