### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Trabajar con UDF

Es un fragmento de código que realiza una determinada tarea y puede reutilizarse para realizar la misma tarea en múltiples escenarios.
- UDF es una operación costosa en el desarrollo con Spark
- Intente disminuir el uso de UDF y aplique las funciones incorporadas

- ¿Por qué UDF no es eficiente?

Las UDF se crean en Python o Scala, pero los dataframes están en formato JVM. Así que cuando llamamos a UDF para ejecutar cierta tarea, esto ocurriría a través de la API de Java, que requiere serialización/deserialización de datos para realizar la tarea. Y como UDF es una caja negra para Spark (ya que no está en JVM), no puede aplicar técnicas de optimización por defecto.

In [0]:
from pyspark.sql.functions import *

estudiante = [(1,"Alfonso","Science",86,"A",90),
              (1,"Alfonso","Science",86,"A",90),
              (2,"Maria","Math",56,"R",55),
              (3,"Javiera","English",77,"A",85),
              (3,"Javiera","English",77,"A",85),
              (4,"Andres","Science",89,"A",98),
              (5,"Tomas","Math",69,"A",70),
              (5,"Tomas","Math",69,"A",70),
              (6,"Raul",'Phisics',45,"R",50),
              (7,"Pedro","English",61,"A",70)
             ]
schema = ["id","nombre","ramo","puntaje","status","asistencia"]

df = spark.createDataFrame(data=estudiante, schema=schema)
df.printSchema()
df.show()

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- ramo: string (nullable = true)
 |-- puntaje: long (nullable = true)
 |-- status: string (nullable = true)
 |-- asistencia: long (nullable = true)

+---+-------+-------+-------+------+----------+
| id| nombre|   ramo|puntaje|status|asistencia|
+---+-------+-------+-------+------+----------+
|  1|Alfonso|Science|     86|     A|        90|
|  1|Alfonso|Science|     86|     A|        90|
|  2|  Maria|   Math|     56|     R|        55|
|  3|Javiera|English|     77|     A|        85|
|  3|Javiera|English|     77|     A|        85|
|  4| Andres|Science|     89|     A|        98|
|  5|  Tomas|   Math|     69|     A|        70|
|  5|  Tomas|   Math|     69|     A|        70|
|  6|   Raul|Phisics|     45|     R|        50|
|  7|  Pedro|English|     61|     A|        70|
+---+-------+-------+-------+------+----------+



#### Definir UDF

Los UDF solo permiten UN PARAMETRO

##### Definir UDF para renombrar columnas

In [0]:
import pyspark.sql.functions as f

def rename_columns (rename_df):
    for column in rename_df.columns:
        new_column = "Col_" + column
        rename_df = rename_df.withColumnRenamed(column, new_column)
    return rename_df

renamed_df = rename_columns(df)
display(renamed_df)

Col_id,Col_nombre,Col_ramo,Col_puntaje,Col_status,Col_asistencia
1,Alfonso,Science,86,A,90
1,Alfonso,Science,86,A,90
2,Maria,Math,56,R,55
3,Javiera,English,77,A,85
3,Javiera,English,77,A,85
4,Andres,Science,89,A,98
5,Tomas,Math,69,A,70
5,Tomas,Math,69,A,70
6,Raul,Phisics,45,R,50
7,Pedro,English,61,A,70


##### Definir UDF para cambiar a mayúscula los registros de una columna

In [0]:
from pyspark.sql.functions import upper

def upperCase_col (df):
    upper_df = df.withColumn('columna_con_mayusculas',upper(df.nombre)).drop(col('nombre'))
    return upper_df

renamed_df = upperCase_col(df)
display(renamed_df)

id,ramo,puntaje,status,asistencia,columna_con_mayusculas
1,Science,86,A,90,ALFONSO
1,Science,86,A,90,ALFONSO
2,Math,56,R,55,MARIA
3,English,77,A,85,JAVIERA
3,English,77,A,85,JAVIERA
4,Science,89,A,98,ANDRES
5,Math,69,A,70,TOMAS
5,Math,69,A,70,TOMAS
6,Phisics,45,R,50,RAUL
7,English,61,A,70,PEDRO


