# Introducción a Spark SQL y DataFrames

Spark SQL es un módulo de Spark que permite el procesamiento estructurado de datos. A través de Spark SQL, puedes ejecutar consultas SQL directamente sobre tus datos en Spark, y también puedes leer datos de diversas fuentes estructuradas como Hive, Avro, Parquet, ORC, JSON y JDBC. 

Los `DataFrames` en Spark representan una tabla estructurada de datos. Es similar a un DataFrame en R o en Python con Pandas, pero con optimizaciones para ejecución distribuida y escalable. Puedes pensar en un DataFrame como una hoja de cálculo distribuida.




In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("IntroSparkSQL").getOrCreate()


23/09/21 21:04:38 WARN Utils: Your hostname, MacBook-Air-de-Ivan.local resolves to a loopback address: 127.0.0.1; using 192.168.0.2 instead (on interface en0)
23/09/21 21:04:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/21 21:04:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Una vez que hayas importado las bibliotecas necesarias, el siguiente paso es crear una `SparkSession`. La `SparkSession` es una interfaz para trabajar con datos estructurados en Spark. Puedes pensar en esto como una conexión a Spark. El parámetro `appName` simplemente da un nombre a tu aplicación para que puedas identificarla en la UI de Spark.


## Operaciones Básicas con DataFrames

Una vez que tengas un DataFrame, hay muchas operaciones que puedes realizar con él. Estas operaciones se pueden categorizar en:

1. **Transformaciones:** Estas operaciones crean un nuevo DataFrame a partir de uno existente, como seleccionar ciertas columnas o filtrar filas basadas en una condición.
2. **Acciones:** Estas operaciones devuelven un valor al controlador o escriben datos a un almacenamiento externo. Ejemplos incluyen contar el número de filas o escribir el DataFrame a un archivo.

A continuación, veremos algunas operaciones básicas que te ayudarán a familiarizarte con los DataFrames en Spark.


In [2]:
# Crear un DataFrame de ejemplo
data = [("James", "Smith", "USA", 34),
        ("Michael", "Rose", "USA", 40),
        ("Robert", "Williams", "Canada", 37),
        ("Maria", "Jones", "Mexico", 27)
       ]

columns = ["first_name", "last_name", "country", "age"]

df = spark.createDataFrame(data, columns)
df.show()


                                                                                

+----------+---------+-------+---+
|first_name|last_name|country|age|
+----------+---------+-------+---+
|     James|    Smith|    USA| 34|
|   Michael|     Rose|    USA| 40|
|    Robert| Williams| Canada| 37|
|     Maria|    Jones| Mexico| 27|
+----------+---------+-------+---+



En el código anterior, creamos un DataFrame llamado `df` usando datos de ejemplo. Las columnas son "first_name", "last_name", "country" y "age". La función `show()` nos permite visualizar el contenido del DataFrame. 

Es importante notar que Spark opera en modo "lazy", lo que significa que las transformaciones no se ejecutan hasta que se llama a una acción. En este caso, la acción es `show()`, que desencadena la ejecución y muestra el resultado.


## Selección de Columnas

Una de las operaciones más comunes que podrías querer realizar con un DataFrame es seleccionar una o más columnas. Esto se puede hacer utilizando la función `select`.


In [3]:
# Seleccionar columnas first_name y country
selected_df = df.select("first_name", "country")
selected_df.show()


+----------+-------+
|first_name|country|
+----------+-------+
|     James|    USA|
|   Michael|    USA|
|    Robert| Canada|
|     Maria| Mexico|
+----------+-------+



En el código anterior, seleccionamos solo las columnas "first_name" y "country" del DataFrame original `df`. El resultado es un nuevo DataFrame, `selected_df`, que contiene solo estas dos columnas. 


## Filtrado de Filas

Otra operación común es filtrar filas basadas en ciertas condiciones. Esto se puede hacer utilizando la función `filter`.


In [4]:
# Filtrar las filas donde el país es "USA"
usa_df = df.filter(df.country == "USA")
usa_df.show()


+----------+---------+-------+---+
|first_name|last_name|country|age|
+----------+---------+-------+---+
|     James|    Smith|    USA| 34|
|   Michael|     Rose|    USA| 40|
+----------+---------+-------+---+



En el código anterior, filtramos las filas donde la columna "country" es igual a "USA". El resultado, `usa_df`, es un nuevo DataFrame que contiene solo las filas que cumplen con esta condición.


## Agregación

La agregación se refiere a cualquier operación que tome múltiples filas y las produzca como una sola fila de salida. Spark SQL proporciona funciones integradas para realizar agregaciones, como `count`, `sum`, `max`, `min`, entre otras.


In [5]:
# Contar el número de personas por país
count_by_country = df.groupBy("country").count()
count_by_country.show()




+-------+-----+
|country|count|
+-------+-----+
|    USA|    2|
| Canada|    1|
| Mexico|    1|
+-------+-----+



                                                                                

En el código anterior, agrupamos el DataFrame por la columna "country" y luego contamos el número de registros para cada país. El resultado, `count_by_country`, muestra el número de personas por país.


## Join (Unión)

El join es una operación que combina filas de dos o más tablas basadas en columnas relacionadas. Esto es similar a los "joins" en SQL.


In [6]:
# Suponiendo que tenemos un segundo DataFrame con la información de los códigos de los países
country_codes_df = spark.createDataFrame([("USA", "United States"), ("MEX", "Mexico"), ("CAN", "Canada")], ["code", "full_name"])

