In [1]:
!pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


# Importando lo necesario y creando la Spark Session

In [2]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder\
        .master("local")\
        .appName("Api Estructurada Humai")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Dataframes y Datasets

## ¿Qué sucede con los Datasets en PySpark?

El Dataset es 1 de las 3 estructuras de datos fundamentales de Spark, su mayor ventaja frente a las otras dos (RDDs y Dataframes) es que es "type-safe", esto quiere decir que no puedes accidentalmente ver los objetos en un Dataset como si fueran de otra clase que la clase que colocaste que eran inicialmente. Esta ultima caracteristica hace los Datasets especialmente atractivos para escribir grandes aplicaciones de Spark, en donde muchos ingenieros o ingenieras deben interactuar con interfaces correctamente definidas.

Sin embargo los Datasets **no estan disponibles** en Python (ni en R) ya que es nuestro querido lenguaje es de tipeado dinamico. Los Datasets estan disponibles en las versiones escritas para lenguajes de tipeado estatico (Java y Scala).


## Entonces trabajemos con los Dataframes!! 😁

Son colecciones estructuradas en forma de tabla, con filas y columnas bien definidas. Cada columna tiene el mismo numero de filas e informacion sobre el tipo de dato que maneja. Al igual que los RDDs los Dataframes son inmutables y Lazy Evaluated (no se ejecutan las transformaciones hasta que se de una accion).

Un pequeño resumen de como sucede la magia:

1. Escribes el codigo  de DataFrame/Dataset/SQL con PySpark.
2. Si el codigo es valido, Spark convierte esto en un Plan Logico.
3. Spark Transforma su Plan Logico en un Plan Fisico, y en esta etapa verifica si se pueden realizar optimizaciones.
4. Spark ejecuta entonces el Plan Fisico en el Cluster (A traves de operaciones con RDDs).

### Leyendolos

In [4]:
# una manera interesante de leer csv (pequeños o medianos) de una url usando spark, buen truco para practicar con data de juguete!
spark.sparkContext.addFile("https://raw.githubusercontent.com/engcarlosperezmolero/resources_and_tools/main/data/csv/energias-alternativas.csv")

In [5]:
energia_df = spark.read.csv(pyspark.SparkFiles.get("energias-alternativas.csv"), header=True, inferSchema=True)

In [6]:
energia_df.show(5, truncate=False)

+---------+---------------------+-----------+-------------------------+----------+----------------+-------+-----------------+----------------+------------+----------+--------------+-------------+-------+
|sector_id|sector_nombre        |variable_id|actividad_producto_nombre|indicador |unidad_de_medida|fuente |frecuencia_nombre|cobertura_nombre|alcance_tipo|alcance_id|alcance_nombre|indice_tiempo|valor  |
+---------+---------------------+-----------+-------------------------+----------+----------------+-------+-----------------+----------------+------------+----------+--------------+-------------+-------+
|25       |Energias alternativas|440        |Energia biogas           |Generacion|MWh             |CAMMESA|Mensual          |Nacional        |PROVINCIA   |6         |BUENOS AIRES  |2012-01-01   |0.0    |
|25       |Energias alternativas|440        |Energia biogas           |Generacion|MWh             |CAMMESA|Mensual          |Nacional        |PROVINCIA   |6         |BUENOS AIRES  |201

### Creandolos manualmente

In [7]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

esquema_manual = StructType([
StructField("saludo", StringType(), True),
StructField("valor_random", StringType(), True),
StructField("numeros", LongType(), False)
])

fila_1 = Row("Holi", None, 132334)
fila_2 = Row("Buenassss", "como va", 8930827)
fila_3 = Row(None, None, 182931)

df_manual = spark.createDataFrame([fila_1, fila_2, fila_3], esquema_manual)
df_manual.show()
df_manual.printSchema()

+---------+------------+-------+
|   saludo|valor_random|numeros|
+---------+------------+-------+
|     Holi|        null| 132334|
|Buenassss|     como va|8930827|
|     null|        null| 182931|
+---------+------------+-------+

root
 |-- saludo: string (nullable = true)
 |-- valor_random: string (nullable = true)
 |-- numeros: long (nullable = false)



In [8]:
# esta manera es mas facil pero el esquema es inferido
datos = [
    {"saludo": "Holi", "valor_random": None, "numeros": 132334},
    {"saludo": "Buenassss", "valor_random": "como va", "numeros": 8930827},
    {"saludo": None, "valor_random": None, "numeros": 182931}
]

df_manual_2 = spark.createDataFrame(datos)
df_manual_2.show()
df_manual_2.printSchema()

+-------+---------+------------+
|numeros|   saludo|valor_random|
+-------+---------+------------+
| 132334|     Holi|        null|
|8930827|Buenassss|     como va|
| 182931|     null|        null|
+-------+---------+------------+

root
 |-- numeros: long (nullable = true)
 |-- saludo: string (nullable = true)
 |-- valor_random: string (nullable = true)



### Explorando y Procesando los datos

In [9]:
# veamos un poquito que nos ofrece la interfaz del Dataframe
[i for i in dir(energia_df) if not i.startswith("_")]

['agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'exceptAll',
 'explain',
 'fillna',
 'filter',
 'first',
 'foreach',
 'foreachPartition',
 'freqItems',
 'groupBy',
 'groupby',
 'head',
 'hint',
 'inputFiles',
 'intersect',
 'intersectAll',
 'isEmpty',
 'isLocal',
 'isStreaming',
 'is_cached',
 'join',
 'limit',
 'localCheckpoint',
 'mapInArrow',
 'mapInPandas',
 'melt',
 'na',
 'observe',
 'orderBy',
 'pandas_api',
 'persist',
 'printSchema',
 'randomSplit',
 'rdd',
 'registerTempTable',
 'repartition',
 'repartitionByRange',
 'replace',
 'rollup',
 'sameSemantics',
 'sample',
 'sampleBy',
 'schema',
 'select',
 'selectExpr',
 'semanticHash',
 'show',
 'sort',
 

In [10]:
energia_df.columns

['sector_id',
 'sector_nombre',
 'variable_id',
 'actividad_producto_nombre',
 'indicador',
 'unidad_de_medida',
 'fuente',
 'frecuencia_nombre',
 'cobertura_nombre',
 'alcance_tipo',
 'alcance_id',
 'alcance_nombre',
 'indice_tiempo',
 'valor']

In [11]:
energia_df.select('actividad_producto_nombre', 'valor').show()

+-------------------------+---------+
|actividad_producto_nombre|    valor|
+-------------------------+---------+
|           Energia biogas|      0.0|
|           Energia biogas|      0.0|
|           Energia biogas|      0.0|
|           Energia biogas|      0.0|
|           Energia biogas|  764.924|
|           Energia biogas| 3478.232|
|           Energia biogas| 3395.954|
|           Energia biogas| 2928.667|
|           Energia biogas| 2841.751|
|           Energia biogas| 6902.033|
|           Energia biogas|  7357.68|
|           Energia biogas| 7971.476|
|           Energia biogas| 7893.987|
|           Energia biogas|  7648.21|
|           Energia biogas| 9011.614|
|           Energia biogas| 8388.014|
|           Energia biogas| 9381.065|
|           Energia biogas| 9167.534|
|           Energia biogas|10502.377|
|           Energia biogas| 9771.545|
+-------------------------+---------+
only showing top 20 rows



In [12]:
energia_df.select("actividad_producto_nombre").distinct().show()

+-------------------------+
|actividad_producto_nombre|
+-------------------------+
|            Energia solar|
|           Energia biogas|
|              Energia pah|
|          Energia biomasa|
|          Energia nuclear|
|           Energia eolica|
+-------------------------+



In [13]:
energia_df.describe().show(truncate=False)

+-------+---------+---------------------+------------------+-------------------------+------------------+----------------+-------+-----------------+----------------+------------+------------------+--------------+------------------+
|summary|sector_id|sector_nombre        |variable_id       |actividad_producto_nombre|indicador         |unidad_de_medida|fuente |frecuencia_nombre|cobertura_nombre|alcance_tipo|alcance_id        |alcance_nombre|valor             |
+-------+---------+---------------------+------------------+-------------------------+------------------+----------------+-------+-----------------+----------------+------------+------------------+--------------+------------------+
|count  |5311     |5311                 |5311              |5311                     |5311              |5311            |5311   |5311             |5311            |5311        |5311              |5311          |5311              |
|mean   |25.0     |null                 |445.45283374129167|null        

#### Creando, renombrando, dropeando y casteando columnas

In [14]:
# crear columnas
energia_df.select("valor").withColumn("valor_nuevo", energia_df["valor"] / 1000).show()

+---------+------------------+
|    valor|       valor_nuevo|
+---------+------------------+
|      0.0|               0.0|
|      0.0|               0.0|
|      0.0|               0.0|
|      0.0|               0.0|
|  764.924|0.7649239999999999|
| 3478.232|3.4782319999999998|
| 3395.954|          3.395954|
| 2928.667|          2.928667|
| 2841.751|2.8417510000000004|
| 6902.033|          6.902033|
|  7357.68|           7.35768|
| 7971.476|          7.971476|
| 7893.987|          7.893987|
|  7648.21|           7.64821|
| 9011.614|          9.011614|
| 8388.014|          8.388014|
| 9381.065| 9.381065000000001|
| 9167.534|          9.167534|
|10502.377|10.502377000000001|
| 9771.545|          9.771545|
+---------+------------------+
only showing top 20 rows



In [15]:
energia_df.withColumn("valor_constante", F.lit("TODO")).select("valor_constante").show()

+---------------+
|valor_constante|
+---------------+
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
|           TODO|
+---------------+
only showing top 20 rows



In [16]:
# renombrando
energia_df.select("valor").withColumnRenamed("valor", "valor_potencia").show()

+--------------+
|valor_potencia|
+--------------+
|           0.0|
|           0.0|
|           0.0|
|           0.0|
|       764.924|
|      3478.232|
|      3395.954|
|      2928.667|
|      2841.751|
|      6902.033|
|       7357.68|
|      7971.476|
|      7893.987|
|       7648.21|
|      9011.614|
|      8388.014|
|      9381.065|
|      9167.534|
|     10502.377|
|      9771.545|
+--------------+
only showing top 20 rows



In [17]:
# dropeando
energia_df.drop("fuente").show()

+---------+--------------------+-----------+-------------------------+----------+----------------+-----------------+----------------+------------+----------+--------------+-------------+---------+
|sector_id|       sector_nombre|variable_id|actividad_producto_nombre| indicador|unidad_de_medida|frecuencia_nombre|cobertura_nombre|alcance_tipo|alcance_id|alcance_nombre|indice_tiempo|    valor|
+---------+--------------------+-----------+-------------------------+----------+----------------+-----------------+----------------+------------+----------+--------------+-------------+---------+
|       25|Energias alternat...|        440|           Energia biogas|Generacion|             MWh|          Mensual|        Nacional|   PROVINCIA|         6|  BUENOS AIRES|   2012-01-01|      0.0|
|       25|Energias alternat...|        440|           Energia biogas|Generacion|             MWh|          Mensual|        Nacional|   PROVINCIA|         6|  BUENOS AIRES|   2012-02-01|      0.0|
|       25|Ener

In [18]:
 # cambiando tipos de datos (casteando)
nuevo_energia_df = energia_df.select("actividad_producto_nombre", "valor")\
                             .withColumn("constante", F.lit("1111"))

In [19]:
nuevo_energia_df.show(5)

+-------------------------+-------+---------+
|actividad_producto_nombre|  valor|constante|
+-------------------------+-------+---------+
|           Energia biogas|    0.0|     1111|
|           Energia biogas|    0.0|     1111|
|           Energia biogas|    0.0|     1111|
|           Energia biogas|    0.0|     1111|
|           Energia biogas|764.924|     1111|
+-------------------------+-------+---------+
only showing top 5 rows



In [20]:
nuevo_energia_df.printSchema()

root
 |-- actividad_producto_nombre: string (nullable = true)
 |-- valor: double (nullable = true)
 |-- constante: string (nullable = false)



In [21]:
nuevo_energia_df.withColumn("constante", nuevo_energia_df["constante"].cast("long")).printSchema()

root
 |-- actividad_producto_nombre: string (nullable = true)
 |-- valor: double (nullable = true)
 |-- constante: long (nullable = true)



#### Filtrando

In [22]:
nuevo_energia_df.filter(nuevo_energia_df["actividad_producto_nombre"] == "Energia nuclear").show()

+-------------------------+----------+---------+
|actividad_producto_nombre|     valor|constante|
+-------------------------+----------+---------+
|          Energia nuclear|216330.975|     1111|
|          Energia nuclear| 213602.22|     1111|
|          Energia nuclear| 117180.99|     1111|
|          Energia nuclear|       0.0|     1111|
|          Energia nuclear| 130608.18|     1111|
|          Energia nuclear|218112.525|     1111|
|          Energia nuclear| 245836.98|     1111|
|          Energia nuclear| 243580.23|     1111|
|          Energia nuclear|234465.885|     1111|
|          Energia nuclear|240846.165|     1111|
|          Energia nuclear|232596.225|     1111|
|          Energia nuclear| 239855.13|     1111|
|          Energia nuclear| 237003.48|     1111|
|          Energia nuclear|  200647.8|     1111|
|          Energia nuclear| 198150.93|     1111|
|          Energia nuclear|169882.335|     1111|
|          Energia nuclear|       0.0|     1111|
|          Energia n

In [23]:
nuevo_energia_df.where("actividad_producto_nombre = 'Energia nuclear'").show(5, False)

+-------------------------+----------+---------+
|actividad_producto_nombre|valor     |constante|
+-------------------------+----------+---------+
|Energia nuclear          |216330.975|1111     |
|Energia nuclear          |213602.22 |1111     |
|Energia nuclear          |117180.99 |1111     |
|Energia nuclear          |0.0       |1111     |
|Energia nuclear          |130608.18 |1111     |
+-------------------------+----------+---------+
only showing top 5 rows



In [24]:
# filtrando y contando
nuevo_energia_df.filter(nuevo_energia_df["actividad_producto_nombre"] == "Energia nuclear").count()

456

#### Trabajando con valores Nulos

In [25]:
df_manual.show()

+---------+------------+-------+
|   saludo|valor_random|numeros|
+---------+------------+-------+
|     Holi|        null| 132334|
|Buenassss|     como va|8930827|
|     null|        null| 182931|
+---------+------------+-------+



In [26]:
df_manual.na.drop('any').show()

+---------+------------+-------+
|   saludo|valor_random|numeros|
+---------+------------+-------+
|Buenassss|     como va|8930827|
+---------+------------+-------+



In [27]:
df_manual.na.fill("random_fill").show()

+-----------+------------+-------+
|     saludo|valor_random|numeros|
+-----------+------------+-------+
|       Holi| random_fill| 132334|
|  Buenassss|     como va|8930827|
|random_fill| random_fill| 182931|
+-----------+------------+-------+



In [28]:
df_manual.na.fill("random_fill_specific", subset=["saludo"]).show()

+--------------------+------------+-------+
|              saludo|valor_random|numeros|
+--------------------+------------+-------+
|                Holi|        null| 132334|
|           Buenassss|     como va|8930827|
|random_fill_specific|        null| 182931|
+--------------------+------------+-------+



#### Agregando (agrupando y reduciendo)

In [29]:
nuevo_energia_df.show()

+-------------------------+---------+---------+
|actividad_producto_nombre|    valor|constante|
+-------------------------+---------+---------+
|           Energia biogas|      0.0|     1111|
|           Energia biogas|      0.0|     1111|
|           Energia biogas|      0.0|     1111|
|           Energia biogas|      0.0|     1111|
|           Energia biogas|  764.924|     1111|
|           Energia biogas| 3478.232|     1111|
|           Energia biogas| 3395.954|     1111|
|           Energia biogas| 2928.667|     1111|
|           Energia biogas| 2841.751|     1111|
|           Energia biogas| 6902.033|     1111|
|           Energia biogas|  7357.68|     1111|
|           Energia biogas| 7971.476|     1111|
|           Energia biogas| 7893.987|     1111|
|           Energia biogas|  7648.21|     1111|
|           Energia biogas| 9011.614|     1111|
|           Energia biogas| 8388.014|     1111|
|           Energia biogas| 9381.065|     1111|
|           Energia biogas| 9167.534|   

In [30]:
nuevo_energia_df.select(F.avg("valor")).show()

+------------------+
|        avg(valor)|
+------------------+
|20086.474353228816|
+------------------+



In [31]:
nuevo_energia_df.select(F.var_pop("valor").alias("varianza_pobl"), F.var_samp("valor").alias("varianza_muestra"), F.stddev_pop("valor").alias("desv_estandar_pob"), F.stddev_samp("valor").alias("desv_estandar_muestra")).show()

+------------------+-------------------+-----------------+---------------------+
|     varianza_pobl|   varianza_muestra|desv_estandar_pob|desv_estandar_muestra|
+------------------+-------------------+-----------------+---------------------+
|5.77199647223549E9|5.773083477220845E9| 75973.6564358692|     75980.8099273813|
+------------------+-------------------+-----------------+---------------------+



In [32]:
nuevo_energia_df.select(F.collect_set("actividad_producto_nombre").alias("lista_unicos")).show(truncate=False)

+----------------------------------------------------------------------------------------------+
|lista_unicos                                                                                  |
+----------------------------------------------------------------------------------------------+
|[Energia pah, Energia eolica, Energia nuclear, Energia solar, Energia biogas, Energia biomasa]|
+----------------------------------------------------------------------------------------------+



In [33]:
nuevo_energia_df.select(F.collect_list("actividad_producto_nombre")).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [34]:
nuevo_energia_df.groupBy("actividad_producto_nombre").count()

DataFrame[actividad_producto_nombre: string, count: bigint]

In [35]:
# (SE DIERON CUENTA DE ALGO??? LA GENTE QUE HIZO EL EJERCICIO DE LA PRIMERA CLASE ENTENDERA!!!)
nuevo_energia_df.groupBy("actividad_producto_nombre").count().show() # es interesante ver que aqui el count() NO es una accion, haz la prueba...

+-------------------------+-----+
|actividad_producto_nombre|count|
+-------------------------+-----+
|            Energia solar|  564|
|           Energia biogas|  552|
|              Energia pah| 2172|
|          Energia biomasa|  540|
|          Energia nuclear|  456|
|           Energia eolica| 1027|
+-------------------------+-----+



In [36]:
nuevo_energia_df.groupBy("actividad_producto_nombre").agg({"valor": "avg"}).withColumnRenamed("avg(valor)", "promedio").show()

+-------------------------+------------------+
|actividad_producto_nombre|          promedio|
+-------------------------+------------------+
|            Energia solar| 4166.521010638324|
|           Energia biogas|2079.9216793478367|
|              Energia pah| 6805.738040055162|
|          Energia biomasa|3862.2890185185174|
|          Energia nuclear|147481.33160745626|
|           Energia eolica| 18560.90006523843|
+-------------------------+------------------+



In [37]:
promedios_df = nuevo_energia_df.groupBy("actividad_producto_nombre").agg({"valor": "avg"}).withColumnRenamed("avg(valor)", "promedio")

### Escribiendolos a un archivo csv

In [38]:
promedios_df.write.csv(path='/content/promedios', header=True, sep=',')

### Joineando
Una de las operaciones mas comunes a la hora de trabajar con datos puede ser la de como unir dos tablas distintas en base a ciertos criterios (normalmente la equivalencia entre una columna de una tabla con una columna de la otra).

Dentro de Spark es bastante facil joinear tablas (Dataframes) sin embargo podriamos encontrar ciertos retos que por estar trabajando con un Framework de procesamiento distribuido vamos a tener que tener en cuenta.

In [39]:
informacion_de_productos = [
        {"categoria": 'A', "sku": 1, "valor": 121.44},
        {"categoria": 'B', "sku": 2, "valor": 300.01},
        {"categoria": 'C', "sku": 3, "valor": 10.99},
        {"categoria": 'C', "sku": 3, "valor": 33.87},
        {"categoria": 'C', "sku": 3, "valor": 45.12},
        {"categoria": 'C', "sku": 3, "valor": 69.15},
        {"categoria": 'A', "sku": 1, "valor": 88.32},
        {"categoria": 'B', "sku": 2, "valor": 11.11},
        {"categoria": 'A', "sku": 1, "valor": 25.65},
        {"categoria": 'B', "sku": 2, "valor": 522.60},
       ]


productos = [
    {"sku_nr": 1, "nombre": "remera"},
    {"sku_nr": 2, "nombre": "pantalon"},
    {"sku_nr": 3, "nombre": "mate"},
]

In [40]:
info_prod_df = spark.createDataFrame(informacion_de_productos)
prod_df = spark.createDataFrame(productos)

In [41]:
info_prod_df.show()
prod_df.show()

+---------+---+------+
|categoria|sku| valor|
+---------+---+------+
|        A|  1|121.44|
|        B|  2|300.01|
|        C|  3| 10.99|
|        C|  3| 33.87|
|        C|  3| 45.12|
|        C|  3| 69.15|
|        A|  1| 88.32|
|        B|  2| 11.11|
|        A|  1| 25.65|
|        B|  2| 522.6|
+---------+---+------+

+--------+------+
|  nombre|sku_nr|
+--------+------+
|  remera|     1|
|pantalon|     2|
|    mate|     3|
+--------+------+



### Ten cuidado al joinear, puede salirte MUY caro

Explicado de manera simple en Spark puedes tener dos tipos de tablas, las grandes y las pequeñas. A pesar de que hablar del tamaño de una tabla es hablar de un espectro, sere radicalista en explicación con el fin de que puedas tomar una decision de manera binaria si llegas a tener una situacion parecida.

#### Tabla grande - Tabla grande
En este caso Spark realiza un Shuffle Join (transformacion wide). En este tipo de join cada uno de los nodos se comunica con topos y cada uno de los otros nodos, lo cual causa un gran trafico en el cluster y puede llegar a ser una de las operaciones mas caras dentro de tu proceso.




In [42]:
info_prod_df.join(prod_df, info_prod_df.sku == prod_df.sku_nr, how="inner").drop("sku_nr").show()

+---------+---+------+--------+
|categoria|sku| valor|  nombre|
+---------+---+------+--------+
|        A|  1|121.44|  remera|
|        A|  1| 88.32|  remera|
|        A|  1| 25.65|  remera|
|        B|  2|300.01|pantalon|
|        B|  2| 11.11|pantalon|
|        B|  2| 522.6|pantalon|
|        C|  3| 10.99|    mate|
|        C|  3| 33.87|    mate|
|        C|  3| 45.12|    mate|
|        C|  3| 69.15|    mate|
+---------+---+------+--------+



#### Tabla grande - Tabla pequeña

(o dicho de otra manera, que cabe en memoria)

En este caso se puede realizar una optimizacion, a pesar de que podemos dejar que suceda un Shuffle Join, es normalmente mejor usar la estrategia del Broadcast Join (transformacion narrow y menos wide). Esto ultimo consiste en hacer una copia de la tabla pequeña en cada una de las memorias de los nodos workers. Puede sonar como que se va a realizar un aumento en el trafico pero en realidad es solo al principio cuando se le envia la tabla pequeñar a los workers, despues el trabajo sucede en cada worker.

In [43]:
info_prod_df.join(F.broadcast(prod_df), info_prod_df.sku == prod_df.sku_nr, how="inner").drop("sku_nr").show()

+---------+---+------+--------+
|categoria|sku| valor|  nombre|
+---------+---+------+--------+
|        A|  1|121.44|  remera|
|        B|  2|300.01|pantalon|
|        C|  3| 10.99|    mate|
|        C|  3| 33.87|    mate|
|        C|  3| 45.12|    mate|
|        C|  3| 69.15|    mate|
|        A|  1| 88.32|  remera|
|        B|  2| 11.11|pantalon|
|        A|  1| 25.65|  remera|
|        B|  2| 522.6|pantalon|
+---------+---+------+--------+



# UDF (User Defined Functions)
Sirven para extender las funciones que trae el framework por defecto, y poder usar estas "nuevas funciones" en multiples dataframes de manera automatizada.

- Para un Dataframe: importas la funcion udf desde ```pyspark.sql.functions.udf(nombre_de_la_funcion, tipo_de_dato_del_return)```
- Para queries SQL: importas ```spark.udf.register(string, nombre_de_la_funcion, tipo_de_dato_del_return)```

In [44]:
energia_df_2 = spark.read.format("csv")\
                         .option("inferSchema", True)\
                         .option("header", True)\
                         .load(pyspark.SparkFiles.get("energias-alternativas.csv"))

In [45]:
energia_df_2.show(10, False)

+---------+---------------------+-----------+-------------------------+----------+----------------+-------+-----------------+----------------+------------+----------+--------------+-------------+--------+
|sector_id|sector_nombre        |variable_id|actividad_producto_nombre|indicador |unidad_de_medida|fuente |frecuencia_nombre|cobertura_nombre|alcance_tipo|alcance_id|alcance_nombre|indice_tiempo|valor   |
+---------+---------------------+-----------+-------------------------+----------+----------------+-------+-----------------+----------------+------------+----------+--------------+-------------+--------+
|25       |Energias alternativas|440        |Energia biogas           |Generacion|MWh             |CAMMESA|Mensual          |Nacional        |PROVINCIA   |6         |BUENOS AIRES  |2012-01-01   |0.0     |
|25       |Energias alternativas|440        |Energia biogas           |Generacion|MWh             |CAMMESA|Mensual          |Nacional        |PROVINCIA   |6         |BUENOS AIRES  

In [46]:
# hay funciones que trae por defecto (es interesante ver como usar el F.col en el where)
energia_df_2.select("sector_nombre", F.sqrt("valor"))\
            .withColumnRenamed("SQRT(valor)", "sqrt_valor")\
            .where(F.col("sqrt_valor") > 100)\
            .show(5, False)

+---------------------+------------------+
|sector_nombre        |sqrt_valor        |
+---------------------+------------------+
|Energias alternativas|102.4811055756133 |
|Energias alternativas|106.34200957288704|
|Energias alternativas|103.76036815663291|
|Energias alternativas|104.92300986914167|
|Energias alternativas|105.40138519013874|
+---------------------+------------------+
only showing top 5 rows



In [47]:
[i for i in dir(pyspark.sql.types) if not i.startswith("_") if i[0] == i[0].upper()]

['Any',
 'ArrayType',
 'AtomicType',
 'BinaryType',
 'BooleanType',
 'ByteType',
 'Callable',
 'CharType',
 'ClassVar',
 'CloudPickleSerializer',
 'DataType',
 'DataTypeSingleton',
 'DateConverter',
 'DateType',
 'DatetimeConverter',
 'DatetimeNTZConverter',
 'DayTimeIntervalType',
 'DayTimeIntervalTypeConverter',
 'DecimalType',
 'Dict',
 'DoubleType',
 'FloatType',
 'FractionalType',
 'GatewayClient',
 'IntegerType',
 'IntegralType',
 'Iterable',
 'Iterator',
 'JVMView',
 'JavaClass',
 'JavaGateway',
 'JavaObject',
 'List',
 'LongType',
 'MapType',
 'NullType',
 'NumericType',
 'NumpyArrayConverter',
 'NumpyScalarConverter',
 'Optional',
 'Row',
 'ShortType',
 'StringType',
 'StructField',
 'StructType',
 'T',
 'TYPE_CHECKING',
 'TimestampNTZType',
 'TimestampType',
 'Tuple',
 'Type',
 'TypeVar',
 'U',
 'Union',
 'UserDefinedType',
 'VarcharType']

In [48]:
def categorizar_de_energia_solar_str(energia: str) -> str:
    if energia == "Energia solar":
        return "solar"
    else:
        return "no solar"

def categorizar_de_energia_solar_int(energia: str) -> int:
    if energia == "Energia solar":
        return 1
    else:
        return 0

categorizar_solar_str_udf = F.udf(categorizar_de_energia_solar_str, pyspark.sql.types.StringType())
categorizar_solar_int_udf_malo = F.udf(categorizar_de_energia_solar_int)
categorizar_solar_int_udf2 = F.udf(categorizar_de_energia_solar_int, pyspark.sql.types.IntegerType())

In [49]:
# fijense que aqui no necesito usar F.col()
energia_nuevo = energia_df_2.select("actividad_producto_nombre").withColumn("es_energia_solar", categorizar_solar_str_udf(energia_df_2["actividad_producto_nombre"]))

In [50]:
energia_nuevo.filter(energia_nuevo["actividad_producto_nombre"] == "Energia solar").show(4, False)

+-------------------------+----------------+
|actividad_producto_nombre|es_energia_solar|
+-------------------------+----------------+
|Energia solar            |solar           |
|Energia solar            |solar           |
|Energia solar            |solar           |
|Energia solar            |solar           |
+-------------------------+----------------+
only showing top 4 rows



In [51]:
energia_nuevo.printSchema()

root
 |-- actividad_producto_nombre: string (nullable = true)
 |-- es_energia_solar: string (nullable = true)



In [52]:
energia_nuevo_malo = energia_df_2.select("actividad_producto_nombre").withColumn("es_energia_solar", categorizar_solar_int_udf_malo(energia_df_2["actividad_producto_nombre"]))

In [53]:
energia_nuevo_malo.show(4, False)

+-------------------------+----------------+
|actividad_producto_nombre|es_energia_solar|
+-------------------------+----------------+
|Energia biogas           |0               |
|Energia biogas           |0               |
|Energia biogas           |0               |
|Energia biogas           |0               |
+-------------------------+----------------+
only showing top 4 rows



In [54]:
energia_nuevo_malo.printSchema()

root
 |-- actividad_producto_nombre: string (nullable = true)
 |-- es_energia_solar: string (nullable = true)



In [55]:
energia_nuevo_int = energia_df_2.select("actividad_producto_nombre").withColumn("es_energia_solar", categorizar_solar_int_udf2(energia_df_2["actividad_producto_nombre"]))

In [56]:
energia_nuevo_int.show(5)

+-------------------------+----------------+
|actividad_producto_nombre|es_energia_solar|
+-------------------------+----------------+
|           Energia biogas|               0|
|           Energia biogas|               0|
|           Energia biogas|               0|
|           Energia biogas|               0|
|           Energia biogas|               0|
+-------------------------+----------------+
only showing top 5 rows



In [57]:
energia_nuevo_int.printSchema()

root
 |-- actividad_producto_nombre: string (nullable = true)
 |-- es_energia_solar: integer (nullable = true)



In [58]:
energia_nuevo.groupBy("es_energia_solar").count().show()

+----------------+-----+
|es_energia_solar|count|
+----------------+-----+
|           solar|  564|
|        no solar| 4747|
+----------------+-----+



In [59]:
energia_nuevo_int.groupBy("es_energia_solar").count().show()

+----------------+-----+
|es_energia_solar|count|
+----------------+-----+
|               1|  564|
|               0| 4747|
+----------------+-----+



## Pandas UDF (en caso de que quieras mejorar el performance o crear UDAFs)

In [60]:
import time
import pandas as pd
from pyspark.sql.functions import pandas_udf

In [61]:
print(spark.conf.get("spark.sql.execution.arrow.enabled"))
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))

