# Lectura 15: Parquet

## `read_parquet`

La primera función que vamos a estudiar es `read_parquet`, la cual nos va a permitir crear un DataFrame a partir de la lectura de un archivo parquet.

In [1]:
import polars as pl

### Leer un solo archivo parquet

In [2]:
df = pl.read_parquet('./data/parquet/datos.parquet')

In [3]:
print(df)

shape: (48_137, 16)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ video_id  ┆ trending_ ┆ title     ┆ channel_t ┆ … ┆ comments_ ┆ ratings_d ┆ video_err ┆ descript │
│ ---       ┆ date      ┆ ---       ┆ itle      ┆   ┆ disabled  ┆ isabled   ┆ or_or_rem ┆ ion      │
│ str       ┆ ---       ┆ str       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ oved      ┆ ---      │
│           ┆ str       ┆           ┆ str       ┆   ┆ str       ┆ str       ┆ ---       ┆ str      │
│           ┆           ┆           ┆           ┆   ┆           ┆           ┆ str       ┆          │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 2kyS6SvSY ┆ 17.14.11  ┆ WE WANT   ┆ CaseyNeis ┆ … ┆ False     ┆ False     ┆ False     ┆ SHANTELL │
│ SE        ┆           ┆ TO TALK   ┆ tat       ┆   ┆           ┆           ┆           ┆ 'S       │
│           ┆           ┆ ABOUT OUR ┆           ┆   ┆           ┆      

#### Otra alternativa que leería todos los archivos parquet dentro de una carpeta

In [4]:
df1 = pl.read_parquet('./data/multi_parquet/*.parquet')

In [5]:
print(df1)

shape: (898_505, 30)
┌──────┬───────┬─────┬─────────────┬───┬──────────────┬──────────────┬──────────────┬──────────────┐
│ YEAR ┆ MONTH ┆ DAY ┆ DAY_OF_WEEK ┆ … ┆ SECURITY_DEL ┆ AIRLINE_DELA ┆ LATE_AIRCRAF ┆ WEATHER_DELA │
│ ---  ┆ ---   ┆ --- ┆ ---         ┆   ┆ AY           ┆ Y            ┆ T_DELAY      ┆ Y            │
│ i32  ┆ i32   ┆ i32 ┆ i32         ┆   ┆ ---          ┆ ---          ┆ ---          ┆ ---          │
│      ┆       ┆     ┆             ┆   ┆ i32          ┆ i32          ┆ i32          ┆ i32          │
╞══════╪═══════╪═════╪═════════════╪═══╪══════════════╪══════════════╪══════════════╪══════════════╡
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null         ┆ null         ┆

La forma recomendada de leer un archivo parquet cuando existan varios archivos parquet en una sola carpeta es utilizar el parámetro `use_pyarrow=True`. El propio Polars en su documentación expresa que esta opción es más estable que la que la opción de parquet reader que trae Rust que es la opción por defecto.

In [6]:
df2 = pl.read_parquet('./data/multi_parquet/', use_pyarrow=True)

In [7]:
print(df2)

shape: (898_505, 30)
┌──────┬───────┬─────┬─────────────┬───┬──────────────┬──────────────┬──────────────┬──────────────┐
│ YEAR ┆ MONTH ┆ DAY ┆ DAY_OF_WEEK ┆ … ┆ SECURITY_DEL ┆ AIRLINE_DELA ┆ LATE_AIRCRAF ┆ WEATHER_DELA │
│ ---  ┆ ---   ┆ --- ┆ ---         ┆   ┆ AY           ┆ Y            ┆ T_DELAY      ┆ Y            │
│ i32  ┆ i32   ┆ i32 ┆ i32         ┆   ┆ ---          ┆ ---          ┆ ---          ┆ ---          │
│      ┆       ┆     ┆             ┆   ┆ i32          ┆ i32          ┆ i32          ┆ i32          │
╞══════╪═══════╪═════╪═════════════╪═══╪══════════════╪══════════════╪══════════════╪══════════════╡
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null         ┆ null         ┆

### Leer un archivo parquet que esté particionado.
Aquí volvemos a usar el parámetro `use_pyarrow=True`.

In [8]:
df3 = pl.read_parquet('./data/vuelos/', use_pyarrow=True)

In [9]:
print(df3)

shape: (5_819_079, 31)
┌──────┬───────┬─────┬─────────────┬───┬───────────────┬─────────────────┬───────────────┬─────────┐
│ YEAR ┆ MONTH ┆ DAY ┆ DAY_OF_WEEK ┆ … ┆ AIRLINE_DELAY ┆ LATE_AIRCRAFT_D ┆ WEATHER_DELAY ┆ AIRLINE │
│ ---  ┆ ---   ┆ --- ┆ ---         ┆   ┆ ---           ┆ ELAY            ┆ ---           ┆ ---     │
│ i32  ┆ i32   ┆ i32 ┆ i32         ┆   ┆ i32           ┆ ---             ┆ i32           ┆ cat     │
│      ┆       ┆     ┆             ┆   ┆               ┆ i32             ┆               ┆         │
╞══════╪═══════╪═════╪═════════════╪═══╪═══════════════╪═════════════════╪═══════════════╪═════════╡
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null          ┆ null            ┆ null          ┆ AA      │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null          ┆ null            ┆ null          ┆ AA      │
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null          ┆ null            ┆ null          ┆ AA      │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null          ┆ null       

