In [0]:

#@title
from IPython.display import Image
Image("/content/transforma_datos.png")

Out[65]: <IPython.core.display.Image object>

# Procesammiento de datos usando PySpark
Este modulo vamos a estudiar los diferentes pasos para preprocesar y manejar datos en PySpark. Las técnicas de preprocesamiento ciertamente pueden variar de un caso a otro, y se pueden usar muchos métodos diferentes para masajear los datos en la forma deseada. La idea de este modulo es exponer algunas de las técnicas más comunes para manejar big data en Spark. En este modulo, vamos a repasar los diferentes pasos involucrados en el preprocesamiento de datos, como el manejo de valores faltantes, la fusión de conjuntos de datos, la aplicación de funciones, las agregaciones y la clasificación. Una parte importante del preprocesamiento de datos es la transformación de columnas numéricas en categóricas y viceversa, que veremos en los próximos modulos y se basa en el **Machine learning**.

## Creación de un objeto SparkSession
El primer paso es crear un objeto `SparkSession()` para usar **Spark**. También importamos todas las funciones y tipos de datos requeridos desde spark.sql:

In [0]:
# crear un dataframe de cliente declarando el esquema y pasando valores
import pyspark.sql.functions as F
from pyspark.sql.types import *

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing').getOrCreate()

Ahora, en lugar de leer directamente un archivo para crear un `DataFrame`, Crearemos un `DataFrame`, pasando valores clave. La forma en que creamos un `DataFrame` en Spark es declarando su esquema y pasando los valores de las columnas.

## Creación del `DataFrame`
En el siguiente ejemplo, vamos a crear un nuevo `DataFrame` con cinco columnas de ciertos tipos de datos (cadena y entero). Como puede ver, cuando llamamos a mostrar en el nuevo `DataFrame`, se crea con tres filas y cinco columnas que contienen los valores que pasamos.

Para agregar cada una de las columnas  usamos la función `add()`.

In [0]:
schema=StructType().add("user_id","string").add("country","string").add("browser", "string").add("OS",'string').add("age", "integer")

In [0]:
# Creamos los valores que 
df=spark.createDataFrame([("Nica505",'Nicaragua',"Chrome","WIN",33),("Pty507",'Panama',"Safari","MacOS",35),("Br57",'Brazil',"Mozilla","Linux",25)],schema=schema)
df.show()

+-------+---------+-------+-----+---+
|user_id|  country|browser|   OS|age|
+-------+---------+-------+-----+---+
|Nica505|Nicaragua| Chrome|  WIN| 33|
| Pty507|   Panama| Safari|MacOS| 35|
|   Br57|   Brazil|Mozilla|Linux| 25|
+-------+---------+-------+-----+---+



In [0]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- age: integer (nullable = true)



In [0]:
df.show()

+-------+---------+-------+-----+---+
|user_id|  country|browser|   OS|age|
+-------+---------+-------+-----+---+
|Nica505|Nicaragua| Chrome|  WIN| 33|
| Pty507|   Panama| Safari|MacOS| 35|
|   Br57|   Brazil|Mozilla|Linux| 25|
+-------+---------+-------+-----+---+



## Valores nulos
Es muy común tener valores nulos como parte de los datos generales. Por lo tanto, se vuelve fundamental agregar pipeline al procesamiento de datos para manejar los valores nulos. En **Spark**, podemos lidiar con valores nulos reemplazándolos con algún valor específico o quitando las filas/columnas que contienen valores nulos.

Primero, vamos a crear un nuevo `DataFrame` (df_na) que contiene valores nulos en dos de sus columnas (el esquema es el mismo que en el `DataFrame` anterior). En esta primera parte vamos a valores nulos, llenamos todos los valores nulos en el `DataFrame` actual con un valor de 0, lo que ofrece una solución rápida. Luego vamos a usar la función `fillna` para reemplazar todos los valores nulos en el `DataFrame` con 0. Por el segundo parte, vamos a  reemplazar los valores nulos en columnas específicas (país, navegador) respectivamente.

In [0]:
# Creamos un dataframe con valores nulos
df_na=spark.createDataFrame([("A203",None,"Chrome","WIN",33),("A201",'Panama',None,"MacOS",35),("A205",'Brazil',"Mozilla","Linux",25)],schema=schema)


In [0]:
# Visualicemos las primeras filas
df_na.show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|   null| Chrome|  WIN| 33|
|   A201| Panama|   null|MacOS| 35|
|   A205| Brazil|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [0]:
# Usamos la función fillna para reemplazar los datos nulos con 0
df_na.fillna('0').show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|      0| Chrome|  WIN| 33|
|   A201| Panama|      0|MacOS| 35|
|   A205| Brazil|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [0]:
# fillna para llenar los valores null con valores especificos 
df_na.fillna( { 'country':'Mexico', 'browser':'Explorador' } ).show()