false
false


In [62]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [63]:
data = [
        {"categoria": 'A', "ID": 1, "valor": 121.44},
        {"categoria": 'B', "ID": 2, "valor": 300.01},
        {"categoria": 'C', "ID": 3, "valor": 10.99},
        {"categoria": 'E', "ID": 4, "valor": 33.87}
       ]

In [64]:
df_test = spark.createDataFrame(data)

In [65]:
df_test.show()

+---+---------+------+
| ID|categoria| valor|
+---+---------+------+
|  1|        A|121.44|
|  2|        B|300.01|
|  3|        C| 10.99|
|  4|        E| 33.87|
+---+---------+------+



### Mejora en la performance al realizar operaciones vectorizadas

In [66]:
def get_delay(x):
    time.sleep(5)
    return x

get_delay = F.udf(get_delay)

@pandas_udf(returnType=pyspark.sql.types.StringType())
def get_delay_pandas(x):
    time.sleep(5)
    return x



In [67]:
%%time
df_test.select(get_delay("categoria")).show()

+--------------------+
|get_delay(categoria)|
+--------------------+
|                   A|
|                   B|
|                   C|
|                   E|
+--------------------+

CPU times: user 129 ms, sys: 24.4 ms, total: 154 ms
Wall time: 20.4 s


In [68]:
%%time
df_test.select(get_delay_pandas("categoria")).show() # esta es mas rapida porque trabaja con la vectorizacion de numpy y pandas

