# Uso de UDFs en Pyspark

Nos permiten definir funciones custom usando Python/Pandas
- Existen dos tipos:
    -  pyspark.sql.functions.udf: Para funciones definidas en Python
    - pyspark.sql.functions.pandas_udf: Para funciones definidas en Pandas
- Solo permiten aplicar funciones por filas



## UDFs Tradicionales (Python UDFs)
- Rendimiento: las *UDFs tradicionales* pueden ser más lentas porque cada fila del DataFrame se procesa de manera individual y el procesamiento implica la serialización y deserialización de datos entre la JVM de Spark y el intérprete de Python
- Facilidad de Uso: son sencillas de escribir y entender si se está familiarizado con Python, pero pueden no ser tan eficientes para grandes volúmenes de datos

## Pandas UDFs (Vectorized UDFs)
- Rendimiento: las *pandas_udf* están optimizadas para el rendimiento ya que procesan datos en lotes utilizando el backend de Apache Arrow para la transferencia de datos entre Spark y Pandas. Esto reduce significativamente el overhead de serialización y deserialización
- Facilidad de Uso: Son más eficientes para grandes volúmenes de datos y permiten usar funciones de Pandas, lo cual puede simplificar ciertas transformaciones complejas

# Ejemplos de uso

In [0]:
# Datos de ejemplo (lista de tuplas)
data = [
    (1, "Juan", "Perez", 30, "Marketing", 3000.0),
    (2, "Ana", "Gomez", 45, "Ventas", 4000.0),
    (3, "Pedro", "Lopez", 29, "IT", 3500.0),
    (4, "Luis", "Martinez", 50, "RRHH", 4500.0),
    (5, "Maria", "Jimenez", 35, "IT", 3800.0)
]

columns = ["ID", "Nombre", "Apellido", "Edad", "Departamento", "Salario"]

df = spark.createDataFrame(data, columns) # creación de un spark df

In [0]:
df.display()

ID,Nombre,Apellido,Edad,Departamento,Salario
1,Juan,Perez,30,Marketing,3000.0
2,Ana,Gomez,45,Ventas,4000.0
3,Pedro,Lopez,29,IT,3500.0
4,Luis,Martinez,50,RRHH,4500.0
5,Maria,Jimenez,35,IT,3800.0


In [0]:
# cargar módulos básicos de spark sql
import pyspark.sql.functions as F

## Ejemplo 1: concatenar nombre y apellido

### UDF básica

In [0]:
# Definimos la función que concatena nombre_apellido
def concat_two_terms(term_1, term_2):
    return f"{term_1}_{term_2}"

concat_two_terms_udf = F.udf(concat_two_terms) # Construimos la UDF

# Aplicamos la UDF
df_concat = df.withColumn("Nombre_Completo", concat_two_terms_udf(F.col("Nombre"), F.col("Apellido")))

df_concat.display()

ID,Nombre,Apellido,Edad,Departamento,Salario,Nombre_Completo
1,Juan,Perez,30,Marketing,3000.0,Juan_Perez
2,Ana,Gomez,45,Ventas,4000.0,Ana_Gomez
3,Pedro,Lopez,29,IT,3500.0,Pedro_Lopez
4,Luis,Martinez,50,RRHH,4500.0,Luis_Martinez
5,Maria,Jimenez,35,IT,3800.0,Maria_Jimenez


### Pandas UDF

In [0]:
# Definir la pandas_udf para concatenar nombre y apellido
@F.pandas_udf("string")
def pd_udf_concat_two_terms(nombre, apellido):
    return nombre + "_" + apellido

# Aplicar la pandas_udf al DataFrame
df_concat = df.withColumn("Nombre_Completo_2", pd_udf_concat_two_terms(df["Nombre"], df["Apellido"]))

df_concat.show()

+---+------+--------+----+------------+-------+-----------------+
| ID|Nombre|Apellido|Edad|Departamento|Salario|Nombre_Completo_2|
+---+------+--------+----+------------+-------+-----------------+
|  1|  Juan|   Perez|  30|   Marketing| 3000.0|       Juan_Perez|
|  2|   Ana|   Gomez|  45|      Ventas| 4000.0|        Ana_Gomez|
|  3| Pedro|   Lopez|  29|          IT| 3500.0|      Pedro_Lopez|
|  4|  Luis|Martinez|  50|        RRHH| 4500.0|    Luis_Martinez|
|  5| Maria| Jimenez|  35|          IT| 3800.0|    Maria_Jimenez|
+---+------+--------+----+------------+-------+-----------------+



## Ejemplo 2: Bonificación salario

### UDF básica

In [0]:
# Definimos la UDF para obtener el bonus sobre el salario base
def bonus_calculate(salary, bonus_pct):
    return salary * bonus_pct