+-------+-------+----------+-----+---+
|user_id|country|   browser|   OS|age|
+-------+-------+----------+-----+---+
|   A203| Mexico|    Chrome|  WIN| 33|
|   A201| Panama|Explorador|MacOS| 35|
|   A205| Brazil|   Mozilla|Linux| 25|
+-------+-------+----------+-----+---+



Para eliminar las filas con valores nulos, simplemente podemos usar la funcionalidad `na.drop` en **PySpark**. Mientras que si es necesario hacer esto para columnas específicas, también podemos pasar el conjunto de nombres de columna, como se muestra en el siguiente ejemplo:

In [0]:
# Devolver nuevas filas omitiendo df con valores nulos# 
df_na.na.drop().show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A205| Brazil|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



In [0]:
df_na.na.drop(subset='country').show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A201| Panama|   null|MacOS| 35|
|   A205| Brazil|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



Otro paso muy común en el procesamiento de datos es reemplazar algunos puntos de datos con valores particulares. Podemos usar la función `replace()` para esto, como se muestra en el siguiente ejemplo. Para soltar la columna de un `DataFrame` podemos usar la función de soltar de **PySpark**.

In [0]:
df_na.replace("Chrome","Google Chrome").show()

+-------+-------+-------------+-----+---+
|user_id|country|      browser|   OS|age|
+-------+-------+-------------+-----+---+
|   A203|   null|Google Chrome|  WIN| 33|
|   A201| Panama|         null|MacOS| 35|
|   A205| Brazil|      Mozilla|Linux| 25|
+-------+-------+-------------+-----+---+



In [0]:
#deleting column 
df.drop('user_id').show()

+---------+-------+-----+---+
|  country|browser|   OS|age|
+---------+-------+-----+---+
|Nicaragua| Chrome|  WIN| 33|
|   Panama| Safari|MacOS| 35|
|   Brazil|Mozilla|Linux| 25|
+---------+-------+-----+---+



## Cargando dataset
Ahora que hemos visto cómo crear un `DataFrame` pasando un valor y cómo tratar los valores faltantes, podemos crear un `DataFrame` de **Spark** leyendo un archivo (.csv, parquet, etc.). El conjunto de datos contiene un total de siete columnas y 2000 filas. La función de resumen nos permite ver las medidas estadísticas del conjunto de datos, como el mínimo, el máximo y la media de los datos numéricos presentes en el `DataFrame`.

In [0]:
# File location and type
file_location = "/FileStore/tables/customer_data-1.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

In [0]:
df=spark.read.csv(file_location,header=True,inferSchema=True)

In [0]:
df.count()

Out[18]: 2000

In [0]:
len(df.columns)

Out[19]: 7

In [0]:
df.printSchema()

root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)



In [0]:
df.show(3)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 3 rows



In [0]:
df.columns

Out[22]: ['Customer_subtype',
 'Number_of_houses',
 'Avg_size_household',
 'Avg_age',
 'Customer_main_type',
 'Avg_Salary',
 'label']

In [0]:
df.summary().show()

+-------+--------------------+------------------+------------------+-----------+--------------------+-----------------+------------------+
|summary|    Customer_subtype|  Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|       Avg_Salary|             label|
+-------+--------------------+------------------+------------------+-----------+--------------------+-----------------+------------------+
|  count|                2000|              2000|              2000|       2000|                2000|             2000|              2000|
|   mean|                null|            1.1075|            2.6895|       null|                null|     1616908.0835|            0.0605|
| stddev|                null|0.3873225521186316|0.7914562220841646|       null|                null|6822647.757312146|0.2384705099001677|
|    min|Affluent senior a...|                 1|                 1|20-30 years|      Average Family|             1361|                 0|
|    25%|                nu

La mayoría de las veces, no usaremos todas las columnas presentes en el `DataFrame`, ya que algunas pueden ser redundantes y tener muy poco valor en términos de proporcionar información útil. Por lo tanto, subdividir el `DataFrame` se vuelve fundamental para tener los datos adecuados para el análisis.

### Subset de un `DataFrame`
Se puede crear un subconjunto de un `DataFrame`, en función de múltiples condiciones en las que seleccionamos algunas filas, columnas o datos con ciertos filtros establecidos. En los siguientes ejemplos, veremos cómo podemos crear un subconjunto del `DataFrame` original, según ciertas condiciones, para demostrar el proceso de filtrado de registros.
* Select
* Filter
* Where

### Select
En este ejemplo, tomamos una de las columnas del `DataFrame`, `Avg_Salary`, y creamos un subconjunto del dataframe original, usando la función `select()`. Podemos pasar cualquier número de columnas que deben estar presentes en el subconjunto. Luego aplicamos un filtro en el `DataFrame` para extraer los registros, en función de un cierto umbral (Avg_Salary> 1000000). Una vez filtrados, podemos tomar el recuento total de registros presentes en el subconjunto o tomarlo para su posterior procesamiento.

In [0]:
df.select(['Customer_subtype','Avg_Salary']).show()