+---------------------------+
|get_delay_pandas(categoria)|
+---------------------------+
|                          A|
|                          B|
|                          C|
|                          E|
+---------------------------+

CPU times: user 46.7 ms, sys: 4.66 ms, total: 51.4 ms
Wall time: 6.41 s


###  UDAF (User Defined Aggregation Function)

In [69]:
@pandas_udf(pyspark.sql.types.DoubleType())
def avg_udf_two_decimals(group: pd.Series) -> float:
    return round(group.mean(), 2)

In [70]:
nuevo_energia_df.show()

+-------------------------+---------+---------+
|actividad_producto_nombre|    valor|constante|
+-------------------------+---------+---------+
|           Energia biogas|      0.0|     1111|
|           Energia biogas|      0.0|     1111|
|           Energia biogas|      0.0|     1111|
|           Energia biogas|      0.0|     1111|
|           Energia biogas|  764.924|     1111|
|           Energia biogas| 3478.232|     1111|
|           Energia biogas| 3395.954|     1111|
|           Energia biogas| 2928.667|     1111|
|           Energia biogas| 2841.751|     1111|
|           Energia biogas| 6902.033|     1111|
|           Energia biogas|  7357.68|     1111|
|           Energia biogas| 7971.476|     1111|
|           Energia biogas| 7893.987|     1111|
|           Energia biogas|  7648.21|     1111|
|           Energia biogas| 9011.614|     1111|
|           Energia biogas| 8388.014|     1111|
|           Energia biogas| 9381.065|     1111|
|           Energia biogas| 9167.534|   