# Realizamos un join basado en la columna 'country'
joined_df = df.join(country_codes_df, df.country == country_codes_df.code)
joined_df.show()




+----------+---------+-------+---+----+-------------+
|first_name|last_name|country|age|code|    full_name|
+----------+---------+-------+---+----+-------------+
|     James|    Smith|    USA| 34| USA|United States|
|   Michael|     Rose|    USA| 40| USA|United States|
+----------+---------+-------+---+----+-------------+



                                                                                

En el código anterior, tenemos un DataFrame adicional `country_codes_df` que mapea códigos de país a sus nombres completos. Luego, realizamos una operación de join entre `df` y `country_codes_df` basado en la columna "country". Esto nos da un DataFrame `joined_df` que tiene la información combinada de ambos DataFrames.


## Ordenación de DataFrames

Una tarea común al trabajar con DataFrames es ordenar los datos según uno o más criterios. Spark proporciona el método `orderBy` para ordenar los datos en un DataFrame.


In [7]:
# Ordenar el DataFrame por la columna 'age' en orden descendente
sorted_df = df.orderBy(df.age.desc())
sorted_df.show()


+----------+---------+-------+---+
|first_name|last_name|country|age|
+----------+---------+-------+---+
|   Michael|     Rose|    USA| 40|
|    Robert| Williams| Canada| 37|
|     James|    Smith|    USA| 34|
|     Maria|    Jones| Mexico| 27|
+----------+---------+-------+---+



En el código mostrado, estamos ordenando el DataFrame `df` por la columna 'age' en orden descendente. La función `desc()` se utiliza para indicar el orden descendente. Si quisiéramos un orden ascendente, simplemente omitiríamos `desc()`.


In [10]:
# Ordenar el DataFrame 'df' por la columna 'age' en orden descendente
sorted_df = df.orderBy(F.desc("age"))
sorted_df.show()


+----------+---------+-------+---+
|first_name|last_name|country|age|
+----------+---------+-------+---+
|   Michael|     Rose|    USA| 40|
|    Robert| Williams| Canada| 37|
|     James|    Smith|    USA| 34|
|     Maria|    Jones| Mexico| 27|
+----------+---------+-------+---+



En el ejemplo anterior, hemos ordenado el DataFrame `df` basado en la columna 'age' en orden descendente. Para ello, utilizamos la función `desc` del módulo `functions` para especificar el orden descendente. El resultado muestra el DataFrame con los registros ordenados por edad, desde el más viejo al más joven.


## Grouping y Aggregations Avanzadas

Más allá de contar registros, Spark permite realizar múltiples agregaciones después de agrupar. Esto es útil para obtener estadísticas descriptivas por grupo.


In [9]:
from pyspark.sql import functions as F

# Agrupar por 'country' y obtener el promedio y el máximo de edad
grouped_data = df.groupBy("country").agg(F.avg("age").alias("average_age"), F.max("age").alias("max_age"))
grouped_data.show()



+-------+-----------+-------+
|country|average_age|max_age|
+-------+-----------+-------+
|    USA|       37.0|     40|
| Canada|       37.0|     37|
| Mexico|       27.0|     27|
+-------+-----------+-------+



En el código anterior, después de agrupar el DataFrame `df` por la columna 'country', aplicamos dos funciones de agregación: el promedio y el máximo de la columna 'age'. Usamos `alias` para dar nombres más descriptivos a las columnas resultantes. El resultado es un DataFrame con el promedio y el máximo de edad por país.


## Creación de DataFrames en Spark

En Spark, un DataFrame es una estructura de datos distribuida organizada en columnas nombradas. Es equivalente a una tabla en una base de datos o un dataframe en R o Python, pero con optimizaciones para el procesamiento en memoria y escalabilidad para big data. Se pueden crear DataFrames en Spark de diversas formas:

1. A partir de RDDs.
2. Desde estructuras de datos en Python, como listas y diccionarios.
3. Leyendo desde archivos (CSV, Parquet, etc.).


In [3]:
from pyspark.sql import Row

# Crear un DataFrame a partir de una lista
list_data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
rdd = spark.sparkContext.parallelize(list_data)
people = rdd.map(lambda x: Row(name=x[0], age=x[1]))
df_from_list = spark.createDataFrame(people)
df_from_list.show()


                                                                                

+---------+---+
|     name|age|
+---------+---+
|    Alice| 34|
|      Bob| 45|
|Catherine| 29|
+---------+---+



En el ejemplo anterior, comenzamos con una lista simple de tuples en Python. Convertimos esta lista en un RDD usando `sparkContext.parallelize`. Luego, transformamos este RDD en un conjunto de filas (usando `Row`) y finalmente creamos un DataFrame a partir de este RDD transformado. El resultado es un DataFrame con las columnas `name` y `age`.


## Creación de DataFrame a partir de diccionarios

Podemos también crear DataFrames directamente desde diccionarios. Esta es una forma conveniente cuando ya tenemos datos estructurados en forma de diccionarios en Python.


In [6]:
# Crear un DataFrame a partir de una lista de diccionarios
data_list_of_dicts = [{"name": "Alice", "age": 34}, {"name": "Bob", "age": 45}, {"name": "Catherine", "age": 29}]
df_from_list_of_dicts = spark.createDataFrame(data_list_of_dicts)
df_from_list_of_dicts.show()



+---+---------+
|age|     name|
+---+---------+
| 34|    Alice|
| 45|      Bob|
| 29|Catherine|
+---+---------+