+--------------------+----------+
|    Customer_subtype|Avg_Salary|
+--------------------+----------+
|Lower class large...|     44905|
|Mixed small town ...|     37575|
|Mixed small town ...|     27915|
|Modern, complete ...|     19504|
|  Large family farms|     34943|
|    Young and rising|     13064|
|Large religious f...|     29090|
|Lower class large...|      6895|
|Lower class large...|     35497|
|     Family starters|     30800|
|       Stable family|     39157|
|Modern, complete ...|     40839|
|Lower class large...|     30008|
|        Mixed rurals|     37209|
|    Young and rising|     45361|
|Lower class large...|     45650|
|Traditional families|     18982|
|Mixed apartment d...|     30093|
|Young all america...|     27097|
|Low income catholics|     23511|
+--------------------+----------+
only showing top 20 rows



In [0]:
df.filter(df['Avg_Salary'] > 1000000).count()

Out[25]: 128

In [0]:
df.filter(df['Avg_Salary'] > 1000000).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
| High status seniors|               1|                 3|40-50 years|Successful hedonists|   4670288|    0|
| High status seniors|               1|                 3|50-60 years|Successful hedonists|   9561873|    0|
| High status seniors|               1|                 2|40-50 years|Successful hedonists|  18687005|    0|
| High status seniors|               1|                 2|40-50 years|Successful hedonists|  24139960|    0|
| High status seniors|               1|                 2|50-60 years|Successful hedonists|   6718606|    0|
|High Income, expe...|               1|                 3|40-50 years|Successful hedonists|  19347139|    0|
|High Income, expe.

### Filter
También podemos aplicar más de un filtro en el `DataFrame`, al incluir más condiciones, como se muestra a continuación. Esto se puede hacer de dos maneras: primero, aplicando filtros consecutivos, luego usando operandos (&, o) con una instrucción where.

In [0]:
df.filter(df['Avg_Salary'] > 500000).filter(df['Number_of_houses'] > 2).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    596723|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    944444|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    788477|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    994077|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+



### Where

In [0]:
df.where((df['Avg_Salary'] > 500000) & (df['Number_of_houses'] > 2)).show()


+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    596723|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    944444|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    788477|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    994077|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+



Podemos observar que los resultados son los mismo, a pesar que hemos usado instrucciones diferentes.

### Aggregations 
Cualquier tipo de agregación se puede dividir simplemente en tres etapas, en el siguiente orden:

* Split
* Apply
* Combine

El primer paso es dividir los datos, en función de una columna o grupo de columnas, y luego realizar la operación en esos pequeños grupos individuales (recuento, máximo, promedio, etc.). Una vez que se obtienen los resultados para cada conjunto de grupos, el último paso es combinar todos estos resultados. En el siguiente ejemplo, agregamos los datos, según el 'Subtipo de cliente', y simplemente contamos la cantidad de registros en cada categoría. Usamos la función `groupBy()` en **PySpark**. El resultado de esto no está en ningún orden en particular, ya que no hemos aplicado ninguna clasificación a los resultados. Por tanto, también veremos cómo podemos aplicar cualquier tipo de clasificación a los resultados finales. Debido a que tenemos siete columnas en el `DataFrame`, todas son columnas categóricas excepto una (Avg_Salary), podemos iterar sobre cada columna y aplicar la agregación como en el siguiente ejemplo:

In [0]:
df.groupBy('Customer_subtype').count().show()

+--------------------+-----+
|    Customer_subtype|count|
+--------------------+-----+
|Large family, emp...|   56|
|Religious elderly...|   47|
|Large religious f...|  107|
|Modern, complete ...|   93|
|    Village families|   68|
|Young all america...|   62|
|Young urban have-...|    4|
|Young seniors in ...|   22|
|Fresh masters in ...|    2|
|High Income, expe...|   52|
|Lower class large...|  288|
| Residential elderly|    6|
|Senior cosmopolitans|    1|
|        Mixed rurals|   67|
|Career and childcare|   33|
|Low income catholics|   72|
|Mixed apartment d...|   34|
|Seniors in apartm...|   17|
|Middle class fami...|  122|
|Traditional families|  129|
+--------------------+-----+
only showing top 20 rows



In [0]:
for col in df.columns:
    if col !='Avg_Salary':
        print(f" *** Aggregation for  {col} ***")
        df.groupBy(col).count().orderBy('count',ascending=False).show(truncate=False)

    

 *** Aggregation for  Customer_subtype ***
+------------------------------------------+-----+
|Customer_subtype                          |count|
+------------------------------------------+-----+
|Lower class large families                |288  |
|Traditional families                      |129  |
|Middle class families                     |122  |
|Large religious families                  |107  |
|Modern, complete families                 |93   |
|Couples with teens 'Married with children'|83   |
|Young and rising                          |78   |
|High status seniors                       |76   |
|Low income catholics                      |72   |
|Mixed seniors                             |71   |
|Village families                          |68   |
|Mixed rurals                              |67   |
|Young all american family                 |62   |
|Stable family                             |62   |
|Large family, employed child              |56   |
|Young, low educated                   