In [71]:
nuevo_energia_df.groupBy("actividad_producto_nombre").agg({"valor": "avg"}).show()

+-------------------------+------------------+
|actividad_producto_nombre|        avg(valor)|
+-------------------------+------------------+
|            Energia solar| 4166.521010638324|
|           Energia biogas|2079.9216793478367|
|              Energia pah| 6805.738040055162|
|          Energia biomasa|3862.2890185185174|
|          Energia nuclear|147481.33160745626|
|           Energia eolica| 18560.90006523843|
+-------------------------+------------------+



In [72]:
nuevo_energia_df.groupBy("actividad_producto_nombre").agg(avg_udf_two_decimals("valor")).show()

+-------------------------+---------------------------+
|actividad_producto_nombre|avg_udf_two_decimals(valor)|
+-------------------------+---------------------------+
|           Energia biogas|                    2079.92|
|          Energia biomasa|                    3862.29|
|           Energia eolica|                    18560.9|
|          Energia nuclear|                  147481.33|
|              Energia pah|                    6805.74|
|            Energia solar|                    4166.52|
+-------------------------+---------------------------+



## Bonus Informativo (mas unificado imposible 😂)

In [73]:
[i for i in dir(spark.udf) if not i.startswith("_")]

['register', 'registerJavaFunction', 'registerJavaUDAF', 'sparkSession']