En el código anterior, tenemos una lista que contiene tres diccionarios. Cada diccionario representa una fila en nuestro DataFrame deseado. Utilizamos `spark.createDataFrame()` para convertir esta lista de diccionarios en un DataFrame. El resultado es un DataFrame con las mismas columnas y datos que la lista original de diccionarios.


## Creación de DataFrame desde archivos

Una de las formas más comunes de crear DataFrames en Spark es leer desde archivos. Spark soporta varios formatos de archivos como CSV, Parquet, Avro, entre otros. Aquí veremos cómo crear un DataFrame desde un archivo CSV.


In [8]:
# Suponiendo que tenemos un archivo 'data.csv' con las columnas 'name' y 'age'
# df_from_csv = spark.read.csv('path_to_data.csv', header=True, inferSchema=True)
# df_from_csv.show()


En el código anterior (comentado para evitar errores al no tener un archivo real), mostramos cómo leer un archivo CSV y convertirlo en un DataFrame. Utilizamos `spark.read.csv` y especificamos algunas opciones como `header=True` para indicar que la primera fila del CSV contiene los nombres de las columnas y `inferSchema=True` para que Spark infiera automáticamente el tipo de datos de cada columna. El resultado sería un DataFrame con los datos del archivo CSV.


## SQL puro vs operaciones de DataFrame

Spark SQL proporciona dos principales formas de interactuar con los datos:

1. Usando SQL puro.
2. Usando operaciones de DataFrame.

Ambas formas tienen sus propias ventajas y depende del desarrollador decidir cuál usar según la situación. Veamos ejemplos de ambos para entender mejor.


In [11]:
# Crear una vista temporal para ejecutar consultas SQL
df_from_list_of_dicts.createOrReplaceTempView("people")


Antes de poder ejecutar consultas SQL puro en un DataFrame, necesitamos crear una "vista temporal" de ese DataFrame. En el código anterior, hemos creado una vista temporal llamada "people" a partir de nuestro DataFrame `df_from_list_of_dicts`.


In [12]:
# Ejecutar una consulta SQL puro
result_sql = spark.sql("SELECT name, age FROM people WHERE age > 30")
result_sql.show()


+-----+---+
| name|age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+



Usando la vista temporal "people", hemos ejecutado una consulta SQL puro para seleccionar nombres y edades de personas mayores de 30 años. El resultado es otro DataFrame.


In [13]:
# Realizar la misma consulta usando operaciones de DataFrame
result_df = df_from_list_of_dicts.filter(df_from_list_of_dicts.age > 30).select("name", "age")
result_df.show()


+-----+---+
| name|age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+



Aquí, hemos logrado el mismo resultado que la consulta SQL puro, pero usando operaciones de DataFrame. Usamos el método `filter` para filtrar las filas y el método `select` para seleccionar las columnas deseadas. Ambos enfoques, ya sea SQL puro o operaciones de DataFrame, ofrecen flexibilidad y potencia en la manipulación y análisis de datos. La elección entre uno y otro a menudo se reduce a la familiaridad y preferencia personal del desarrollador.


## Funciones definidas por el usuario (UDFs)

En Spark, a veces puede que necesites realizar operaciones que no están directamente soportadas por las funciones incorporadas. En tales casos, puedes definir tus propias funciones, conocidas como UDFs (User Defined Functions). Estas UDFs se pueden usar en tus DataFrames para realizar transformaciones personalizadas en tus datos. Vamos a ver cómo crear y usar una UDF.


In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Definir una UDF para convertir el nombre en mayúsculas
def name_to_upper(name):
    return name.upper()

# Registrar la UDF
name_to_upper_udf = udf(name_to_upper, StringType())

# Aplicar la UDF al DataFrame
df_with_upper_names = df_from_list_of_dicts.withColumn("upper_name", name_to_upper_udf(df_from_list_of_dicts["name"]))
df_with_upper_names.show()


                                                                                

+---+---------+----------+
|age|     name|upper_name|
+---+---------+----------+
| 34|    Alice|     ALICE|
| 45|      Bob|       BOB|
| 29|Catherine| CATHERINE|
+---+---------+----------+



En el código anterior, definimos una función `name_to_upper` que toma un nombre y devuelve su versión en mayúsculas. Luego, registramos esta función como una UDF llamada `name_to_upper_udf`. Después de registrar la UDF, podemos usarla como cualquier otra función en nuestro DataFrame.

En el resultado, verás una nueva columna "upper_name" que contiene los nombres convertidos a mayúsculas. Las UDFs son extremadamente útiles para operaciones personalizadas que no están cubiertas por las funciones predeterminadas de Spark.

Las UDFs proporcionan una gran flexibilidad al trabajar con DataFrames en Spark. Aunque Spark tiene una amplia gama de funciones incorporadas, las UDFs aseguran que siempre puedas realizar las transformaciones que necesitas.

## Ventanas y operaciones de ventana

Las operaciones de ventana en Spark te permiten realizar cálculos en un conjunto definido de filas relacionadas con la fila actual dentro del DataFrame. Es similar a la función de GROUP BY, pero te permite mantener el resultado a nivel de fila en lugar de agruparlo. Estas operaciones son esenciales para ciertos cálculos, como los cálculos acumulativos.

Vamos a ver un ejemplo donde calculamos el salario acumulado por país, ordenado por edad.


In [17]:
from pyspark.sql import Row

# Crear un DataFrame de ejemplo
data = [
    Row(country="USA", age=25, salary=50000),
    Row(country="USA", age=30, salary=55000),
    Row(country="USA", age=35, salary=60000),
    Row(country="Mexico", age=25, salary=40000),
    Row(country="Mexico", age=30, salary=45000),
    Row(country="Mexico", age=35, salary=50000),
]

