# Review: Pyarrow Dataset

### Mis consideraciones

Creo que es una herramienta muy interesante. Los **beneficios** que se me ocurren de primeras son:
- Para leer la informacion de varios ficheros, no es necesario iterar y append.
- Se pueden hacer queries (selection of columns, filtering of records) antes de leer la informacion.
- Las particiones facilitan la organizacion de la informacion y lectura en paralelo.

La gran **pega** que le veo es:
- No se le pueden incluir nuevos campos, el esquema es cerrado. Es decir, todos los ficheros tienen que tener las mismas columnas o menos. **Esto resta flexibilidad**.
> NOTA: Para resolver esto se podria poner los valores de varios campos como nuevos registros (append) teniendo una nueva columna para identificar el nombre del campo correspondiente a cada valor. Para pasar estos campos a columnas habria que usar unstacking concatenando las columnas en base a un index. (**ver Dataset 4***).

### Consideraciones sobre el rendimiento de la partición de datasets

La partición de los conjuntos de datos tiene dos aspectos que afectan al rendimiento: aumenta el número de archivos y crea una estructura de directorios alrededor de los archivos. Ambos tienen tanto beneficios como costos. Dependiendo de la configuración y el tamaño de su conjunto de datos, los costos pueden superar los beneficios.

Debido a que las particiones dividen el conjunto de datos en varios archivos, los conjuntos de datos particionados se pueden leer y escribir con paralelismo. Sin embargo, cada archivo adicional añade un poco de sobrecarga en el procesamiento para la interacción del sistema de archivos. También aumenta el tamaño general del conjunto de datos, ya que cada archivo tiene algunos metadatos compartidos. Por ejemplo, cada archivo de parquet contiene el esquema y las estadísticas a nivel de grupo. El número de particiones es un piso para el número de archivos. Si particiona un conjunto de datos por fecha con un año de datos, tendrá al menos 365 archivos. Si divides por otra dimensión con 1.000 valores únicos, tendrás hasta 365.000 archivos. Esta multa de partición a menudo conduce a archivos pequeños que consisten principalmente en metadatos.

Los conjuntos de datos particionados crean estructuras de carpetas anidadas, y esas nos permiten podar qué archivos se cargan en un escaneo. Sin embargo, esto añade la sobrecarga al descubrimiento de archivos en el conjunto de datos, ya que tendremos que "listar recursivamente el directorio" para encontrar los archivos de datos. Las particiones demasiado finas pueden causar problemas aquí: la partición de un conjunto de datos por fecha durante un año de datos requerirá 365 llamadas de lista para encontrar todos los archivos; agregar otra columna con la cardinalidad 1.000 hará que 365 365 llamadas.

El diseño de partición más óptimo dependerá de sus datos, los patrones de acceso y los sistemas que leerán los datos. La mayoría de los sistemas, incluido Arrow, deberían funcionar en una gama de tamaños de archivo y diseños de partición, pero hay extremos que debe evitar. Estas pautas pueden ayudar a evitar algunos de los peores casos conocidos:

- Evite los archivos de menos de 20 MB y más de 2 GB.
- Evite la partición de diseños con más de 10.000 particiones distintas.

Para los formatos de archivo que tienen una noción de grupos dentro de un archivo, como Parquet, se aplican directrices similares. Los grupos de filas pueden proporcionar paralelismo al leer y permitir el salto de datos en función de las estadísticas, pero los grupos muy pequeños pueden hacer que los metadatos sean una parte significativa del tamaño del archivo. El escritor de archivos de Arrow proporciona valores predeterminados razonables para el tamaño del grupo en la mayoría de los casos.

### References:

- [Tabular Dataset](https://arrow.apache.org/docs/python/dataset.html)

In [4]:
import pathlib
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import numpy as np

In [2]:
# folder base
base = pathlib.Path("data")

### Dataset 1

Dos ficheros con mismas columnas pero diferentes registros.

In [36]:
# create new folder
(base / "parquet_dataset_1").mkdir(exist_ok=True)
# create tables
table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})
# write tables
pq.write_table(table.slice(0, 5), base / "parquet_dataset_1/data1.parquet")
pq.write_table(table.slice(5, 10), base / "parquet_dataset_1/data2.parquet")