# SparkSQL

## Conceptos Claves

### Catalogo (Catalog):
Es el nivel de abstraccion mas alto en Spark SQL. Es una abstraccion para el almacenamiento de metadata sobre la data almacenada en tus tablas asi como otras entidades utiles como las bases de datos, tablas, funciones y vistas. Lo consigues en de ```pyspark.sql.Catalog```.

### Tablas (Tables):
Para hacer cualquier cosa util con Spark SQL primero necesitamos definir tablas. La tablas son logicamente equivalentes a un Dataframe en cuanto a que son las estructuras de datos contra las cuales estaras ejecutando comandos. La principal diferencia entre un Dataframe y una Tabla es que tu definiras un Dataframe en el Scope del lenguaje de programacion mientras que la Tabla sera definida a nivel de la Base de Datos. Las tablas siempre contienen Data, no existe la nocion de Tabla Temporal, solo el de Vista (la cual no contiene data). Esto es importante porque si desechas una tabla correras el riesgo de estar perdiendo data al hacerlo.

```spark.sql("""SHOW tables""").show()```



In [74]:
spark.sql("""SHOW tables""").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [75]:
# creando una tabla (busar tabla pequeñita para este ejemplo (4 columnas y algunas filas de tipos de datos distintos))
import requests
with open("./credits.csv", "wb") as f:
    f.write(requests.get("https://raw.githubusercontent.com/engcarlosperezmolero/resources_and_tools/main/data/csv/movies-credits.csv").content)