df = spark.createDataFrame(data)
df.show()


+-------+---+------+
|country|age|salary|
+-------+---+------+
|    USA| 25| 50000|
|    USA| 30| 55000|
|    USA| 35| 60000|
| Mexico| 25| 40000|
| Mexico| 30| 45000|
| Mexico| 35| 50000|
+-------+---+------+



In [19]:
from pyspark.sql import Window
from pyspark.sql.functions import sum as _sum

# Definir una ventana
window_spec = Window.partitionBy("country").orderBy("age").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Calcular el salario acumulado
df_with_cumulative_salary = df.withColumn("cumulative_salary", _sum("salary").over(window_spec))
df_with_cumulative_salary.show()


+-------+---+------+-----------------+
|country|age|salary|cumulative_salary|
+-------+---+------+-----------------+
| Mexico| 25| 40000|            40000|
| Mexico| 30| 45000|            85000|
| Mexico| 35| 50000|           135000|
|    USA| 25| 50000|            50000|
|    USA| 30| 55000|           105000|
|    USA| 35| 60000|           165000|
+-------+---+------+-----------------+



En el código anterior, primero definimos una ventana con `Window.partitionBy("country").orderBy("age")`. Esto significa que queremos realizar cálculos dentro de cada país, ordenando las filas por edad. 

La parte `rowsBetween(Window.unboundedPreceding, Window.currentRow)` indica que para una fila dada, consideramos todas las filas anteriores (desde el principio de la ventana) hasta la fila actual para el cálculo.

Luego, usamos la función `sum` con `over(window_spec)` para calcular el salario acumulado dentro de esa ventana.

El resultado muestra el salario acumulado por país, con las filas ordenadas por edad dentro de cada país.

Las operaciones de ventana son cruciales para realizar cálculos que requieren un contexto más amplio que la fila actual, especialmente en análisis financieros y de series temporales.


## Pivoteo de DataFrames

El pivoteo es una técnica que permite transformar datos de un formato largo a un formato ancho. En otras palabras, es la conversión de datos que están en formato vertical (una columna) a un formato horizontal (muchas columnas). 

Spark permite pivotar fácilmente DataFrames usando el método `groupBy` junto con `pivot`.


In [20]:
# Supongamos que tenemos un DataFrame con ventas por producto y país
data = [
    ("MX", "ProductoA", 100),
    ("MX", "ProductoB", 150),
    ("US", "ProductoA", 200),
    ("US", "ProductoB", 250),
    ("CA", "ProductoA", 50),
    ("CA", "ProductoB", 300)
]
df_sales = spark.createDataFrame(data, ["country", "product", "sales"])

# Pivoteo del DataFrame para obtener ventas por producto en columnas separadas para cada país
df_pivoted = df_sales.groupBy("country").pivot("product").sum("sales")
df_pivoted.show()


+-------+---------+---------+
|country|ProductoA|ProductoB|
+-------+---------+---------+
|     MX|      100|      150|
|     CA|       50|      300|
|     US|      200|      250|
+-------+---------+---------+



En el código anterior, creamos un DataFrame `df_sales` que contiene ventas por producto y país. A continuación, queremos pivotar este DataFrame para que cada producto tenga su propia columna y las filas representen a los países. 

El resultado es un DataFrame donde cada país tiene las ventas de "ProductoA" y "ProductoB" en columnas separadas.

El pivoteo es especialmente útil cuando se tienen datos categóricos que se quieren convertir en múltiples columnas. Por ejemplo, si se tiene un registro de ventas por día y producto, y se quiere obtener un DataFrame donde cada columna represente un producto y las filas representen días, el pivoteo sería la técnica adecuada para realizar esta transformación.


## Manejo de fechas y timestamps

Spark proporciona una serie de funciones para trabajar con fechas y timestamps. Estas funciones facilitan tareas como extraer componentes de una fecha (por ejemplo, el día, mes o año), calcular la diferencia entre fechas, agregar o restar días a una fecha, entre otras operaciones.

El manejo de fechas y timestamps es crucial en muchos flujos de trabajo de procesamiento de datos, especialmente cuando se trata de series temporales, registros de logs, entre otros.

Para ilustrar el uso de estas funciones, vamos a trabajar con un DataFrame que contiene timestamps.


In [21]:
from pyspark.sql.functions import current_date, current_timestamp

# Creación de un DataFrame con fechas actuales y timestamps
df_dates = spark.range(10).withColumn("current_date", current_date()).withColumn("current_timestamp", current_timestamp())
df_dates.show(truncate=False)


+---+------------+-----------------------+
|id |current_date|current_timestamp      |
+---+------------+-----------------------+
|0  |2023-09-21  |2023-09-21 21:47:19.156|
|1  |2023-09-21  |2023-09-21 21:47:19.156|
|2  |2023-09-21  |2023-09-21 21:47:19.156|
|3  |2023-09-21  |2023-09-21 21:47:19.156|
|4  |2023-09-21  |2023-09-21 21:47:19.156|
|5  |2023-09-21  |2023-09-21 21:47:19.156|
|6  |2023-09-21  |2023-09-21 21:47:19.156|
|7  |2023-09-21  |2023-09-21 21:47:19.156|
|8  |2023-09-21  |2023-09-21 21:47:19.156|
|9  |2023-09-21  |2023-09-21 21:47:19.156|
+---+------------+-----------------------+