In [37]:
# create dataset (parquet format)
dataset = ds.dataset(base / "parquet_dataset_1", format="parquet")
# display
print(dataset.files)
print(dataset.schema.to_string(show_field_metadata=False))
# get tabla to df
dataset.to_table().to_pandas()

['data/parquet_dataset_1/data1.parquet', 'data/parquet_dataset_1/data2.parquet']
a: int64
b: double
c: int64


Unnamed: 0,a,b,c
0,0,-0.481995,1
1,1,0.203396,2
2,2,0.108684,1
3,3,-0.175066,2
4,4,-0.849862,1
5,5,-0.143516,2
6,6,1.337337,1
7,7,0.461611,2
8,8,1.247307,1
9,9,2.165032,2


In [38]:
# columns selection and filtering
dataset.to_table(columns = ['a', 'b'], filter = (ds.field('c') == 2) & (ds.field('b') < 0)).to_pandas()

Unnamed: 0,a,b
0,3,-0.175066
1,5,-0.143516


### Dataset 2

Dos ficheros con alguna columna en comun y alguna columna no comun.

In [39]:
# create new folder
(base / "parquet_dataset_2").mkdir(exist_ok=True)
# create tables
table1 = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})
table2 = pa.table({'a': range(10), 'b': np.random.randn(10)})
# write tables
pq.write_table(table2, base / "parquet_dataset_2/data1.parquet")
pq.write_table(table1, base / "parquet_dataset_2/data2.parquet")

In [40]:
# create dataset (parquet format)
dataset = ds.dataset(base / "parquet_dataset_2", format="parquet")
# display
print(dataset.files)
print(dataset.schema.to_string(show_field_metadata=False))
# get tabla to df
dataset.to_table().to_pandas()

['data/parquet_dataset_2/data1.parquet', 'data/parquet_dataset_2/data2.parquet']
a: int64
b: double


Unnamed: 0,a,b
0,0,-0.000807
1,1,2.585116
2,2,1.534764
3,3,-0.163543
4,4,0.973584
5,5,0.959005
6,6,-0.305535
7,7,-0.175971
8,8,0.830626
9,9,1.436221


Dependiendo del orden de creacion de los ficheros, la columna c aparace (con NaN values en aquellos registros inexistentes) o directamente no aparece. Esto no interesa.

### Dataset 3

Dataset en carpetas diferentes dentro de la carpeta del dataset.

In [84]:
# create new folder
(base / "parquet_dataset_3" / "folder_1").mkdir(exist_ok=True, parents=True)
(base / "parquet_dataset_3" / "folder_2").mkdir(exist_ok=True, parents=True)
# create tables
table1 = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})
table2 = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [10, 20] * 5})
# write tables
pq.write_table(table2, base / "parquet_dataset_3/folder_1/data1.parquet")
pq.write_table(table1, base / "parquet_dataset_3/folder_2/data2.parquet")

In [85]:
# create dataset (parquet format)
dataset = ds.dataset(base / "parquet_dataset_3", format="parquet")
# display
print(dataset.files)
print(dataset.schema.to_string(show_field_metadata=False))
# get tabla to df
dataset.to_table().to_pandas()

['data/parquet_dataset_3/folder_1/data1.parquet', 'data/parquet_dataset_3/folder_2/data2.parquet']
a: int64
b: double
c: int64


Unnamed: 0,a,b,c
0,0,0.876306,10
1,1,2.251593,20
2,2,-0.269898,10
3,3,0.377358,20
4,4,-1.388461,10
5,5,1.089539,20
6,6,-1.050133,10
7,7,1.359397,20
8,8,0.865195,10
9,9,-1.044451,20


No tiene ningun problema de agrupar toda la informacion aunque este en diferentes carpetas.

### Dataset 4

Una columna fija (sera una de index) una de valores de varios campos y otra mas para identificar los campos. Luego se hari unstacking para poner los campos en registros como columnas.