Como se mencionó, podemos tener diferentes tipos de operaciones en grupos de registros, como
* Mean
* Max
* Min
* Sum

Los siguientes ejemplos cubren algunos de estos, basados en diferentes agrupaciones. **F** se refiere a la función **Spark sql** aquí.

In [0]:
# Determinamos el mínimo
df.groupBy('Customer_main_type').agg(F.min('Avg_Salary')).show()

+--------------------+---------------+
|  Customer_main_type|min(Avg_Salary)|
+--------------------+---------------+
|             Farmers|          10469|
|       Career Loners|          13246|
|Retired and Relig...|           1361|
|Successful hedonists|          12705|
|         Living well|          10418|
|      Average Family|          10506|
|    Cruising Seniors|          10100|
|Conservative fami...|          10179|
|      Driven Growers|          10257|
|Family with grown...|           1502|
+--------------------+---------------+



In [0]:
# Determinamos el máximo
df.groupBy('Customer_main_type').agg(F.max('Avg_Salary')).show()

+--------------------+---------------+
|  Customer_main_type|max(Avg_Salary)|
+--------------------+---------------+
|             Farmers|          49965|
|       Career Loners|          49903|
|Retired and Relig...|          49564|
|Successful hedonists|       48919896|
|         Living well|          49816|
|      Average Family|         991838|
|    Cruising Seniors|          49526|
|Conservative fami...|          49965|
|      Driven Growers|          49932|
|Family with grown...|          49901|
+--------------------+---------------+



In [0]:
# Determinamos la suma
df.groupBy('Customer_main_type').agg(F.sum('Avg_Salary')).show()

+--------------------+---------------+
|  Customer_main_type|sum(Avg_Salary)|
+--------------------+---------------+
|             Farmers|        2809468|
|       Career Loners|         484089|
|Retired and Relig...|        5522439|
|Successful hedonists|     3158111161|
|         Living well|        5552540|
|      Average Family|       32111040|
|    Cruising Seniors|        1732220|
|Conservative fami...|        6963043|
|      Driven Growers|        5292275|
|Family with grown...|       15237892|
+--------------------+---------------+



In [0]:
# Determinamos la media o promedio
df.groupBy('Customer_main_type').agg(F.mean('Avg_Salary')).show()

+--------------------+--------------------+
|  Customer_main_type|     avg(Avg_Salary)|
+--------------------+--------------------+
|             Farmers|  30209.333333333332|
|       Career Loners|             32272.6|
|Retired and Relig...|   27338.80693069307|
|Successful hedonists|1.6278923510309279E7|
|         Living well|  31194.044943820223|
|      Average Family|  104256.62337662338|
|    Cruising Seniors|  28870.333333333332|
|Conservative fami...|  29504.419491525423|
|      Driven Growers|   30769.04069767442|
|Family with grown...|  28114.191881918818|
+--------------------+--------------------+



### Sorting 
A veces, simplemente existe la necesidad de ordenar los datos con agregación o sin ningún tipo de agregación. Ahí es donde podemos hacer uso de la función `sort` y `orderBy` de **PySpark**, para reorganizar los datos en un orden particular, como se muestra en los siguientes ejemplos:

In [0]:
df.sort("Avg_salary", ascending=False).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
| High status seniors|               1|                 2|60-70 years|Successful hedonists|  48919896|    0|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48177970|    0|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48069548|    1|
|High Income, expe...|               1|                 3|40-50 years|Successful hedonists|  46911924|    0|
| High status seniors|               1|                 3|40-50 years|Successful hedonists|  46614009|    0|
|High Income, expe...|               1|                 3|30-40 years|Successful hedonists|  45952441|    0|
|High Income, expe.

In [0]:
df.groupBy('Customer_subtype').agg(F.avg('Avg_Salary').alias('mean_salary')).orderBy('mean_salary',ascending=False).show(50,False)

+------------------------------------------+--------------------+
|Customer_subtype                          |mean_salary         |
+------------------------------------------+--------------------+
|High status seniors                       |2.507677857894737E7 |
|High Income, expensive child              |2.3839817807692308E7|
|Affluent young families                   |662068.7777777778   |
|Affluent senior apartments                |653638.8235294118   |
|Senior cosmopolitans                      |49903.0             |
|Students in apartments                    |35532.142857142855  |
|Large family farms                        |33135.61538461538   |
|Young, low educated                       |33072.21428571428   |
|Large family, employed child              |32867.857142857145  |
|Suburban youth                            |32558.0             |
|Village families                          |32449.470588235294  |
|Middle class families                     |31579.385245901638  |
|Modern, c

In [0]:
df.groupBy('Customer_subtype').agg(F.max('Avg_Salary').alias('max_salary')).orderBy('max_salary',ascending=False).show()