En el código anterior, creamos un DataFrame `df_dates` que contiene una columna con la fecha actual y otra con el timestamp actual para cada fila. Hemos utilizado las funciones `current_date()` y `current_timestamp()` para obtener estos valores.


In [22]:
from pyspark.sql.functions import year, month, dayofmonth, date_add

# Extracción de componentes y suma de días a la fecha
df_transformed = df_dates.select(
    "current_date",
    year("current_date").alias("year"),
    month("current_date").alias("month"),
    dayofmonth("current_date").alias("day"),
    date_add("current_date", 5).alias("date_plus_5_days")
)
df_transformed.show()


+------------+----+-----+---+----------------+
|current_date|year|month|day|date_plus_5_days|
+------------+----+-----+---+----------------+
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
|  2023-09-21|2023|    9| 21|      2023-09-26|
+------------+----+-----+---+----------------+



En este ejemplo, hemos extraído el año, mes y día de la columna `current_date` usando las funciones `year()`, `month()` y `dayofmonth()`, respectivamente. También hemos añadido 5 días a la fecha original usando la función `date_add()`. 

Estas son solo algunas de las muchas operaciones que puedes realizar con fechas y timestamps en Spark. Las funciones relacionadas con fechas son esenciales para la manipulación y análisis de datos temporales.

## Manipulación de strings

Spark SQL proporciona una amplia variedad de funciones para manipular strings en DataFrames. Estas funciones te permiten realizar tareas como cambiar a mayúsculas o minúsculas, substring, reemplazar caracteres, encontrar la longitud de un string, y muchas otras operaciones útiles.

Vamos a explorar algunas de estas funciones con ejemplos prácticos.


In [23]:
from pyspark.sql.functions import lit

# Creación de un DataFrame con una columna de strings
data = [("John Doe",), ("Jane Smith",), ("Robert Johnson",), ("Lucy Brown",)]
df_strings = spark.createDataFrame(data, ["full_name"])
df_strings.show()


+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
|    Lucy Brown|
+--------------+



Para ilustrar la manipulación de strings, hemos creado un DataFrame `df_strings` que contiene una columna `full_name` con nombres completos de personas.


In [24]:
from pyspark.sql.functions import upper, lower, length, substring

# Transformación y manipulación de strings
df_transformed_strings = df_strings.select(
    "full_name",
    upper("full_name").alias("upper_name"),
    lower("full_name").alias("lower_name"),
    length("full_name").alias("name_length"),
    substring("full_name", 1, 4).alias("first_4_chars")
)
df_transformed_strings.show()


+--------------+--------------+--------------+-----------+-------------+
|     full_name|    upper_name|    lower_name|name_length|first_4_chars|
+--------------+--------------+--------------+-----------+-------------+
|      John Doe|      JOHN DOE|      john doe|          8|         John|
|    Jane Smith|    JANE SMITH|    jane smith|         10|         Jane|
|Robert Johnson|ROBERT JOHNSON|robert johnson|         14|         Robe|
|    Lucy Brown|    LUCY BROWN|    lucy brown|         10|         Lucy|
+--------------+--------------+--------------+-----------+-------------+



En el código anterior, hemos aplicado varias transformaciones a la columna `full_name`:

1. `upper_name`: Convertimos el nombre completo a mayúsculas.
2. `lower_name`: Convertimos el nombre completo a minúsculas.
3. `name_length`: Calculamos la longitud del nombre completo.
4. `first_4_chars`: Extraemos los primeros 4 caracteres del nombre completo.

Estas son solo algunas de las muchas funciones de manipulación de strings disponibles en Spark. Con estas funciones, puedes realizar una amplia variedad de operaciones en strings para adecuar, limpiar y transformar tus datos según tus necesidades.


## Operaciones de Set (conjunto)

Las operaciones de conjunto en Spark permiten trabajar con DataFrames como si fueran conjuntos, lo que facilita tareas como uniones, intersecciones y diferencias. Estas operaciones son esenciales cuando se necesita combinar o comparar datos de diferentes fuentes o DataFrames.

Exploraremos algunas de las operaciones de conjunto más comunes.


In [26]:
# Creación de dos DataFrames de ejemplo
data1 = [("A", 1), ("B", 2), ("C", 3)]
data2 = [("B", 2), ("C", 3), ("D", 4)]

df1 = spark.createDataFrame(data1, ["letter", "number"])
df2 = spark.createDataFrame(data2, ["letter", "number"])

df1.show()
df2.show()


+------+------+
|letter|number|
+------+------+
|     A|     1|
|     B|     2|
|     C|     3|
+------+------+

+------+------+
|letter|number|
+------+------+
|     B|     2|
|     C|     3|
|     D|     4|
+------+------+



Hemos creado dos DataFrames, `df1` y `df2`, que contienen letras y números. Estos DataFrames se utilizarán para ilustrar las operaciones de conjunto.


In [27]:
# Unión de los DataFrames
union_df = df1.union(df2)
union_df.show()


+------+------+
|letter|number|
+------+------+
|     A|     1|
|     B|     2|
|     C|     3|
|     B|     2|
|     C|     3|
|     D|     4|
+------+------+



La función `union` combina las filas de ambos DataFrames. Si hay filas duplicadas (como en nuestro caso con "B", 2 y "C", 3), estas aparecerán múltiples veces en el resultado.


In [29]:
# Intersección de los DataFrames
intersect_df = df1.intersect(df2)
intersect_df.show()