bonus_udf = F.udf(bonus_calculate) # Creamos de la UDF

bonus_df = df.withColumn("Bonificacion", bonus_udf(F.col("Salario"), F.lit(0.15)))

bonus_df.show()

+---+------+--------+----+------------+-------+------------+
| ID|Nombre|Apellido|Edad|Departamento|Salario|Bonificacion|
+---+------+--------+----+------------+-------+------------+
|  1|  Juan|   Perez|  30|   Marketing| 3000.0|       450.0|
|  2|   Ana|   Gomez|  45|      Ventas| 4000.0|       600.0|
|  3| Pedro|   Lopez|  29|          IT| 3500.0|       525.0|
|  4|  Luis|Martinez|  50|        RRHH| 4500.0|       675.0|
|  5| Maria| Jimenez|  35|          IT| 3800.0|       570.0|
+---+------+--------+----+------------+-------+------------+



### Pandas UDF

In [0]:
# Definimos la pandas_udf para obtener el bonus sobre el salario base
@F.pandas_udf("double")
def pd_udf_bonus_calculate(salary, bonus_pct):
    return salary * bonus_pct

# Aplicar la pandas_udf al DataFrame
bonus_df = df.withColumn("Bonificacion_2", pd_udf_bonus_calculate(df["Salario"], F.lit(0.15)))

bonus_df.show()

+---+------+--------+----+------------+-------+--------------+
| ID|Nombre|Apellido|Edad|Departamento|Salario|Bonificacion_2|
+---+------+--------+----+------------+-------+--------------+
|  1|  Juan|   Perez|  30|   Marketing| 3000.0|         450.0|
|  2|   Ana|   Gomez|  45|      Ventas| 4000.0|         600.0|
|  3| Pedro|   Lopez|  29|          IT| 3500.0|         525.0|
|  4|  Luis|Martinez|  50|        RRHH| 4500.0|         675.0|
|  5| Maria| Jimenez|  35|          IT| 3800.0|         570.0|
+---+------+--------+----+------------+-------+--------------+



## Ejemplo 3: Categorización Edad

### UDF Básica

In [0]:
# Definimos la función la UDF para categorizar la edad
def bucket_age(age, value_1, value_2):
    if age < value_1:
        return "Joven"
    elif value_1 <= age < value_2:
        return "Mediana_Edad"
    else:
        return "Mayor"

bucket_age_udf = F.udf(bucket_age) # Creamos de la UDF

# Aplicar la UDF al DataFrame
df_bucket_age = df.withColumn("Categoria_Edad", bucket_age_udf(F.col("Edad"), F.lit(30), F.lit(45)))

df_bucket_age.show()

+---+------+--------+----+------------+-------+--------------+
| ID|Nombre|Apellido|Edad|Departamento|Salario|Categoria_Edad|
+---+------+--------+----+------------+-------+--------------+
|  1|  Juan|   Perez|  30|   Marketing| 3000.0|  Mediana_Edad|
|  2|   Ana|   Gomez|  45|      Ventas| 4000.0|         Mayor|
|  3| Pedro|   Lopez|  29|          IT| 3500.0|         Joven|
|  4|  Luis|Martinez|  50|        RRHH| 4500.0|         Mayor|
|  5| Maria| Jimenez|  35|          IT| 3800.0|  Mediana_Edad|
+---+------+--------+----+------------+-------+--------------+



### Pandas UDF

In [0]:
# Definimos la pandas_udf para los buckets de edad
@F.pandas_udf("string")
def pd_udf_bucket_age(age, value_1, value_2):
    return age.apply(lambda x: "Joven" if x < value_1.iloc[0] else "Mediana_Edad" if x < value_2.iloc[0] else "Mayor")

# Aplicación de la pandas_udf al DataFrame
df_bucket_age = df.withColumn("Categoria_Edad_2", pd_udf_bucket_age(df["Edad"], F.lit(30), F.lit(45)))

df_bucket_age.show()

+---+------+--------+----+------------+-------+----------------+
| ID|Nombre|Apellido|Edad|Departamento|Salario|Categoria_Edad_2|
+---+------+--------+----+------------+-------+----------------+
|  1|  Juan|   Perez|  30|   Marketing| 3000.0|    Mediana_Edad|
|  2|   Ana|   Gomez|  45|      Ventas| 4000.0|           Mayor|
|  3| Pedro|   Lopez|  29|          IT| 3500.0|           Joven|
|  4|  Luis|Martinez|  50|        RRHH| 4500.0|           Mayor|
|  5| Maria| Jimenez|  35|          IT| 3800.0|    Mediana_Edad|
+---+------+--------+----+------------+-------+----------------+