+--------------------+----------+
|    Customer_subtype|max_salary|
+--------------------+----------+
| High status seniors|  48919896|
|High Income, expe...|  48177970|
|Affluent senior a...|    994077|
|Affluent young fa...|    991838|
|Traditional families|     49965|
|  Large family farms|     49965|
|Middle class fami...|     49932|
|Senior cosmopolitans|     49903|
|Mixed small town ...|     49901|
|Lower class large...|     49899|
|       Mixed seniors|     49876|
|    Young and rising|     49816|
|        Mixed rurals|     49785|
|Modern, complete ...|     49729|
| Young, low educated|     49626|
|Mixed apartment d...|     49621|
|     Family starters|     49602|
|    Village families|     49575|
|Religious elderly...|     49564|
|       Stable family|     49548|
+--------------------+----------+
only showing top 20 rows



### Collect 
En algunos casos, también debemos recopilar la lista de valores para grupos particulares o para categorías individuales. Por ejemplo, supongamos que un cliente va a una tienda en línea y accede a diferentes páginas del sitio web de la tienda. Si tenemos que recopilar todas las actividades del cliente en una lista, podemos usar la función de `collect` en **PySpark**. Podemos recopilar valores de dos maneras diferentes:
* Collect List
* Collect Set

In [0]:
# Collect _set 
df.groupby("Customer_subtype").agg(F.collect_set("Number_of_houses")).show()

+--------------------+-----------------------------+
|    Customer_subtype|collect_set(Number_of_houses)|
+--------------------+-----------------------------+
|Large family, emp...|                       [1, 2]|
|Religious elderly...|                       [1, 2]|
|Large religious f...|                       [1, 2]|
|Modern, complete ...|                       [1, 2]|
|    Village families|                       [1, 2]|
|Young all america...|                       [1, 2]|
|Young urban have-...|                       [1, 2]|
|Young seniors in ...|                    [1, 2, 3]|
|Fresh masters in ...|                          [1]|
|High Income, expe...|                          [1]|
|Lower class large...|                       [1, 2]|
| Residential elderly|                    [1, 2, 3]|
|Senior cosmopolitans|                          [3]|
|        Mixed rurals|                          [1]|
|Career and childcare|                       [1, 2]|
|Low income catholics|                        

In [0]:
#collect list 
df.groupby("Customer_subtype").agg(F.collect_list("Number_of_houses")).show()