+------+------+
|letter|number|
+------+------+
|     C|     3|
|     B|     2|
+------+------+



                                                                                

La función `intersect` devuelve solo las filas que están presentes en ambos DataFrames.


In [30]:
# Diferencia de los DataFrames
diff_df = df1.subtract(df2)
diff_df.show()


                                                                                

+------+------+
|letter|number|
+------+------+
|     A|     1|
+------+------+



La función `subtract` devuelve las filas que están en `df1` pero no en `df2`. Es decir, muestra la diferencia entre los dos DataFrames.

## Lectura y Escritura en Diferentes Formatos

Apache Spark es conocido por su capacidad de trabajar con una amplia variedad de fuentes de datos. Puede leer y escribir en formatos populares como CSV, Parquet, JSON, entre otros. Esta flexibilidad permite a Spark integrarse fácilmente en arquitecturas de datos y trabajar con diferentes sistemas de almacenamiento.

A continuación, exploraremos cómo leer y escribir en algunos de estos formatos.


In [32]:
# Lectura de un archivo CSV
#csv_path = "/path/to/your/csv/file.csv"
#csv_df = spark.read.csv(csv_path, header=True, inferSchema=True)
#csv_df.show()


Para leer archivos CSV, utilizamos la función `read.csv` de Spark. La opción `header=True` indica que la primera fila del archivo contiene los nombres de las columnas. Con `inferSchema=True`, Spark intenta inferir automáticamente el tipo de datos de cada columna.


In [33]:
# Escritura en formato Parquet
#parquet_path = "/path/to/save/parquet/file.parquet"
#csv_df.write.parquet(parquet_path)


Parquet es un formato de archivo columnar que es altamente eficiente para operaciones analíticas. Spark puede escribir DataFrames directamente en formato Parquet usando la función `write.parquet`. Este formato es especialmente útil cuando trabajamos con grandes conjuntos de datos, ya que ofrece una compresión eficiente y mejora el rendimiento de las consultas.


In [34]:
# Lectura de un archivo JSON
#json_path = "/path/to/your/json/file.json"
#json_df = spark.read.json(json_path)
#json_df.show()


El formato JSON es ampliamente utilizado para la representación de datos estructurados. Spark proporciona la función `read.json` para leer archivos en este formato. A diferencia de otros formatos, JSON no requiere que se defina un esquema por adelantado; Spark puede inferir el esquema directamente del archivo.


## Tratamiento de Valores Nulos

En la vida real, los conjuntos de datos suelen tener valores faltantes o nulos debido a diversas razones. Estos valores nulos pueden afectar el resultado de cualquier operación o análisis. Por lo tanto, es crucial tratar adecuadamente estos valores antes de realizar cualquier procesamiento o análisis.

Spark ofrece varias herramientas para identificar, eliminar o reemplazar estos valores nulos. Vamos a explorar algunas de estas herramientas.


In [37]:
from pyspark.sql.functions import isnull, col

# Supongamos que tenemos el siguiente DataFrame
data_with_nulls = [
    ("Alice", None, 25),
    ("Bob", "Engineering", None),
    ("Charlie", "Finance", 30),
    ("David", None, 28)
]

df_with_nulls = spark.createDataFrame(data_with_nulls, ["name", "department", "age"])

# Identificar valores nulos
null_counts = df_with_nulls.select([isnull(col(c)).alias(c) for c in df_with_nulls.columns]).groupby().sum().collect()[0]
null_counts


Row()

En el ejemplo anterior, creamos un DataFrame `df_with_nulls` con algunos valores nulos. Utilizamos la función `isnull` junto con un `select` para identificar qué valores en el DataFrame son nulos. Luego, con la función `groupby().sum()`, sumamos el total de valores nulos por columna.


In [38]:
# Eliminar filas con valores nulos
df_no_nulls = df_with_nulls.na.drop()
df_no_nulls.show()


+-------+----------+---+
|   name|department|age|
+-------+----------+---+
|Charlie|   Finance| 30|
+-------+----------+---+



Para eliminar las filas que contienen valores nulos, podemos usar el método `na.drop()` en un DataFrame. Sin embargo, esta es una operación drástica, ya que podría resultar en la pérdida de datos valiosos. Es recomendable usarla con precaución y asegurarse de que es la mejor opción para el contexto específico.


In [39]:
# Reemplazar valores nulos con valores por defecto
values_to_replace = {"department": "Unknown", "age": -1}
df_replaced = df_with_nulls.na.fill(values_to_replace)
df_replaced.show()


+-------+-----------+---+
|   name| department|age|
+-------+-----------+---+
|  Alice|    Unknown| 25|
|    Bob|Engineering| -1|
|Charlie|    Finance| 30|
|  David|    Unknown| 28|
+-------+-----------+---+



Otra opción es reemplazar los valores nulos con valores predeterminados. En el ejemplo anterior, reemplazamos los valores nulos en la columna `department` con la cadena "Unknown" y los valores nulos en la columna `age` con -1. La función `na.fill()` es útil para esta tarea y toma un diccionario que especifica los valores de reemplazo por columna.

El manejo adecuado de valores nulos es crucial para garantizar la calidad y precisión de los resultados. Es importante decidir cuidadosamente cómo se tratarán estos valores, teniendo en cuenta el contexto y las implicaciones de cada elección.

## Reparticionamiento y Coalescencia

Los DataFrames en Spark están divididos en particiones que representan una fracción del conjunto total de datos. El número de particiones determina cómo se distribuyen los datos en los nodos del clúster y afecta directamente el rendimiento de las operaciones. Es posible que, en ciertos escenarios, necesitemos ajustar el número de particiones para optimizar el rendimiento.

