In [58]:
import pandas as pd 
import pyarrow

# Apache Spark
## SparkSession
> Las aplicaciones de PySpark comienzan inicializando `SparkSession`, que es el punto de entrada de PySpark, como se muestra a continuación. En el caso de ejecutarlo en la consola de PySpark mediante el ejecutable `pyspark`, la consola crea automáticamente la sesión en la variable `spark` para los usuarios.

In [59]:
# Importa la clase SparkSession del módulo pyspark.sql
from pyspark.sql import SparkSession
# Crea o recupera una instancia de SparkSession con el nombre de la aplicación "example"
# SparkSession es el punto de entrada principal para trabajar con datos en PySpark
spark = SparkSession.builder.appName("example").getOrCreate()

## Creación del DataFrame
> Para empezar, puedes crear un DataFrame en PySpark a partir de una lista de filas. Si no tiene un esquema específico, lo infiere automáticamente.

In [60]:
# Importa las clases datetime y date del módulo datetime y la clase Row del módulo pyspark.sql
from datetime import datetime, date
from pyspark.sql import Row
# Crea un DataFrame en PySpark a partir de una lista de filas utilizando la instancia de SparkSession llamada 'spark'
df = spark.createDataFrame([
    # Cada elemento de la lista es una instancia de la clase Row que representa una fila en el DataFrame
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=3, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# El DataFrame 'df' se crea con éxito y ahora puedes realizar operaciones en él
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

> Crea un DataFrame en PySpark con un esquema explícito.

In [61]:
# Crea un DataFrame en PySpark con un esquema explícito utilizando la instancia de SparkSession llamada 'spark'
df1 = spark.createDataFrame([
    # Cada elemento de la lista es una tupla que representa una fila en el DataFrame
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
# El DataFrame 'df1' se crea con éxito y ahora puedes realizar operaciones en él
df1

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

>Create a PySpark DataFrame from a pandas DataFrame

In [62]:
# Crea un DataFrame en PySpark con un esquema explícito utilizando la instancia de SparkSession llamada 'spark'
df2 = spark.createDataFrame([
    # Cada elemento de la lista es una tupla que representa una fila en el DataFrame
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
# El DataFrame 'df1' se crea con éxito y ahora puedes realizar operaciones en él
df2

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

## Lectura
>CSV. Requiere que digas explícitamente si trae nombres de columnas con el header y si quieres que infiera el tipo de datos con inferSchema.

In [63]:
dfcsv = spark.read.csv('datos.csv', header=True, inferSchema=True)
dfcsv.show()

+---------+---+----------+------+
|     Name|age|experience|salary|
+---------+---+----------+------+
| Mauricio| 21|         0| 30000|
| Santiago| 35|         3|  4000|
|   Carlos| 54|        21| 15000|
|   Javier| 26|         6| 18000|
|  Valeria| 45|        13| 20100|
|Francisco| 34|        21| 30000|
+---------+---+----------+------+



>Parquet. No requiere información extra, porque ya está almacenada directo en la información.

In [64]:
dfpar = spark.read.parquet('datos.parquet')
dfpar.show()

+---------+---+----------+------+
|     Name|age|experience|salary|
+---------+---+----------+------+
| Mauricio| 21|         0| 30000|
| Santiago| 35|         3|  4000|
|   Carlos| 54|        21| 15000|
|   Javier| 26|         6| 18000|
|  Valeria| 45|        13| 20100|
|Francisco| 34|        21| 30000|
+---------+---+----------+------+



## Formas de ver el DF
> Un DataFrame se puede mostrar utilizando DataFrame.show().

In [65]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



>Las primeras filas de un DataFrame pueden mostrarse con DataFrame.show(n).

In [66]:
df.show(2)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 2 rows



>Las filas también se pueden mostrar verticalmente. Esto es útil cuando las filas son demasiado largas para mostrarse horizontalmente.

In [67]:
df.show(2, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
only showing top 2 rows



>Puedes ver el esquema del DataFrame y los nombres de las columnas de la siguiente forma:

In [68]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



>Muestra el resumen del DataFrame.

In [69]:
df.select("a", "b", "c").describe().show()

+-------+---+------------------+-------+
|summary|  a|                 b|      c|
+-------+---+------------------+-------+
|  count|  3|                 3|      3|
|   mean|2.0|3.3333333333333335|   NULL|
| stddev|1.0|1.5275252316519465|   NULL|
|    min|  1|               2.0|string1|
|    max|  3|               5.0|string3|
+-------+---+------------------+-------+



>`DataFrame.collect()` recopila los datos distribuidos del driver como datos locales en Python. Ten en cuenta que esto puede generar un error de falta de memoria cuando el conjunto de datos es demasiado grande para ajustarse en el driver, ya que recopila todos los datos de los ejecutores en el driver.

In [70]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=5.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

>Para evitar lanzar una excepción de falta de memoria, utiliza `DataFrame.take()` o `DataFrame.tail()`.

In [71]:
df.take(2) # Devuelve una lista de las primeras 2 filas

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0))]

In [72]:
df.tail(2) # Devuelve una lista de las últimas 2 filas

[Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=5.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

>El DataFrame de PySpark también permite la conversión a un DataFrame de pandas para aprovechar la API de pandas. Ten en cuenta que `toPandas` también recopila todos los datos en el lado del controlador, lo que puede causar fácilmente un error de falta de memoria cuando los datos son demasiado grandes para ajustarse en el lado del controlador.

In [73]:
df.toPandas()

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,5.0,string3,2000-03-01,2000-01-03 12:00:00


## Selección, acceso y filtro a los datos
>El DataFrame de PySpark se evalúa de forma lazy y selecciona una columna no desencadena la computación, sino que devuelve una instancia de Columna. 

In [74]:
df.a

Column<'a'>

>Estas Columnas se pueden utilizar para seleccionar las columnas de un DataFrame. Por ejemplo, `DataFrame.select()` toma las instancias de Columna que devuelven otro DataFrame.

In [75]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



>Para seleccionar un subconjunto de filas, utiliza `DataFrame.filter()`.

In [76]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



>Para esta parte utilizaremos el DF obtenido del csv.

In [77]:
dfcsv.show()

+---------+---+----------+------+
|     Name|age|experience|salary|
+---------+---+----------+------+
| Mauricio| 21|         0| 30000|
| Santiago| 35|         3|  4000|
|   Carlos| 54|        21| 15000|
|   Javier| 26|         6| 18000|
|  Valeria| 45|        13| 20100|
|Francisco| 34|        21| 30000|
+---------+---+----------+------+



>Se puede utilizar esta notación para hacer filtros.

In [78]:
dfcsv.filter('salary>=20000').show() #filtra los registros que cumplan la condición
dfcsv.filter('salary>=20000').select(['Name', 'age']).show() #filtra los registros que cumplan la condición, limitando el número de columnas seleccionadas

+---------+---+----------+------+
|     Name|age|experience|salary|
+---------+---+----------+------+
| Mauricio| 21|         0| 30000|
|  Valeria| 45|        13| 20100|
|Francisco| 34|        21| 30000|
+---------+---+----------+------+

+---------+---+
|     Name|age|
+---------+---+
| Mauricio| 21|
|  Valeria| 45|
|Francisco| 34|
+---------+---+



>También puede utilizarse notación de pandas.

In [79]:
dfcsv.filter(dfcsv['salary']>=20000).show() #otra forma de hacerlo con notación de pandas
dfcsv.filter((dfcsv['salary']<=20000)&(dfcsv['salary']>=10000)).show() #puedes anidar condiciones con & y |
dfcsv.filter(~(dfcsv['salary']>=20000)).show() #negación

+---------+---+----------+------+
|     Name|age|experience|salary|
+---------+---+----------+------+
| Mauricio| 21|         0| 30000|
|  Valeria| 45|        13| 20100|
|Francisco| 34|        21| 30000|
+---------+---+----------+------+

+------+---+----------+------+
|  Name|age|experience|salary|
+------+---+----------+------+
|Carlos| 54|        21| 15000|
|Javier| 26|         6| 18000|
+------+---+----------+------+

+--------+---+----------+------+
|    Name|age|experience|salary|
+--------+---+----------+------+
|Santiago| 35|         3|  4000|
|  Carlos| 54|        21| 15000|
|  Javier| 26|         6| 18000|
+--------+---+----------+------+



## Aplicando una función
> PySpark admite diversas UDFs (funciones definidas por el usuario) y APIs que permiten a los usuarios ejecutar funciones nativas de Python. Consulta también las últimas UDFs de Pandas y las APIs de Funciones de Pandas. Por ejemplo, el siguiente ejemplo permite a los usuarios utilizar directamente las APIs en una Serie de pandas dentro de una función nativa de Python.

In [80]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

In [81]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1
df2.select(pandas_plus_one(df2.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



>Otro ejemplo es `DataFrame.mapInPandas`, que permite a los usuarios utilizar directamente las APIs en un DataFrame de pandas sin restricciones, como la longitud del resultado.

In [82]:
# Define una función de filtro de Pandas llamada pandas_filter_func que toma un iterador como entrada.
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        # Filtra las filas del DataFrame de Pandas donde la columna 'a' es igual a 1.
        yield pandas_df[pandas_df.a == 1]
# Aplica la función de filtro a cada partición del DataFrame 'df' utilizando mapInPandas,
# y muestra el resultado con el mismo esquema que el DataFrame original.
df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Grouping data
> El DataFrame de PySpark también ofrece una forma de manejar datos agrupados mediante el uso del enfoque común, conocido como split-apply-combine strategy. Agrupa los datos según cierta condición, aplica una función a cada grupo y luego los combina de nuevo en el DataFrame.

In [83]:
spark.stop()
spark2 = SparkSession.builder.appName("example2").getOrCreate()
# Crea un DataFrame en PySpark con datos proporcionados y un esquema definido
dfg = spark2.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
dfg.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



>Agrupar y luego aplicar la función avg() a los grupos resultantes.

In [84]:
dfg.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



>También puedes aplicar una función nativa de Python a cada grupo utilizando la API de pandas.

In [85]:
# Define una función llamada plus_mean que toma un DataFrame de pandas como entrada,
# y devuelve el DataFrame con la columna 'v1' ajustada restando la media de 'v1'.
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
# Agrupa el DataFrame 'df' por la columna 'color' y aplica la función plus_mean a cada grupo utilizando applyInPandas.
# Muestra el resultado con el mismo esquema que el DataFrame original.
dfg.groupby('color').applyInPandas(plus_mean, schema=dfg.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



>Co-agrupando y aplicando una función.

In [86]:
spark2.stop()
spark3 = SparkSession.builder.appName("example3").getOrCreate()
# Crea dos DataFrames en PySpark con datos proporcionados y esquemas definidos
dfc1 = spark3.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))
dfc2 = spark3.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))
# Define una función llamada merge_ordered que realiza una fusión ordenada de dos DataFrames de pandas.
def merge_ordered(l, r):
    return pd.merge_ordered(l, r)
# Realiza una operación de co-agrupamiento en los DataFrames 'df1' y 'df2' por la columna 'id',
# luego aplica la función merge_ordered a cada grupo utilizando applyInPandas.
# Muestra el resultado con un esquema especificado.
dfc1.groupby('id').cogroup(dfc2.groupby('id')).applyInPandas(
    merge_ordered, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000101|  1|1.0|   x|
|20000102|  1|3.0|NULL|
|20000101|  2|2.0|   y|
|20000102|  2|4.0|NULL|
+--------+---+---+----+



## Utilizando SQL
> Los DataFrames y Spark SQL comparten el mismo motor de ejecución, por lo que se pueden utilizar de manera intercambiable sin problemas. Para esto, tienes que crear una vista temporal del DataFrame que es una representación lógica de los datos del DF permitiendo usar SQL con SparkSQL. Por ejemplo, puedes registrar el DataFrame como una tabla y ejecutar una consulta SQL fácilmente de la siguiente manera:

In [87]:
spark3.stop()
spark4 = SparkSession.builder.appName("example4").getOrCreate()
# Crear un DataFrame de ejemplo
data = [("Alice", 28), ("Bob", 22), ("Charlie", 35)]
columns = ["Name", "Age"]
df_example = spark4.createDataFrame(data, columns)
# Registrar el DataFrame como una vista temporal llamada "people"
df_example.createOrReplaceTempView("people")
# Ejecutar una consulta SQL tradicional utilizando Spark SQL
result = spark4.sql("SELECT Name, Age FROM people WHERE Age >= 30")
# Mostrar el resultado en la consola
result.show()

+-------+---+
|   Name|Age|
+-------+---+
|Charlie| 35|
+-------+---+



In [88]:
spark4.stop()
spark5 = SparkSession.builder.appName("example5").getOrCreate()
# Crea un DataFrame en PySpark con datos proporcionados y un esquema definido
df = spark5.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



>Creando una vista temporal de la tabla, se puede acceder a esta como si fuera una tabla de SQL noraml.

In [89]:
# Registra el DataFrame 'df' como una vista temporal llamada "tableA"
df.createOrReplaceTempView("tableA")
# Ejecuta una consulta SQL utilizando Spark SQL, contando el número de filas en la vista "tableA"
spark5.sql("SELECT count(*) FROM tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



>También, UDFs pueden requistrarse e invocarse en SQL directo: 

In [90]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1
spark5.udf.register("add_one", add_one)
spark5.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



>Estas consultas de SQL pueden combinarse y usarsae con columnas de PySpark.

In [91]:
from pyspark.sql.functions import expr
# Selecciona la columna resultante de la función 'add_one' aplicada a la columna 'v1'
df.selectExpr('add_one(v1)').show()
# Selecciona un valor booleano que indica si la cuenta de todas las filas es mayor que 0
df.select(expr('count(*) > 0')).show()


+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+



## Agregaciones
>El método agg se utiliza para realizar agregaciones en un DataFrame. Se utiliza comúnmente para calcular estadísticas agregadas o realizar operaciones de resumen en los datos.

In [92]:
from pyspark.sql.functions import avg, count

>Crear el DataFrame para ejemplificar.

In [93]:
spark5.stop()
spark6 = SparkSession.builder.appName("example5").getOrCreate()
# Crear un DataFrame de ejemplo
data = [("Alice", 28), ("Bob", 22), ("Charlie", 35), ("Alice", 40), ("Bob", 25)]
columns = ["Name", "Age"]
df = spark6.createDataFrame(data, columns)
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 28|
|    Bob| 22|
|Charlie| 35|
|  Alice| 40|
|    Bob| 25|
+-------+---+



>Puede usarse con notación de diccionario.

In [94]:
df.agg({'Age': 'max'}).show()

+--------+
|max(Age)|
+--------+
|      40|
+--------+



>Se realiza la agregación calculando el promedio de la columna "Age" y el recuento de nombres. La función avg se utiliza para calcular el promedio, y count se utiliza para contar el número de registros. 

In [95]:
# Realizar agregaciones usando agg
result = df.groupBy("Name").agg(avg("Age").alias("average_age"), count("Name").alias("name_count"))
# Mostrar el resultado
result.show()

+-------+-----------+----------+
|   Name|average_age|name_count|
+-------+-----------+----------+
|    Bob|       23.5|         2|
|  Alice|       34.0|         2|
|Charlie|       35.0|         1|
+-------+-----------+----------+

