# Uso de esquemas para acelerar la lectura en Spark DataFrames

Si bien Spark es lo mejor desde que se inventó el pan de molde para trabajar con grandes cantidades de datos, definitivamente me doy cuenta de que tengo mucho que aprender antes de poder aprovechar todo su potencial. Un truco que descubrí recientemente fue usar esquemas explícitos para acelerar la velocidad con la que PySpark puede leer un CSV en un DataFrame.

Al utilizar PySpark `spark.read_csv` para leer un CSV , la forma más sencilla es establecer el `inferSchema` argumento en `True`. Esto significa que PySpark intentará verificar los datos para determinar qué tipo de datos es cada columna.

El problema con esta operación es que consume mucha memoria, especialmente para conjuntos de datos grandes, ya que Spark necesita analizar una cantidad suficiente de datos para inferir correctamente el tipo. Imagine que tiene una columna con números enteros en las primeras 1000 filas, pero luego una cadena en la fila 1001. Si PySpark hubiera inferido que esta columna se basaba `IntegerType` en las primeras filas, terminaríamos con valores faltantes para cada una de las filas que contienen una cadena. Por lo tanto, PySpark necesita escanear todo el conjunto de datos o tomar muestras aleatorias de suficientes filas para inferir el tipo.

Una forma de solucionar este problema es determinar el tipo de datos de antemano y pasar esta información a PySpark mediante un esquema. La sintaxis para esto es bastante sencilla. El nombre y el tipo de cada columna se definen mediante el `StructField` método `from pyspark.sql`:

In [0]:
StructField("trip_id", StringType(), True)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-3629425818905074>, line 1[0m
[0;32m----> 1[0m [43mStructField[49m([38;5;124m"[39m[38;5;124mtrip_id[39m[38;5;124m"[39m, StringType(), [38;5;28;01mTrue[39;00m)

[0;31mNameError[0m: name 'StructField' is not defined

Los tres argumentos que `StructField` toma son el nombre que le gustaría darle a la columna, el tipo de datos de la columna y si el campo puede contener valores nulos (como un booleano). Los tipos de datos de columna disponibles también están en `pyspark.sql`, y cubren una amplia variedad de tipos de datos posibles, desde cadenas, flotantes y enteros hasta booleanos y de fecha y hora. Se crea un `StructField` para cada columna y estos se pasan como una lista a `pyspark.sql`. Luego `StructType`, este esquema se puede pasar al argumento `spark.read.csv` de `schema`.

In [0]:
StructType(
    [StructField("trip_id", StringType(), True),
    StructField("call_type", StringType(), True)]
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-3629425818905073>, line 1[0m
[0;32m----> 1[0m [43mStructType[49m(
[1;32m      2[0m     [StructField([38;5;124m"[39m[38;5;124mtrip_id[39m[38;5;124m"[39m, StringType(), [38;5;28;01mTrue[39;00m),
[1;32m      3[0m     StructField([38;5;124m"[39m[38;5;124mcall_type[39m[38;5;124m"[39m, StringType(), [38;5;28;01mTrue[39;00m)]
[1;32m      4[0m )

[0;31mNameError[0m: name 'StructType' is not defined

Para probar cuánto más rápido es usar un esquema, utilizaré el conjunto de datos [Taxi Service Trajectory](https://archive.ics.uci.edu/dataset/339/taxi+service+trajectory+prediction+challenge+ecml+pkdd+2015) del Repositorio de aprendizaje automático de UCI . Este conjunto de datos tiene 9 columnas y alrededor de 1,7 millones de filas. Vamos a cargar el contenido train.csv desde el DBFS.

In [0]:
from pyspark.sql import SparkSession
from time import time

spark = SparkSession.builder.getOrCreate() # SparkSession de forma programativa


Creamos un Dataframe para guardar nuestro `train.csv`.

Veamos primero cuánto tiempo lleva leer estos datos cuando PySpark tiene que inferir el esquema:

In [0]:
# Ruta del archivo CSV que ya tienes
data_location = "/FileStore/dataframes-spark/train.csv"

# Iniciar el temporizador para medir el tiempo
t1 = time()

# Leer el archivo CSV con inferencia de esquema y encabezado
data_inferred = spark.read.csv(data_location, header='True', inferSchema=True)

# Medir el tiempo final
t2 = time()

# Imprimir el tiempo que tardó en leer el archivo
print('Completed in %s sec.' % (str(t2 - t1)))

# Mostrar las primeras filas del DataFrame para verificar los datos
data_inferred.show()

Completed in 31.851423740386963 sec.
+-------------------+---------+-----------+------------+--------+----------+--------+------------+--------------------+
|            TRIP_ID|CALL_TYPE|ORIGIN_CALL|ORIGIN_STAND| TAXI_ID| TIMESTAMP|DAY_TYPE|MISSING_DATA|            POLYLINE|
+-------------------+---------+-----------+------------+--------+----------+--------+------------+--------------------+
|1372636858620000589|        C|       NULL|        NULL|20000589|1372636858|       A|       false|[[-8.618643,41.14...|
|1372637303620000596|        B|       NULL|           7|20000596|1372637303|       A|       false|[[-8.639847,41.15...|
|1372636951620000320|        C|       NULL|        NULL|20000320|1372636951|       A|       false|[[-8.612964,41.14...|
|1372636854620000520|        C|       NULL|        NULL|20000520|1372636854|       A|       false|[[-8.574678,41.15...|
|1372637091620000337|        C|       NULL|        NULL|20000337|1372637091|       A|       false|[[-8.645994,41.18...|
|13

Ahora compararemos cuánto tiempo lleva cuando le decimos explícitamente a PySpark cuál es el esquema de datos:

In [0]:
# Importar las clases necesarias de PySpark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, BooleanType

t1 = time()

schema = StructType(
    [StructField("trip_id", StringType(), True),
    StructField("call_type", StringType(), True),
    StructField("origin_call", IntegerType(), True),
    StructField("origin_stand", IntegerType(), True),
    StructField("taxi_id", LongType(), True),
    StructField("timestamp", LongType(), True),
    StructField("day_type", StringType(), True),
    StructField("missing_data", BooleanType(), True),
    StructField("polyline", StringType(), True)]
)
data_schema = spark.read.csv(data_location, header = 'False', schema = schema)

t2 = time()
print('Completed in %s sec.' % (str(t2 - t1)))

# Mostrar las primeras filas del DataFrame para verificar los datos
data_schema.show()

Completed in 0.3081221580505371 sec.
+-------------------+---------+-----------+------------+--------+----------+--------+------------+--------------------+
|            trip_id|call_type|origin_call|origin_stand| taxi_id| timestamp|day_type|missing_data|            polyline|
+-------------------+---------+-----------+------------+--------+----------+--------+------------+--------------------+
|            TRIP_ID|CALL_TYPE|       NULL|        NULL|    NULL|      NULL|DAY_TYPE|        NULL|            POLYLINE|
|1372636858620000589|        C|       NULL|        NULL|20000589|1372636858|       A|       false|[[-8.618643,41.14...|
|1372637303620000596|        B|       NULL|           7|20000596|1372637303|       A|       false|[[-8.639847,41.15...|
|1372636951620000320|        C|       NULL|        NULL|20000320|1372636951|       A|       false|[[-8.612964,41.14...|
|1372636854620000520|        C|       NULL|        NULL|20000520|1372636854|       A|       false|[[-8.574678,41.15...|
|13

Los datos se leen aproximadamente 10 veces más rápido cuando le damos el esquema a PySpark en lugar de pedirle que lo deduzca. Obviamente, este es un paso más práctico cuando tienes datos con menos variables, pero cuando lees datos realmente grandes, es una manera fácil de ahorrar algo de tiempo de procesamiento.