In [76]:
spark.sql("""
CREATE TABLE credits (
                        PERSON_ID LONG,
                        ID STRING COMMENT 'Un comentario acerca de la columna',
                        NAME STRING,
                        CHARACTER STRING,
                        ROLE STRING
                     )
USING CSV OPTIONS (header true, path '/content/credits.csv')
""")


DataFrame[]

In [77]:
spark.sql("""SHOW tables""").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|  credits|      false|
+---------+---------+-----------+



In [78]:
#spark.sql('DROP TABLE credits')

In [79]:
spark.sql("""
SELECT *
FROM credits
""").show(10, truncate=False)

+---------+-------+---------------+-----------------------------+-----+
|PERSON_ID|ID     |NAME           |CHARACTER                    |ROLE |
+---------+-------+---------------+-----------------------------+-----+
|3748     |tm84618|Robert De Niro |Travis Bickle                |ACTOR|
|14658    |tm84618|Jodie Foster   |Iris Steensma                |ACTOR|
|7064     |tm84618|Albert Brooks  |Tom                          |ACTOR|
|3739     |tm84618|Harvey Keitel  |Matthew 'Sport' Higgins      |ACTOR|
|48933    |tm84618|Cybill Shepherd|Betsy                        |ACTOR|
|32267    |tm84618|Peter Boyle    |Wizard                       |ACTOR|
|519612   |tm84618|Leonard Harris |Senator Charles Palantine    |ACTOR|
|29068    |tm84618|Diahnne Abbott |Concession Girl              |ACTOR|
|519613   |tm84618|Gino Ardito    |Policeman at Rally           |ACTOR|
|3308     |tm84618|Martin Scorsese|Passenger Watching Silhouette|ACTOR|
+---------+-------+---------------+-----------------------------

In [80]:
spark.sql("""
DESCRIBE TABLE credits
""").show(10, False)

+---------+---------+----------------------------------+
|col_name |data_type|comment                           |
+---------+---------+----------------------------------+
|PERSON_ID|bigint   |null                              |
|ID       |string   |Un comentario acerca de la columna|
|NAME     |string   |null                              |
|CHARACTER|string   |null                              |
|ROLE     |string   |null                              |
+---------+---------+----------------------------------+



In [81]:
spark.sql("""
SHOW TABLES
""").show(10)

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|  credits|      false|
+---------+---------+-----------+



In [82]:
# usar un Dataframe como si fuera una tabla de SQL
energia_df.createOrReplaceTempView("energia_df")

In [83]:
spark.sql("""
SHOW TABLES
""").show(10)

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|  default|   credits|      false|
|         |energia_df|       true|
+---------+----------+-----------+



In [84]:
spark.sql(f"""
SELECT *
FROM energia_df
""").show(10, False)