##### Definir UDF para obtener primera letra de una cadena

Definir una función en Python/Scala local para obtener la primera letra de una cadena:

In [0]:
def firstLetterFunction(correo):
    return correo[0]

firstLetterFunction("apellidonombre@gmail.com")

Out[17]: 'a'

##### Definir UDF para etiquetar el día de la semana

In [0]:
def labelDayOfWeek(day):
    dia = {"Mon":"1","Tue":2,"Wed":"3","Thu":"4","Fri":"5","Sat":"6","Sun":"7"}
    return dia.get(day) + "-" + day

labelDayOfWeek("Mon")

Out[19]: '1-Mon'

#### Crear y aplicar UDF

Definir un UDF que envuelva la función. Esto serializa la función y la envía a los ejecutores para poder utilizarla en nuestro DataFrame.

In [0]:
from pyspark.sql.functions import udf

def firstLetterFunction(correo):
    return correo[0]

# Envolvemos la función en un UDF:
udfFirstLetter = udf(firstLetterFunction)

In [0]:
# Aplicamos el UDF en la columna nombre:
aplicarUDF = df.select(udfFirstLetter(col('nombre')))
display(aplicarUDF)

firstLetterFunction(nombre)
A
A
M
J
J
A
T
T
R
P


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def calculoAsistencia(asistencia):
    resultado = asistencia / 10
    return resultado

# Envolvemos la función en un UDF:
udfCalculoAsistencia = udf(calculoAsistencia, DoubleType())

In [0]:
# Aplicamos el UDF en la columna asistencia
# Si no colocasemos el tipo de dato en la creacion del UDF nos devolveria los resultados con el tipo "string"
aplicarUDF = df.select(udfCalculoAsistencia(col('asistencia')))
display(aplicarUDF)

calculoAsistencia(asistencia)
9.0
9.0
5.5
8.5
8.5
9.8
7.0
7.0
5.0
7.0


#### Registrar UDF para usar en SQL

Registrar UDF usando "spark.udf.register" para crear UDF en el SQL namespace:

In [0]:
def firstLetterFunction(correo):
    return correo[0]

df.createOrReplaceTempView("sales")

spark.udf.register("sql_udf", firstLetterFunction)


Out[24]: <function __main__.firstLetterFunction(correo)>

In [0]:
%sql
SELECT sql_udf(nombre) AS first_letter FROM sales

first_letter
A
A
M
J
J
A
T
T
R
P


#### UDF Multiparámetros

Existe una forma para crear un UDF que permita recibir másde un solo parámetro:

In [0]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import *

def calculoAsistencia(parametros):
	#Capturamos los parámetros
	asistencia = parametros[0]
	puntaje = parametros[1]
	#
	#Calculo
	respuesta = asistencia*puntaje
	#
	#Devolvemos el valor
	return respuesta

# Envolvemos la función en un UDF:
udfCalculoAsistencia = udf(calculoAsistencia, IntegerType())

In [0]:
#Aplicamos la función
df2 = df.select(
	df["id"].alias("ID"),
    df["nombre"].alias("NOMBRE"),
	udfCalculoAsistencia(
		struct(
			df["puntaje"], 
			df["asistencia"]
		)
	).alias("CALCULO")
)

df2.show()

+---+-------+-------+
| ID| NOMBRE|CALCULO|
+---+-------+-------+
|  1|Alfonso|   7740|
|  1|Alfonso|   7740|
|  2|  Maria|   3080|
|  3|Javiera|   6545|
|  3|Javiera|   6545|
|  4| Andres|   8722|
|  5|  Tomas|   4830|
|  5|  Tomas|   4830|
|  6|   Raul|   2250|
|  7|  Pedro|   4270|
+---+-------+-------+