+--------------------+------------------------------+
|    Customer_subtype|collect_list(Number_of_houses)|
+--------------------+------------------------------+
|Large family, emp...|          [2, 1, 2, 1, 2, 1...|
|Religious elderly...|          [1, 1, 1, 1, 1, 1...|
|Large religious f...|          [2, 1, 1, 2, 1, 1...|
|Modern, complete ...|          [1, 1, 2, 1, 1, 1...|
|    Village families|          [1, 1, 1, 1, 1, 1...|
|Young all america...|          [1, 1, 2, 2, 1, 1...|
|Young urban have-...|                  [1, 2, 1, 1]|
|Young seniors in ...|          [1, 1, 1, 1, 1, 2...|
|Fresh masters in ...|                        [1, 1]|
|High Income, expe...|          [1, 1, 1, 1, 1, 1...|
|Lower class large...|          [1, 1, 1, 1, 1, 1...|
| Residential elderly|            [3, 1, 1, 3, 2, 1]|
|Senior cosmopolitans|                           [3]|
|        Mixed rurals|          [1, 1, 1, 1, 1, 1...|
|Career and childcare|          [2, 1, 1, 1, 1, 1...|
|Low income catholics|      

La necesidad de crear una nueva columna con un valor constante puede ser muy común. Por lo tanto, podemos hacer eso en PySpark, usando la función `lit`. En el siguiente ejemplo, creamos una nueva columna con un valor constante:

In [0]:
# creando una nueva columna con valor constante

df=df.withColumn('constant',F.lit('finance'))

In [0]:
df.select('Customer_subtype','constant').show()

+--------------------+--------+
|    Customer_subtype|constant|
+--------------------+--------+
|Lower class large...| finance|
|Mixed small town ...| finance|
|Mixed small town ...| finance|
|Modern, complete ...| finance|
|  Large family farms| finance|
|    Young and rising| finance|
|Large religious f...| finance|
|Lower class large...| finance|
|Lower class large...| finance|
|     Family starters| finance|
|       Stable family| finance|
|Modern, complete ...| finance|
|Lower class large...| finance|
|        Mixed rurals| finance|
|    Young and rising| finance|
|Lower class large...| finance|
|Traditional families| finance|
|Mixed apartment d...| finance|
|Young all america...| finance|
|Low income catholics| finance|
+--------------------+--------+
only showing top 20 rows



Debido a que estamos tratando con `DataFrame`, es un requisito común aplicar ciertas funciones personalizadas en columnas específicas y obtener el resultado. Por lo tanto, hacemos uso de **UDF** para aplicar funciones de Python en una o más columnas.

### User-Defined Functions (UDFs)
En este ejemplo, intentamos nombrar las categorías de edad y crear una función Python estándar (categoría_edad) para las mismas. Para aplicar esto en el dataframe de Spark, creamos un objeto UDF usando esta función de Python. El único requisito es mencionar el tipo de retorno de la función. En este caso, es simplemente un valor de cadena.

In [0]:
from pyspark.sql.functions import udf
df.groupby("Avg_age").count().show()

+-----------+-----+
|    Avg_age|count|
+-----------+-----+
|70-80 years|    8|
|50-60 years|  373|
|30-40 years|  496|
|20-30 years|   31|
|60-70 years|   64|
|40-50 years| 1028|
+-----------+-----+



In [0]:
# crear una función para asignar categorías
def age_category(age):
    if age  == '20-30 years':
        return 'Young'
    elif age== '30-40 years':
        return 'Mid Aged' 
    elif ((age== '40-50 years') or (age== '50-60 years')) :
        return 'Old'
    else:
        return 'Very Old'



In [0]:
# creamos age categorica udf 
age_udf=udf(age_category,StringType())
# creamos el bucket column by applying udf
df=df.withColumn('age_category',age_udf(df['Avg_age']))

In [0]:
df.select('Avg_age','age_category').show()

+-----------+------------+
|    Avg_age|age_category|
+-----------+------------+
|30-40 years|    Mid Aged|
|30-40 years|    Mid Aged|
|30-40 years|    Mid Aged|
|40-50 years|         Old|
|30-40 years|    Mid Aged|
|20-30 years|       Young|
|30-40 years|    Mid Aged|
|40-50 years|         Old|
|50-60 years|         Old|
|40-50 years|         Old|
|40-50 years|         Old|
|40-50 years|         Old|
|40-50 years|         Old|
|40-50 years|         Old|
|30-40 years|    Mid Aged|
|40-50 years|         Old|
|40-50 years|         Old|
|40-50 years|         Old|
|30-40 years|    Mid Aged|
|50-60 years|         Old|
+-----------+------------+
only showing top 20 rows



In [0]:
df.groupby("age_category").count().show()

Los UDF de Pandas son otro avance reciente, así que repasémoslos ahora.

## Pandas UDF
Las UDF de Pandas son mucho más rápidas y eficientes, en términos de procesamiento y tiempo de ejecución, en comparación con las UDF estándar de Python. La principal diferencia entre una UDF de Python normal y una UDF de Pandas es que una UDF de Python se ejecuta fila por fila y, por lo tanto, realmente no ofrece la ventaja de un marco distribuido. Puede llevar más tiempo, en comparación con Pandas UDF, que se ejecuta bloque por bloque y ofrece resultados más rápidos. Hay tres tipos diferentes de UDF de Pandas: `scalar, grouped map`, y `grouped agg`. La única diferencia en el uso de una UDF de Pandas en comparación con una UDF tradicional radica en la declaración. En el siguiente ejemplo, intentamos escalar los valores de Avg_Salary aplicando la escala. Primero tomamos los valores mínimo y máximo de Avg_Salary, restamos de cada valor el salario mínimo de cada valor y luego lo dividimos por la diferencia entre el máximo y el mínimo.

$$\frac{X-x_{min}}{X_{max}-X{min}}$$

In [0]:
df.select('Avg_Salary').summary().show()

+-------+-----------------+
|summary|       Avg_Salary|
+-------+-----------------+
|  count|             2000|
|   mean|     1616908.0835|
| stddev|6822647.757312146|
|    min|             1361|
|    25%|            20315|
|    50%|            31421|
|    75%|            42949|
|    max|         48919896|
+-------+-----------------+



In [0]:
min_sal=1361
max_sal=48919896

In [0]:
### Pandas udf 
from pyspark.sql.functions import pandas_udf, PandasUDFType

def scaled_salary(salary):
    scaled_sal=(salary-min_sal)/(max_sal-min_sal)
    return scaled_sal

In [0]:
scaling_udf = pandas_udf(scaled_salary, DoubleType())
df.withColumn("scaled_salary", scaling_udf(df['Avg_Salary'])).show(10,False)



+--------------------------+----------------+------------------+-----------+---------------------+----------+-----+--------+------------+---------------------+
|Customer_subtype          |Number_of_houses|Avg_size_household|Avg_age    |Customer_main_type   |Avg_Salary|label|constant|age_category|scaled_salary        |
+--------------------------+----------------+------------------+-----------+---------------------+----------+-----+--------+------------+---------------------+
|Lower class large families|1               |3                 |30-40 years|Family with grown ups|44905     |0    |finance |Mid Aged    |8.901329526732557E-4 |
|Mixed small town dwellers |1               |2                 |30-40 years|Family with grown ups|37575     |0    |finance |Mid Aged    |7.40291997705982E-4  |
|Mixed small town dwellers |1               |2                 |30-40 years|Family with grown ups|27915     |0    |finance |Mid Aged    |5.42820834679534E-4  |
|Modern, complete families |1           

Así es como podemos usar tanto las UDF convencionales como las de Pandas para aplicar diferentes condiciones en el `DataFrame`, según sea necesario.

### Joins
Combinar diferentes conjuntos de datos es un requisito muy genérico presente en la mayoría de los `Pipelines` de procesamiento de datos en el mundo de **Big Data**. **PySpark** ofrece varias formas de realizar este trabajo **merge** y **pivot** los valores de su marco de datos, según sea necesario. En el siguiente ejemplo, creamos un `DataFrame` fabricado con algunos valores de código de región ficticios para todos los tipos de clientes. La idea es combinar este `DataFrame` con el `DataFrame` original, para tener estos códigos de región como parte del `DataFrame` original, como una columna.

In [0]:
df.groupby("Customer_main_type").count().show(50,False)

+---------------------+-----+
|Customer_main_type   |count|
+---------------------+-----+
|Farmers              |93   |
|Career Loners        |15   |
|Retired and Religious|202  |
|Successful hedonists |194  |
|Living well          |178  |
|Average Family       |308  |
|Cruising Seniors     |60   |
|Conservative families|236  |
|Driven Growers       |172  |
|Family with grown ups|542  |
+---------------------+-----+



In [0]:
# Creamos un nuevo DataFrame llamado región
region_data = spark.createDataFrame([('Family with grown ups','PN'),
                                    ('Driven Growers','GJ'),
                                    ('Conservative families','DD'),
                                    ('Cruising Seniors','DL'),
                                    ('Average Family ','MN'),
                                    ('Living well','KA'),
                                    ('Successful hedonists','JH'),
                                    ('Retired and Religious','AX'),
                                   ('Career Loners','HY'),('Farmers','JH')],schema=StructType().add("Customer_main_type","string").add("Region Code","string"))

In [0]:
region_data.show()

+--------------------+-----------+
|  Customer_main_type|Region Code|
+--------------------+-----------+
|Family with grown...|         PN|
|      Driven Growers|         GJ|
|Conservative fami...|         DD|
|    Cruising Seniors|         DL|
|     Average Family |         MN|
|         Living well|         KA|
|Successful hedonists|         JH|
|Retired and Relig...|         AX|
|       Career Loners|         HY|
|             Farmers|         JH|
+--------------------+-----------+



In [0]:
# Vamos a unir ahora ambos dataframe
new_df=df.join(region_data,on='Customer_main_type')



In [0]:
new_df.groupby("Region Code").count().show(50,False)

+-----------+-----+
|Region Code|count|
+-----------+-----+
|PN         |542  |
|GJ         |172  |
|DD         |236  |
|DL         |60   |
|KA         |178  |
|JH         |287  |
|AX         |202  |
|HY         |15   |
+-----------+-----+



Tomamos el recuento regional después de unir el marco de datos original (df) con el `DataFrame` `region_data` recién creado en la columna `Customer_main_type`.

### Pivoting 
Podemos usar la función `pivot()` en **PySpark** para simplemente crear una vista dinámica del `DataFrame` para columnas específicas, como se muestra en el siguiente ejemplo. Aquí, estamos agrupando datos, según el tipo de cliente. Las columnas representan diferentes grupos de edad. Los valores dentro de **la tabla dinámica(pivote table)** son la suma del salario promedio de cada una de estas categorías de tipo de cliente para un grupo de edad en particular. También nos aseguramos de que no haya valores *nulos o vacíos*, completando todos los valores nulos con 0. En el siguiente ejemplo, creamos una **tabla dinámica** más, usando la columna de etiqueta y tomando la suma de Salario promedio como los valores dentro de ella.

In [0]:
df.groupBy('Customer_main_type').pivot('Avg_age').sum('Avg_salary').fillna(0).show()

+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|  Customer_main_type|20-30 years|30-40 years|40-50 years|50-60 years|60-70 years|70-80 years|
+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|             Farmers|          0|     462027|    2031235|     316206|          0|          0|
|       Career Loners|     143998|     176639|      25701|     105193|      32558|          0|
|Retired and Relig...|     126350|     336631|    2975266|    1687711|     335357|      61124|
|Successful hedonists|      42261|  171278764| 1223362814| 1563071675|  200340129|      15518|
|         Living well|     460528|    2965303|    1795405|     331304|          0|          0|
|      Average Family|          0|   23682805|    7789464|     412490|     226281|          0|
|    Cruising Seniors|          0|      43302|     303601|     529354|     716425|     139538|
|Conservative fami...|      69390|    2381485|    

In [0]:
df.groupBy('Customer_main_type').pivot('label').sum('Avg_salary').fillna(0).show()

+--------------------+----------+---------+
|  Customer_main_type|         0|        1|
+--------------------+----------+---------+
|             Farmers|   2734832|    74636|
|       Career Loners|    484089|        0|
|Retired and Relig...|   5328410|   194029|
|Successful hedonists|2720381462|437729699|
|         Living well|   5453384|    99156|
|      Average Family|  26036999|  6074041|
|    Cruising Seniors|   1675841|    56379|
|Conservative fami...|   6595027|   368016|
|      Driven Growers|   4492465|   799810|
|Family with grown...|  14394094|   843798|
+--------------------+----------+---------+



Dividimos los datos, según la columna `Customer_main_type`, y tomamos la suma acumulada del `Avg_Salary` de cada uno de los valores de la etiqueta (0,1), usando la función **pivote**.

### Window OperationsWindow Functions or Windowed Aggregates
Esta funcionalidad en **PySpark** le permite realizar ciertas operaciones en grupos de registros conocidos como `within the window`("dentro de la ventana"). Calcula los resultados para cada fila dentro de la ventana. Un ejemplo clásico del uso de la ventana son las diversas agregaciones para un usuario durante diferentes sesiones. Un visitante puede tener múltiples sesiones en un sitio web en particular y, por lo tanto, la ventana se puede usar para contar las actividades totales del usuario durante cada sesión. PySpark admite tres tipos de funciones de ventana:

* Aggregations
* Ranking
* Analytics

En el siguiente ejemplo, importamos la función de ventana, además de otras, como número_fila. El siguiente paso es definir la ventana. A veces puede ser simplemente una columna ordenada o, a veces, puede basarse en categorías particulares dentro de una columna. Veremos ejemplos de cada uno de ellos. En el primer ejemplo, definimos la ventana, que solo se basa en la columna Ordenada Salario promedio, y clasificamos estos salarios. Creamos una nueva columna "rango" y asignamos rangos a cada uno de los valores de Salario promedio.

In [0]:
## Ranking 

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import udf,rank, col,row_number

In [0]:
#create a window function to order the relevant column( Avg Salary)
win = Window.orderBy(df['Avg_Salary'].desc())

In [0]:
#create a additonal column with row numbers as rank
df=df.withColumn('rank', row_number().over(win).alias('rank'))

In [0]:
df.show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|constant|age_category|rank|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
| High status seniors|               1|                 2|60-70 years|Successful hedonists|  48919896|    0| finance|    Very Old|   1|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48177970|    0| finance|         Old|   2|
|High Income, expe...|               1|                 2|50-60 years|Successful hedonists|  48069548|    1| finance|         Old|   3|
|High Income, expe...|               1|                 3|40-50 years|Successful hedonists|  46911924|    0| finance|         Old|   4|
| High status seniors|               1|         

Un requisito común es encontrar los tres valores principales de una categoría. En este caso, la ventana se puede utilizar para obtener los resultados. En el siguiente ejemplo, definimos la ventana y la partición por la columna del subtipo Cliente. Básicamente, lo que hace es ordenar el salario promedio para cada categoría de subtipo de cliente, por lo que ahora podemos usar el filtro para obtener los tres valores de salario principales para cada grupo.

In [0]:
# Ranking groupwise 
#create a window function to order the relevant column( Avg Salary)
win_1 = Window.partitionBy("Customer_subtype").orderBy(df['Avg_Salary'].desc())

In [0]:
#create a additonal column with row numbers as rank
df=df.withColumn('rank', row_number().over(win_1).alias('rank'))

Ahora que tenemos un nuevo rango de columna que consiste en el rango o cada categoría de Customer_subtype, podemos filtrar fácilmente los tres primeros rangos para cada categoría.

In [0]:
df.groupBy('rank').count().orderBy('rank').show()

+----+-----+
|rank|count|
+----+-----+
|   1|   39|
|   2|   37|
|   3|   36|
|   4|   36|
|   5|   34|
|   6|   34|
|   7|   32|
|   8|   31|
|   9|   31|
|  10|   31|
|  11|   31|
|  12|   31|
|  13|   31|
|  14|   31|
|  15|   31|
|  16|   30|
|  17|   30|
|  18|   27|
|  19|   27|
|  20|   27|
+----+-----+
only showing top 20 rows



In [0]:
# filter top 3 customers from every group
df.filter(col('rank') < 4).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|constant|age_category|rank|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------+------------+----+
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    994077|    0| finance|         Old|   1|
|Affluent senior a...|               1|                 2|50-60 years|Successful hedonists|    983051|    0| finance|         Old|   2|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    944444|    0| finance|         Old|   3|
|Affluent young fa...|               1|                 3|30-40 years|      Average Family|    991838|    0| finance|    Mid Aged|   1|
|Affluent young fa...|               1|         

## Conclusión
En este modulo, analizamos diferentes técnicas para leer, limpiar y preprocesar datos en PySpark. Vismo los métodos para unir un dataframe y crear una tabla dinámica a partir de él. Las secciones finales del modulo cubrieron las UDF y las operaciones basadas en ventanas en PySpark. Los próximos modulos se centrarán en el manejo de transmisión de datos en PySpark y el **Machine learning** mediante MLlib.