In [86]:
# create new folder
(base / "parquet_dataset_4").mkdir(exist_ok=True)
# create tables
table1 = pa.table({'index': range(5), 'values': np.random.randn(5), 'varname': ['a'] * 5})
table2 = pa.table({'index': range(5), 'values': np.random.randn(5), 'varname': ['b'] * 5})
# write tables
pq.write_table(table2, base / "parquet_dataset_4/data1.parquet")
pq.write_table(table1, base / "parquet_dataset_4/data2.parquet")

In [87]:
# create dataset (parquet format)
dataset = ds.dataset(base / "parquet_dataset_4", format="parquet")
# display
print(dataset.files)
print(dataset.schema.to_string(show_field_metadata=False))
# get tabla to df
dataset.to_table().to_pandas()

['data/parquet_dataset_4/data1.parquet', 'data/parquet_dataset_4/data2.parquet']
index: int64
values: double
varname: string


Unnamed: 0,index,values,varname
0,0,0.529821,b
1,1,-2.140198,b
2,2,-0.461633,b
3,3,-0.787426,b
4,4,0.054207,b
5,0,0.655681,a
6,1,-1.487814,a
7,2,1.095133,a
8,3,-1.013163,a
9,4,-0.035556,a


In [172]:
# table to df
df = dataset.to_table().to_pandas()
# fields: records to columns
df = df.set_index(["index","varname"]).unstack(level = "varname")
df.columns = df.columns.droplevel(0)
df.columns.name = None
df


Unnamed: 0_level_0,a,b
index,Unnamed: 1_level_1,Unnamed: 2_level_1
0,0.655681,0.529821
1,-1.487814,-2.140198
2,1.095133,-0.461633
3,-1.013163,-0.787426
4,-0.035556,0.054207


### Partitioned data

Al escribir una nueva tabla en la misma carpeta con la misma particion, es incluida como data adicional. Si ademas la columna usada para hacer la particion tiene nuevos valores, crea nueva carpeta de particion.

Si se incluyen nuevos campos, pasa lo mismo que antes, depende del orden, el nuevo campo aparece o no.

In [77]:
# create table
table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
                  'part': ['a'] * 5 + ['b'] * 5})
# write parquet
pq.write_to_dataset(table, base / "parquet_dataset_partitioned",
                    partition_cols=['part'])

In [78]:
# create table
table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [10, 20] * 5,
                  'part': ['a'] * 5 + ['c'] * 5})
# write parquet
pq.write_to_dataset(table, base / "parquet_dataset_partitioned",
                    partition_cols=['part'])

In [79]:
# create dataset (parquet format)
dataset = ds.dataset(base / "parquet_dataset_partitioned", format="parquet", partitioning="hive")
# display
print(dataset.files)
print(dataset.schema.to_string(show_field_metadata=False))
# get tabla to df
dataset.to_table().to_pandas().head()

['data/parquet_dataset_partitioned/part=a/839b655bca854027a9bcf1203d69fa0b-0.parquet', 'data/parquet_dataset_partitioned/part=a/c218a888a4f8403b9ac5dec37c131c46-0.parquet', 'data/parquet_dataset_partitioned/part=b/839b655bca854027a9bcf1203d69fa0b-0.parquet', 'data/parquet_dataset_partitioned/part=c/c218a888a4f8403b9ac5dec37c131c46-0.parquet']
a: int64
b: double
c: int64
part: string


Unnamed: 0,a,b,c,part
0,0,0.406247,1,a
1,1,-0.342056,2,a
2,2,-0.436684,1,a
3,3,0.577777,2,a
4,4,0.473169,1,a


In [80]:
# query
dataset.to_table(filter=ds.field("part") == "c").to_pandas()

Unnamed: 0,a,b,c,part
0,5,-0.146489,20,c
1,6,-0.706687,10,c
2,7,0.536978,20,c
3,8,-0.155612,10,c
4,9,-0.819512,20,c


### Writing a dataset directly

In [66]:
# create table
table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
# write dataset
ds.write_dataset(table, base / "sample_dataset", format="parquet")

### Partition selection manually

In [68]:
# create partition
part = ds.partitioning(pa.schema([("c", pa.int16())]), flavor="hive")
# write dataset
ds.write_dataset(table, base / "partitioned_dataset", format="parquet", partitioning=part)