# Funciones en PySpark

En este notebook aprenderemos algunas funciones avanzadas para optimizar el rendimiento de Spark, para imputar valores faltantes o a crear funciones definidas por el usuario (UDF).

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

### Crea la sesión de SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

23/11/09 10:52:48 WARN Utils: Your hostname, MacBook-Air-de-Rocio.local resolves to a loopback address: 127.0.0.1; using 192.168.1.42 instead (on interface en0)
23/11/09 10:52:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/09 10:52:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/09 10:52:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Crear el DataFrame

In [4]:
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"]) 

# Create Temp Tables
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

23/11/09 10:53:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


#  Expresiones SQL

También podemos usar la expresión SQL para la manipulación de datos. Tenemos la función **expr** y también una variante de un método de selección como **selectExpr** para la evaluación de expresiones SQL.

In [5]:
from pyspark.sql.functions import expr

# Intentemos categorizar el salario en Bajo, Medio y Alto según la categorización a continuación.

# 0-2000: salario_bajo
# 2001 - 5000: mid_salary
#> 5001: high_salary

cond = """case when salary > 5000 then 'high_salary'
               else case when salary > 2000 then 'mid_salary'
                    else case when salary > 0 then 'low_salary'
                         else 'invalid_salary'
                              end
                         end
                end as salary_level"""

newdf = df.withColumn("salary_level", expr(cond))
newdf.show()

                                                                                

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|NULL|NULL| NULL|  7500| high_salary|
|   9| III| NULL|  4500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Usando la función selectExpr

In [6]:
newdf = df.selectExpr("*", cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|NULL|NULL| NULL|  7500| high_salary|
|   9| III| NULL|  4500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Funciones definidas por el usuario (UDF)
A menudo necesitamos escribir la función en función de nuestro requisito muy específico. Aquí podemos aprovechar las udfs. Podemos escribir nuestras propias funciones en un lenguaje como python y registrar la función como udf, luego podemos usar la función para operaciones de DataFrame.

* Función de Python para encontrar el nivel_salario para un salario dado.

In [7]:
def detSalary_Level(sal):
    level = None

    if(sal > 5000):
        level = 'high_salary'
    elif(sal > 2000):
        level = 'mid_salary'
    elif(sal > 0):
        level = 'low_salary'
    else:
        level = 'invalid_salary'
    return level

* Luego registre la función "detSalary_Level" como UDF.

In [8]:
sal_level = udf(detSalary_Level, StringType())

* Aplicar función para determinar el salario_level para un salario dado.

In [9]:
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()

[Stage 6:>                                                          (0 + 1) / 1]                                                                                

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|NULL|NULL| NULL|  7500| high_salary|
|   9| III| NULL|  4500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Trabajando con valores NULL

Los valores NULL siempre son difíciles de manejar independientemente del Framework o lenguaje que usemos. Aquí en Spark tenemos pocas funciones específicas para lidiar con valores NULL.

- **es nulo()**

Esta función nos ayudará a encontrar los valores nulos para cualquier columna dada. Por ejemplo si necesitamos encontrar las columnas donde las columnas id contienen los valores nulos.

In [10]:
newdf = df.filter(df["dept"].isNull())
newdf.show()

+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|NULL|NULL|NULL|  7500|
|   9| III|NULL|  4500|
+----+----+----+------+



* **No es nulo()**

Esta función funciona de manera opuesta a la función isNull () y devolverá todos los valores no nulos para una función en particular.

In [11]:
newdf = df.filter(df["dept"].isNotNull())
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|NULL|dept5|  2500|
+---+----+-----+------+



* **fillna ()**

Esta función nos ayudará a reemplazar los valores nulos.

In [12]:
# Replace -1 where the salary is null.
newdf = df.fillna("INVALID", ["dept"])
newdf.show()

+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  1500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  7200|
|   7| GGG|  dept3|  7100|
|NULL|NULL|INVALID|  7500|
|   9| III|INVALID|  4500|
|  10|NULL|  dept5|  2500|
+----+----+-------+------+



* **dropna ()**

Esta función nos ayudará a eliminar las filas con valores nulos.

In [13]:
# Remove all rows which contains any null values.
newdf = df.dropna()
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+



## Partitioning


El particionamiento es un aspecto muy importante para controlar el paralelismo de la aplicación Spark.

* Consultar número de particiones.

In [14]:
df.rdd.getNumPartitions()

8

* Incrementar el número de particiones. Por ejemplo, aumentar las particiones a 6

In [15]:
newdf = df.repartition(6)
newdf.rdd.getNumPartitions()

6

**Nota: se trata de operaciones costosas, ya que requiere la mezcla de datos entre los trabajadores.**

* Disminuir el número de particiones. Por ejemplo disminuir las particiones a 2.

In [16]:
newdf = df.coalesce(2)
newdf.rdd.getNumPartitions()

2

* De forma predeterminada, el número de particiones para Spark SQL es 200.
* Pero también podemos establecer el número de particiones en el nivel de aplicación Spark. Por ejemplo establecido en 500

In [17]:
# Set number of partitions as Spark Application.
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Check the number of patitions.
num_part = spark.conf.get("spark.sql.shuffle.partitions")
print("No of Partitions : {0}".format(num_part))

No of Partitions : 500