Spark proporciona dos operaciones principales para este propósito: `repartition` y `coalesce`.


In [40]:
# Creando un DataFrame
data = [(i, f"val_{i}") for i in range(100)]
df = spark.createDataFrame(data, ["id", "value"])

# Observando el número de particiones
num_partitions = df.rdd.getNumPartitions()
num_partitions


8

En el ejemplo anterior, hemos creado un DataFrame `df` y hemos verificado el número de particiones usando `getNumPartitions()`. El número de particiones por defecto puede variar según la configuración de Spark y el tamaño del conjunto de datos.


In [42]:
# Cambiando el número de particiones
df_repartitioned = df.repartition(5)

# Verificando el nuevo número de particiones
new_partitions = df_repartitioned.rdd.getNumPartitions()
new_partitions


5

La función `repartition` permite cambiar el número de particiones. En este caso, hemos reparticionado el DataFrame `df` para que tenga 5 particiones. Es importante notar que la operación de reparticionamiento puede ser costosa en términos de tiempo y recursos, ya que implica el movimiento de datos entre las particiones.


In [43]:
# Reduciendo el número de particiones
df_coalesced = df_repartitioned.coalesce(2)

# Verificando el nuevo número de particiones
coalesced_partitions = df_coalesced.rdd.getNumPartitions()
coalesced_partitions


2

La operación `coalesce` es similar a `repartition`, pero está diseñada específicamente para reducir el número de particiones. La ventaja de `coalesce` sobre `repartition` es que no implica el movimiento completo de datos entre las particiones y, por lo tanto, es más eficiente cuando se necesita reducir el número de particiones.

Es vital comprender el impacto del número de particiones en el rendimiento y saber cuándo y cómo ajustar este número. Las operaciones de reparticionamiento y coalescencia son herramientas valiosas en el arsenal de un ingeniero de datos para optimizar las operaciones en Spark.


## Estadísticas Descriptivas

Spark ofrece una serie de funciones integradas que facilitan la obtención de estadísticas descriptivas de un DataFrame. Estas estadísticas son esenciales para comprender las tendencias, variaciones y distribución de los datos.


In [44]:
from pyspark.sql import functions as F

# Creando un DataFrame de ejemplo
data = [("A", 1), ("B", 2), ("A", 3), ("B", 4), ("A", 5)]
df_stats = spark.createDataFrame(data, ["category", "value"])

df_stats.show()


+--------+-----+
|category|value|
+--------+-----+
|       A|    1|
|       B|    2|
|       A|    3|
|       B|    4|
|       A|    5|
+--------+-----+



In [45]:
# Calculando estadísticas descriptivas básicas
df_stats.describe().show()


[Stage 92:>                                                         (0 + 8) / 8]

+-------+--------+------------------+
|summary|category|             value|
+-------+--------+------------------+
|  count|       5|                 5|
|   mean|    null|               3.0|
| stddev|    null|1.5811388300841898|
|    min|       A|                 1|
|    max|       B|                 5|
+-------+--------+------------------+



                                                                                

La función `describe` proporciona estadísticas descriptivas básicas como el recuento, la media, la desviación estándar, el mínimo y el máximo para todas las columnas numéricas del DataFrame. Es una herramienta útil para obtener un resumen rápido de los datos.


In [47]:
# Calculando estadísticas agrupadas por 'category'
df_stats.groupBy("category").agg(
    F.mean("value").alias("mean"),
    F.stddev("value").alias("stddev"),
    F.min("value").alias("min"),
    F.max("value").alias("max")
).show()


+--------+----+------------------+---+---+
|category|mean|            stddev|min|max|
+--------+----+------------------+---+---+
|       A| 3.0|               2.0|  1|  5|
|       B| 3.0|1.4142135623730951|  2|  4|
+--------+----+------------------+---+---+



Podemos combinar las operaciones de agrupación (`groupBy`) con funciones de agregación para calcular estadísticas descriptivas para diferentes categorías o grupos en el DataFrame. En el ejemplo anterior, calculamos la media, desviación estándar, mínimo y máximo para cada `category` en el DataFrame.


## Correlación y Covarianza

La correlación y la covarianza son dos medidas que indican la relación entre dos variables:

1. **Correlación:** Mide el grado en el que dos variables cambian juntas. Si una variable tiende a aumentar cuando la otra aumenta, hay una correlación positiva. Si una variable tiende a disminuir cuando la otra aumenta, hay una correlación negativa. 
2. **Covarianza:** Es similar a la correlación, pero no está normalizada. Así, mientras que la correlación está limitada entre -1 y 1, la covarianza puede ser cualquier número.


In [48]:
# Calculando la correlación entre las columnas
correlation = df_stats.corr("value", "value")
print(f"Correlación: {correlation}")

# Calculando la covarianza entre las columnas
covariance = df_stats.cov("value", "value")
print(f"Covarianza: {covariance}")


Correlación: 1.0
Covarianza: 2.5


## Frecuencia

La frecuencia de un valor es el número de veces que aparece en el conjunto de datos. Podemos utilizar la operación `groupBy` junto con `count` para calcular las frecuencias de los diferentes valores en una columna.


In [49]:
# Calculando la frecuencia de cada 'category'
frequency = df_stats.groupBy("category").count()
frequency.show()


+--------+-----+
|category|count|
+--------+-----+
|       A|    3|
|       B|    2|
+--------+-----+