### Seleccionar solo un conjunto de columnas del archivo parquet
Si deseamos seleccionar un conjunto de columnas del archivo parquet que vamos a leer debemos usar el parámetro `columns`.

In [10]:
df4 = pl.read_parquet('./data/vuelos/', use_pyarrow=True, columns=['YEAR', 'MONTH', 'DAY'])

print(df4)

shape: (5_819_079, 3)
┌──────┬───────┬─────┐
│ YEAR ┆ MONTH ┆ DAY │
│ ---  ┆ ---   ┆ --- │
│ i32  ┆ i32   ┆ i32 │
╞══════╪═══════╪═════╡
│ 2015 ┆ 1     ┆ 1   │
│ 2015 ┆ 10    ┆ 25  │
│ 2015 ┆ 1     ┆ 1   │
│ 2015 ┆ 10    ┆ 25  │
│ 2015 ┆ 1     ┆ 1   │
│ …    ┆ …     ┆ …   │
│ 2015 ┆ 10    ┆ 25  │
│ 2015 ┆ 10    ┆ 25  │
│ 2015 ┆ 10    ┆ 25  │
│ 2015 ┆ 10    ┆ 25  │
│ 2015 ┆ 10    ┆ 25  │
└──────┴───────┴─────┘


## `scan_parquet`

Lee de forma laizy desde un archivo(o archivos) parquet. Estos archivos pueden estar alojados localmente o en la nube. Esta función permite que el optimizador de consultas baje los predicados y las proyecciones al nivel del escaneo, lo que generalmente aumenta el rendimiento y reduce la sobrecarga de memoria.

In [11]:
df5 = pl.scan_parquet('./data/multi_parquet/*.parquet')

print(df5.collect())

shape: (898_505, 30)
┌──────┬───────┬─────┬─────────────┬───┬──────────────┬──────────────┬──────────────┬──────────────┐
│ YEAR ┆ MONTH ┆ DAY ┆ DAY_OF_WEEK ┆ … ┆ SECURITY_DEL ┆ AIRLINE_DELA ┆ LATE_AIRCRAF ┆ WEATHER_DELA │
│ ---  ┆ ---   ┆ --- ┆ ---         ┆   ┆ AY           ┆ Y            ┆ T_DELAY      ┆ Y            │
│ i32  ┆ i32   ┆ i32 ┆ i32         ┆   ┆ ---          ┆ ---          ┆ ---          ┆ ---          │
│      ┆       ┆     ┆             ┆   ┆ i32          ┆ i32          ┆ i32          ┆ i32          │
╞══════╪═══════╪═════╪═════════════╪═══╪══════════════╪══════════════╪══════════════╪══════════════╡
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 1     ┆ 1   ┆ 4           ┆ … ┆ null         ┆ null         ┆ null         ┆ null         │
│ 2015 ┆ 10    ┆ 25  ┆ 7           ┆ … ┆ null         ┆ null         ┆

## `read_parquet_schema`

Con esta función obtendremos un diccionario con el schema del archivo parquet sin necesidad de leerlo.

In [12]:
schema = pl.read_parquet_schema('./data/parquet/datos.parquet')

schema

{'video_id': String,
 'trending_date': String,
 'title': String,
 'channel_title': String,
 'category_id': String,
 'publish_time': Datetime(time_unit='ns', time_zone=None),
 'tags': String,
 'views': Int32,
 'likes': Int32,
 'dislikes': Int32,
 'comment_count': Int32,
 'thumbnail_link': String,
 'comments_disabled': String,
 'ratings_disabled': String,
 'video_error_or_removed': String,
 'description': String}

## `write_parquet`

Con esta función escribiremos un archivo parquet. Para este ejemplo usaremos el parámetro `use_pyarrow=True` lo cual utilizará la implementación de parquet C++ en vez de la implementación de parquet Rust.

In [13]:
df.write_parquet('./data/salida.parquet', use_pyarrow=True)

A continuación vamos a escribir un parquet particionado, para ello vamos a utilizar el `df3` que previamente hemos creado y lo escribiremos particionado por la columna `MONTH`. Por último, cambiaremos el formato de compresión a snappy con el parámetro `compression='snappy'`.

Respecto a la compresión en su documentación Polars nos dice lo siguiente:
- Seleccionar `zstd` (opción por defecto) par un un buen desempeño de compresión.
- Seleccionar `lz4` para una rápida compresión/descompresión.
- Seleccionar `snappy` para garantizar una mayor compatibilidad con lectores de parquet más antigüos

In [14]:
df3.write_parquet(
    './data/salida_particionada',
    use_pyarrow=True,
    pyarrow_options={"partition_cols": ['MONTH']},
    compression='snappy'
)

## `sink_parquet`

Esta función nos permitirá evaluar la query que define al lazyframe y escribirla en un archivo parquet. Esto permite que los resultados que sean más grandes que la memoria RAM puedan ser escribidos en disco.

In [15]:
df5.sink_parquet('./data/salida_lazy.parquet')