+---------+---------------------+-----------+-------------------------+----------+----------------+-------+-----------------+----------------+------------+----------+--------------+-------------+--------+
|sector_id|sector_nombre        |variable_id|actividad_producto_nombre|indicador |unidad_de_medida|fuente |frecuencia_nombre|cobertura_nombre|alcance_tipo|alcance_id|alcance_nombre|indice_tiempo|valor   |
+---------+---------------------+-----------+-------------------------+----------+----------------+-------+-----------------+----------------+------------+----------+--------------+-------------+--------+
|25       |Energias alternativas|440        |Energia biogas           |Generacion|MWh             |CAMMESA|Mensual          |Nacional        |PROVINCIA   |6         |BUENOS AIRES  |2012-01-01   |0.0     |
|25       |Energias alternativas|440        |Energia biogas           |Generacion|MWh             |CAMMESA|Mensual          |Nacional        |PROVINCIA   |6         |BUENOS AIRES  

In [85]:
spark.sql("DESCRIBE TABLE energia_df").show(truncate=False)

+-------------------------+---------+-------+
|col_name                 |data_type|comment|
+-------------------------+---------+-------+
|sector_id                |int      |null   |
|sector_nombre            |string   |null   |
|variable_id              |int      |null   |
|actividad_producto_nombre|string   |null   |
|indicador                |string   |null   |
|unidad_de_medida         |string   |null   |
|fuente                   |string   |null   |
|frecuencia_nombre        |string   |null   |
|cobertura_nombre         |string   |null   |
|alcance_tipo             |string   |null   |
|alcance_id               |int      |null   |
|alcance_nombre           |string   |null   |
|indice_tiempo            |date     |null   |
|valor                    |double   |null   |
+-------------------------+---------+-------+



### UDF y UDAF en Spark SQL
Recuerdas las UDF y UDAF que habiamos definido en la sección de UDF's? No?

No hay problema te refresco la memoria...

```python
def categorizar_de_energia_solar_str(energia: str) -> str:
    if energia == "Energia solar":
        return "solar"
    else:
        return "no solar"
```

```python
@pandas_udf(pyspark.sql.types.DoubleType())
def avg_udf_two_decimals(group: pd.Series) -> float:
    return round(group.mean(), 2)
```

In [86]:
# registrar una UDF para usar con SQL
spark.udf.register("categorizar_de_energia_solar", categorizar_de_energia_solar_str, pyspark.sql.types.StringType())

<function __main__.categorizar_de_energia_solar_str(energia: str) -> str>

In [87]:
spark.sql(f"""
SELECT actividad_producto_nombre, categorizar_de_energia_solar(actividad_producto_nombre) AS es_energia_solar
FROM energia_df
""").show()

+-------------------------+----------------+
|actividad_producto_nombre|es_energia_solar|
+-------------------------+----------------+
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|           Energia biogas|        no solar|
|         

In [88]:
# Algo bastante util es que puedes realizar tus analisis trabajando con AMBAS apis al mismo tiempo.
spark.sql(f"""
SELECT actividad_producto_nombre, categorizar_de_energia_solar(actividad_producto_nombre) as es_energia_solar
FROM energia_df
""").select("es_energia_solar").distinct().show()

+----------------+
|es_energia_solar|
+----------------+
|           solar|
|        no solar|
+----------------+



In [89]:
# registrando una UDAF
spark.udf.register('avf_two_decimals', avg_udf_two_decimals)

<function __main__.avg_udf_two_decimals(group: pandas.core.series.Series) -> float>

In [90]:
spark.sql(f"""
SELECT actividad_producto_nombre, avf_two_decimals(valor) as valor_promedio
FROM energia_df
GROUP BY actividad_producto_nombre
""").createOrReplaceTempView("promedios")

In [91]:
spark.sql("""
SHOW TABLES
""").show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|  default|   credits|      false|
|         |energia_df|       true|
|         | promedios|       true|
+---------+----------+-----------+



In [92]:
spark.sql("""
SELECT *
FROM promedios
""").show()

+-------------------------+--------------+
|actividad_producto_nombre|valor_promedio|
+-------------------------+--------------+
|           Energia biogas|       2079.92|
|          Energia biomasa|       3862.29|
|           Energia eolica|       18560.9|
|          Energia nuclear|     147481.33|
|              Energia pah|       6805.74|
|            Energia solar|       4166.52|
+-------------------------+--------------+



# Hive Context y SQL Context (Los viejos titanes 🏛️)

En versiones antiguas de Spark, el SQL Context y Hive Context dieron la habilidad de trabajar con Dataframes y Spark SQL y eran normalmente guardados dentro la variable ```sqlContext``` en ejemplos, documentacion y codigos legacy.

Historicamente Spark 1.X tenia dos contextos: SparkContext y SQLContext, cada uno realizaba cosas distintas. El primero mas enfocado en un control muy detallado de las abstracciones centrales de Spark y el segundo mas enfocado en herramientas de alto nivel como el Spark SQL.

En Spark 2.X se combinaron las 2 API's en el SparkSession centralizado con el que hemos venido trabajando. Sin embargo aun siguen existiendo y puedes acceder a traves de ellas usando el SparkSession.

Es importante hacer la aclaracion de que tanto el SQL Context como el Hive context no deberian ser ya usados normalmente, y rara vez el SparkContext.


#### La relacion de Spark con Hive
Spark SQL puede conectarse a meta-almacenamientos de Hive.  El meta-almacenamiento de Hive (Hive Metastore) es una manera en la que cual Hive mantiene informaciones de tablas para usar a traves de sesiones. Esto es algo popular para usuarios que esten migrando de ambientes legacy de Hadoop y estan comenzando a ejecutar sus cargas de trabajo usando Spark.

Spark proporciona compatibilidad completa al poder usar tanto SQL como HiveQL.

In [93]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("App Name") \
    .enableHiveSupport() \
    .getOrCreate()