La tabla anterior muestra la cantidad de veces que cada categoría aparece en el DataFrame. Esta información puede ser útil para comprender la distribución de los datos en diferentes categorías.


## Sampleo y Partición de DataFrames

El sampleo se refiere al proceso de seleccionar una muestra aleatoria de nuestros datos. Esto es útil para trabajar con un subconjunto representativo de nuestros datos, especialmente cuando el DataFrame es muy grande.

La partición, por otro lado, implica dividir el DataFrame en varios subconjuntos. Un uso común es en machine learning, donde se divide el conjunto de datos en un conjunto de entrenamiento y un conjunto de prueba.

El sampleo y la partición son herramientas poderosas en el arsenal del ingeniero de datos y del científico de datos. Permiten trabajar con conjuntos de datos manejables y preparar datos para diferentes etapas del análisis y modelado.


In [50]:
# Tomar un sampleo del 20% de los datos sin reemplazo
sampled_df = df.sample(False, 0.2)
sampled_df.show()


+---+------+
| id| value|
+---+------+
|  3| val_3|
| 21|val_21|
| 24|val_24|
| 27|val_27|
| 28|val_28|
| 34|val_34|
| 41|val_41|
| 44|val_44|
| 46|val_46|
| 50|val_50|
| 53|val_53|
| 64|val_64|
| 74|val_74|
| 80|val_80|
| 84|val_84|
| 86|val_86|
| 91|val_91|
| 94|val_94|
| 97|val_97|
| 98|val_98|
+---+------+



En el código anterior, hemos tomado un sampleo del 20% de nuestro DataFrame original (`df`). El primer argumento (`False`) indica que no queremos reemplazo, lo que significa que una vez que se selecciona una fila, no se puede seleccionar de nuevo.


In [51]:
# Dividir el DataFrame en un conjunto de entrenamiento (80%) y un conjunto de prueba (20%)
train_df, test_df = df.randomSplit([0.8, 0.2])


El método `randomSplit` permite dividir el DataFrame en múltiples subconjuntos según las proporciones dadas. En el ejemplo anterior, dividimos el DataFrame en un conjunto de entrenamiento del 80% y un conjunto de prueba del 20%.


## Caching y Persistencia

El caching y la persistencia son técnicas que permiten almacenar DataFrames (o RDDs) en la memoria o en el disco para que las operaciones repetitivas sobre esos DataFrames sean más rápidas. Esto es especialmente útil cuando se tienen operaciones de transformación costosas que se quieren evitar repetir.

La diferencia principal entre caching y persistencia radica en la configuración. Mientras que el caching almacena el DataFrame en la memoria por defecto, la persistencia permite elegir dónde se quiere almacenar el DataFrame (memoria, disco, o ambos).

El caching y la persistencia son esenciales cuando se trabaja con operaciones iterativas o se desea optimizar el rendimiento de operaciones repetitivas sobre un DataFrame. Es importante considerar el equilibrio entre el uso de memoria y el rendimiento para decidir cuál técnica utilizar.


In [52]:
# Cache the DataFrame
df.cache()

# Realizar operaciones en el DataFrame cacheado
df.count()
df.show()


                                                                                

+---+------+
| id| value|
+---+------+
|  0| val_0|
|  1| val_1|
|  2| val_2|
|  3| val_3|
|  4| val_4|
|  5| val_5|
|  6| val_6|
|  7| val_7|
|  8| val_8|
|  9| val_9|
| 10|val_10|
| 11|val_11|
| 12|val_12|
| 13|val_13|
| 14|val_14|
| 15|val_15|
| 16|val_16|
| 17|val_17|
| 18|val_18|
| 19|val_19|
+---+------+
only showing top 20 rows



En el código anterior, hemos cacheado el DataFrame `df` en la memoria. Una vez que un DataFrame está cacheado, cualquier operación sobre ese DataFrame será más rápida después de la primera vez, ya que los datos ya están en la memoria y Spark no necesita recomputar nada desde el origen.


In [53]:
from pyspark.storagelevel import StorageLevel

# Persistir el DataFrame en memoria y disco
df.persist(StorageLevel.MEMORY_AND_DISK)

# Realizar operaciones en el DataFrame persistido
df.count()
df.show()

# Eliminar la persistencia
df.unpersist()


23/09/21 22:54:51 WARN CacheManager: Asked to cache already cached data.


+---+------+
| id| value|
+---+------+
|  0| val_0|
|  1| val_1|
|  2| val_2|
|  3| val_3|
|  4| val_4|
|  5| val_5|
|  6| val_6|
|  7| val_7|
|  8| val_8|
|  9| val_9|
| 10|val_10|
| 11|val_11|
| 12|val_12|
| 13|val_13|
| 14|val_14|
| 15|val_15|
| 16|val_16|
| 17|val_17|
| 18|val_18|
| 19|val_19|
+---+------+
only showing top 20 rows



DataFrame[id: bigint, value: string]

La persistencia permite más flexibilidad que el simple caching. En el ejemplo anterior, hemos persistido el DataFrame `df` tanto en la memoria como en el disco. Si la memoria se llena, Spark comenzará a escribir los datos en el disco. Una vez persistido, al igual que con el caching, las operaciones subsiguientes sobre ese DataFrame se beneficiarán de tiempos de acceso más rápidos.

Es importante recordar liberar recursos después de usarlos, por lo que es una buena práctica utilizar `unpersist()` para eliminar un DataFrame de la caché o persistencia cuando ya no es necesario.


In [None]:
spark.stop()