# STANDARD SOLUTION: FEATURE EXTRACTION WITH FEATURETOOLS (PYSPARK)

### Objective

This notebook provides a standard method for extracting new variables with the **Featuretools** library in PySpark.

We start by initializing our SparkSession and SparkContext.

In [578]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.context import SQLContext

# Build the session first: the SparkContext and the SQLContext are derived from it
spark = SparkSession.builder.master("local[2]").appName("MiPrimer").config("spark.executor.memory", "6g").config("spark.cores.max","4").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

We import the libraries we will need:

In [571]:
import pandas as pd
import numpy as np

import featuretools as ft
import featuretools.variable_types as vtypes

import datetime 
import pyspark.sql.functions as F
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf

### Example 1

We load the demo data that ships with the **Featuretools** library.

In [572]:
data = ft.demo.load_mock_customer()

# Load a single dataset
customer_df = data['customers']

# Create the EntitySet
es = ft.EntitySet('name', {'customers': (customer_df, 'customer_id')})

# Equivalent to:
# es = ft.EntitySet(id="name")
# es = es.entity_from_dataframe(entity_id="customers", index='customer_id', dataframe=customer_df)

# Generate the dataframe with the new variables:
feature_matrix, feats = ft.dfs(entityset=es, target_entity='customers', max_depth=2)

Next, we create a Spark DataFrame from our pandas dataframe, and derive the target schema from the feature matrix:

In [573]:
customer_sp = spark.createDataFrame(customer_df)
target_schema = spark.createDataFrame(feature_matrix.reset_index()).schema


In [574]:
target_schema

StructType(List(StructField(customer_id,LongType,true),StructField(zip_code,StringType,true),StructField(DAY(join_date),LongType,true),StructField(DAY(date_of_birth),LongType,true),StructField(YEAR(join_date),LongType,true),StructField(YEAR(date_of_birth),LongType,true),StructField(MONTH(join_date),LongType,true),StructField(MONTH(date_of_birth),LongType,true),StructField(WEEKDAY(join_date),LongType,true),StructField(WEEKDAY(date_of_birth),LongType,true)))

This is the Spark DataFrame we created:

In [575]:
customer_sp.show()

+-----------+--------+-------------------+-------------------+
|customer_id|zip_code|          join_date|      date_of_birth|
+-----------+--------+-------------------+-------------------+
|          1|   60091|2011-04-17 10:48:33|1994-07-18 00:00:00|
|          2|   13244|2012-04-15 23:31:04|1986-08-18 00:00:00|
|          3|   13244|2011-08-13 15:42:34|2003-11-21 00:00:00|
|          4|   60091|2011-04-08 20:08:14|2006-08-15 00:00:00|
|          5|   60091|2010-07-17 05:27:50|1984-07-28 00:00:00|
+-----------+--------+-------------------+-------------------+



To extract the features we need PySpark's @pandas_udf decorator.

*@pandas_udf* takes 3 arguments:  
- Function: the user-defined function  
- ReturnType: the type of the value it returns
- FunctionType: either **Scalar**, for a transformation from one or more pandas Series to a pandas Series, or **Grouped Map**, for a transformation from a pandas DataFrame to a pandas DataFrame. In the latter case the *schema* must be given as the ReturnType.

Steps:
1. If the Spark dataframe is very large, take a sample (about 5 rows) and convert it to pandas (toPandas()).  
2. Run the Featuretools part in Python on that sample to obtain the new features and to build the schema that we will later pass to pandas_udf.  
3. To make sure the same variables are generated every time, keep the feature definitions returned alongside the feature_matrix and compute them with the *calculate_feature_matrix* function from **Featuretools**.
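
The grouped-map pattern behind these steps can be pictured as split, apply, concatenate. The following is only a plain-Python stand-in for that idea, not Spark's implementation: in Spark each group reaches the function as a pandas DataFrame, whereas here a group is a list of dicts. The helper names `grouped_map` and `add_join_year` are hypothetical, and the rows are the first three customers of the demo table.

```python
from collections import defaultdict
from datetime import datetime


def grouped_map(rows, key, func):
    """Split rows by `key`, apply `func` to each group, concatenate the results."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    output = []
    for group in groups.values():
        output.extend(func(group))
    return output


def add_join_year(group):
    # Stand-in for the Featuretools call: one output row per input row,
    # always with the same columns, which is what the declared schema requires.
    return [
        {"customer_id": r["customer_id"], "join_year": r["join_date"].year}
        for r in group
    ]


sample = [
    {"customer_id": 1, "join_date": datetime(2011, 4, 17, 10, 48, 33)},
    {"customer_id": 2, "join_date": datetime(2012, 4, 15, 23, 31, 4)},
    {"customer_id": 3, "join_date": datetime(2011, 8, 13, 15, 42, 34)},
]

result = grouped_map(sample, "customer_id", add_join_year)
print(result)
```

The point of the sketch is that the function must return the same columns for every group, because the output schema is fixed before any group is processed.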

In [576]:
@pandas_udf(target_schema, PandasUDFType.GROUPED_MAP)   
def generate_features(df_sp):
    es = ft.EntitySet('name', {'customers': (df_sp, 'customer_id')})
    return ft.calculate_feature_matrix(feats, es).reset_index()

In [577]:
customer_ft = customer_sp.groupby("customer_id").apply(generate_features)
customer_ft.toPandas()

Unnamed: 0,customer_id,zip_code,DAY(join_date),DAY(date_of_birth),YEAR(join_date),YEAR(date_of_birth),MONTH(join_date),MONTH(date_of_birth),WEEKDAY(join_date),WEEKDAY(date_of_birth)
0,5,60091,17,28,2010,1984,7,7,5,5
1,1,60091,17,18,2011,1994,4,7,6,0
2,3,13244,13,21,2011,2003,8,11,5,4
3,2,13244,15,18,2012,1986,4,8,6,0
4,4,60091,8,15,2011,2006,4,8,4,1


We can see that the new variables have been created in our PySpark dataframe.
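
As a sanity check on what the new columns mean, the four date-derived values can be recomputed with the standard library for customer_id 1 (join_date 2011-04-17 10:48:33, date_of_birth 1994-07-18). This is a minimal sketch that only mirrors the values in the output above; the helper `date_features` is hypothetical and is not how Featuretools computes them internally.

```python
from datetime import datetime


def date_features(ts):
    """Return the four date-derived values shown in the feature matrix."""
    return {
        "DAY": ts.day,
        "YEAR": ts.year,
        "MONTH": ts.month,
        "WEEKDAY": ts.weekday(),  # Monday = 0 ... Sunday = 6
    }


join_date = datetime(2011, 4, 17, 10, 48, 33)  # customer_id 1
date_of_birth = datetime(1994, 7, 18)          # customer_id 1

join_features = date_features(join_date)
birth_features = date_features(date_of_birth)
print(join_features)
print(birth_features)
```

Both results agree with the row for customer_id 1, which suggests that WEEKDAY counts from Monday as 0.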

Note: preprocess the data properly before running Featuretools: clean nulls, convert types, and so on.

### Example 2

In the following example we have a car sales dataset, which we read with Spark:

In [637]:
Ventas_coches = spark.read.csv("Car_sales.csv", header=True)

In [638]:
Ventas_coches.printSchema()

root
 |-- Manufacturer: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Sales_in_thousands: string (nullable = true)
 |-- __year_resale_value: string (nullable = true)
 |-- Vehicle_type: string (nullable = true)
 |-- Price_in_thousands: string (nullable = true)
 |-- Engine_size: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Wheelbase: string (nullable = true)
 |-- Width: string (nullable = true)
 |-- Length: string (nullable = true)
 |-- Curb_weight: string (nullable = true)
 |-- Fuel_capacity: string (nullable = true)
 |-- Fuel_efficiency: string (nullable = true)
 |-- Latest_Launch: string (nullable = true)
 |-- Power_perf_factor: string (nullable = true)



The inferred schema does not match the actual type of each variable: every column was read as a string. We cast each column to its proper type, paying particular attention to *Latest_Launch*, which is a date.

In [639]:
Ventas_coches = Ventas_coches.withColumn('Sales_in_thousands', F.col('Sales_in_thousands').cast('float'))\
.withColumn('__year_resale_value', F.col('__year_resale_value').cast('float'))\
.withColumn('Price_in_thousands', F.col('Price_in_thousands').cast('float'))\
.withColumn('Engine_size', F.col('Engine_size').cast('float'))\
.withColumn('Horsepower', F.col('Horsepower').cast('int'))\
.withColumn('Wheelbase', F.col('Wheelbase').cast('float'))\
.withColumn('Width', F.col('Width').cast('float'))\
.withColumn('Length', F.col('Length').cast('float'))\
.withColumn('Curb_weight', F.col('Curb_weight').cast('float'))\
.withColumn('Fuel_capacity', F.col('Fuel_capacity').cast('float'))\
.withColumn('Fuel_efficiency', F.col('Fuel_efficiency').cast('float'))\
.withColumn('Latest_Launch',to_date(unix_timestamp('Latest_Launch', "d/M/yyyy").cast('timestamp')))\
.withColumn('Power_perf_factor', F.col('Power_perf_factor').cast('float'))

Once this is done, every variable has the corresponding type.
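
A correct column type does not by itself show that the dates were parsed as intended, because the result depends on the pattern string matching the raw text. The sketch below uses Python's `strptime` as a stand-in for Spark's parser, with "%d/%m/%Y" as a rough analogue of "d/M/yyyy". The raw strings are hypothetical and are not taken from Car_sales.csv, and `parse_or_none` is an illustrative helper.

```python
from datetime import datetime


def parse_or_none(text, fmt):
    """Mimic a parser that yields null instead of raising on a mismatch."""
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError:
        return None


DAY_FIRST = "%d/%m/%Y"    # rough analogue of Spark's "d/M/yyyy"
MONTH_FIRST = "%m/%d/%Y"  # rough analogue of Spark's "M/d/yyyy"

# Hypothetical raw strings
ambiguous = "6/3/2011"       # valid under both patterns, with different meanings
unambiguous = "10/25/2011"   # only valid month-first

for raw in (ambiguous, unambiguous):
    print(raw, parse_or_none(raw, DAY_FIRST), parse_or_none(raw, MONTH_FIRST))
```

If the pattern and the data disagree on the order of day and month, some values parse to the wrong date without any warning and the rest become null, so a large number of nulls in *Latest_Launch* after the cast is a hint to check the pattern.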

In [640]:
Ventas_coches.printSchema()

root
 |-- Manufacturer: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Sales_in_thousands: float (nullable = true)
 |-- __year_resale_value: float (nullable = true)
 |-- Vehicle_type: string (nullable = true)
 |-- Price_in_thousands: float (nullable = true)
 |-- Engine_size: float (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Wheelbase: float (nullable = true)
 |-- Width: float (nullable = true)
 |-- Length: float (nullable = true)
 |-- Curb_weight: float (nullable = true)
 |-- Fuel_capacity: float (nullable = true)
 |-- Fuel_efficiency: float (nullable = true)
 |-- Latest_Launch: date (nullable = true)
 |-- Power_perf_factor: float (nullable = true)



To run *featuretools*, each record needs a unique Id. We create an additional column that generates it. The generated values are unique and increasing, but not necessarily consecutive.

In [641]:
from pyspark.sql.functions import monotonically_increasing_id
Ventas_coches = Ventas_coches.withColumn('Id', monotonically_increasing_id())
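
To see why the ids need not be consecutive: Spark's documentation describes the current implementation of `monotonically_increasing_id` as placing the partition index in the upper bits and the row number within the partition in the lower 33 bits. The function below is a hypothetical plain-Python imitation of that layout, for illustration only.

```python
def spark_style_id(partition_index, row_in_partition):
    """Partition index in the upper bits, row number in the lower 33 bits."""
    return (partition_index << 33) + row_in_partition


# Two partitions with three rows each
ids = [spark_style_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

With a single partition the ids look like 0, 1, 2, ..., which is what the sample in this notebook shows, but that should not be relied on for larger data.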

As mentioned, the dataframe must be free of null records. Let's see how many we have:

In [642]:
Ventas_coches.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in Ventas_coches.columns]).show()
# Ventas_coches.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in Ventas_coches.columns]).show()


+------------+-----+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+---+
|Manufacturer|Model|Sales_in_thousands|__year_resale_value|Vehicle_type|Price_in_thousands|Engine_size|Horsepower|Wheelbase|Width|Length|Curb_weight|Fuel_capacity|Fuel_efficiency|Latest_Launch|Power_perf_factor| Id|
+------------+-----+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+---+
|           0|    0|                 0|                 36|           0|                 2|          1|         1|        1|    1|     1|          2|            1|              3|          100|                2|  0|
+------------+-----+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------
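
The cell above counts nulls with `isnull`, while the commented-out line would count NaN values with `isnan`. These are different conditions: a missing value is not a NaN, and a NaN is not missing. A minimal plain-Python sketch of the distinction, using hypothetical rows where `None` stands for a null and `float("nan")` for a NaN:

```python
import math

# Hypothetical rows, not taken from the dataset
rows = [
    {"Horsepower": 140, "Fuel_efficiency": 28.0},
    {"Horsepower": None, "Fuel_efficiency": float("nan")},
    {"Horsepower": 225, "Fuel_efficiency": None},
]


def count_missing(rows):
    """Count nulls and NaNs separately for every column."""
    counts = {}
    for column in rows[0]:
        values = [row[column] for row in rows]
        nulls = sum(v is None for v in values)
        nans = sum(isinstance(v, float) and math.isnan(v) for v in values)
        counts[column] = {"null": nulls, "nan": nans}
    return counts


missing = count_missing(rows)
print(missing)
```

When a column may contain both kinds of gaps, both checks are needed to get the full picture.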

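For this particular case, we keep only the records whose *Latest_Launch* is not null and drop the columns we do not need for generating new variables. The 100 nulls in *Latest_Launch* may indicate that many raw strings do not match the "d/M/yyyy" pattern, so it is worth checking the date format of the file before discarding those records.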

In [643]:
Ventas_coches2 = Ventas_coches.where(Ventas_coches.Latest_Launch.isNotNull()).drop('__year_resale_value')\
.drop('Engine_size')\
.drop('Fuel_efficiency')\
.drop('Price_in_thousands')\
.drop('Power_perf_factor')

In [644]:
Ventas_coches2.count()

57

In [645]:
Ventas_coches2.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in Ventas_coches2.columns]).show()


+------------+-----+------------------+------------+----------+---------+-----+------+-----------+-------------+-------------+---+
|Manufacturer|Model|Sales_in_thousands|Vehicle_type|Horsepower|Wheelbase|Width|Length|Curb_weight|Fuel_capacity|Latest_Launch| Id|
+------------+-----+------------------+------------+----------+---------+-----+------+-----------+-------------+-------------+---+
|           0|    0|                 0|           0|         0|        0|    0|     0|          0|            0|            0|  0|
+------------+-----+------------------+------------+----------+---------+-----+------+-----------+-------------+-------------+---+



We generate the sample by converting our dataframe to pandas and keeping the first 5 rows.

In [646]:
# Ventas_coches_prueba = Ventas_coches.select(['Id', 'Manufacturer', 'Latest_Launch']).toPandas().head(5)
Ventas_coches_prueba = Ventas_coches2.toPandas().head(5)

In [690]:
Ventas_coches_prueba

Unnamed: 0,Manufacturer,Model,Sales_in_thousands,Vehicle_type,Horsepower,Wheelbase,Width,Length,Curb_weight,Fuel_capacity,Latest_Launch,Id
0,Acura,Integra,16.919001,Passenger,140,101.199997,67.300003,172.399994,2.639,13.2,2012-02-02,0
1,Acura,TL,39.383999,Passenger,225,108.099998,70.300003,192.899994,3.517,17.200001,2011-03-06,1
2,Acura,CL,14.114,Passenger,225,106.900002,70.599998,192.0,3.47,17.200001,2012-04-01,2
3,Acura,RL,8.588,Passenger,210,114.599998,71.400002,196.600006,3.85,18.0,2011-10-03,3
4,Audi,A4,20.396999,Passenger,150,102.599998,68.199997,178.0,2.998,16.4,2011-08-10,4


  
We create the EntitySet with an arbitrary id, here 'name', and add a new entity for the sample dataframe we generated.  
As index we pass the *Id* column created earlier, and as entity_id whatever name we want for that entity.

In [648]:
es = ft.EntitySet(id="name")
es = es.entity_from_dataframe(entity_id="dataprueba", index='Id', dataframe=Ventas_coches_prueba)

# Generate the dataframe with the new variables:
feature_matrix, feats = ft.dfs(entityset=es, target_entity='dataprueba', max_depth=2)

In [649]:
feature_matrix

Id,Manufacturer,Model,Sales_in_thousands,Vehicle_type,Horsepower,Wheelbase,Width,Length,Curb_weight,Fuel_capacity,DAY(Latest_Launch),YEAR(Latest_Launch),MONTH(Latest_Launch),WEEKDAY(Latest_Launch)
0,Acura,Integra,16.919001,Passenger,140,101.199997,67.300003,172.399994,2.639,13.2,2,2012,2,3
1,Acura,TL,39.383999,Passenger,225,108.099998,70.300003,192.899994,3.517,17.200001,6,2011,3,6
2,Acura,CL,14.114,Passenger,225,106.900002,70.599998,192.0,3.47,17.200001,1,2012,4,6
3,Acura,RL,8.588,Passenger,210,114.599998,71.400002,196.600006,3.85,18.0,3,2011,10,0
4,Audi,A4,20.396999,Passenger,150,102.599998,68.199997,178.0,2.998,16.4,10,2011,8,2


Once the matrix of new variables has been created, we convert it to a Spark DataFrame and keep its schema, which we will later pass as an argument to @pandas_udf.

In [650]:
# customer_sp = spark.createDataFrame(customer_df)
target_schema = spark.createDataFrame(feature_matrix.reset_index()).schema


In [651]:
target_schema

StructType(List(StructField(Id,LongType,true),StructField(Manufacturer,StringType,true),StructField(Model,StringType,true),StructField(Sales_in_thousands,DoubleType,true),StructField(Vehicle_type,StringType,true),StructField(Horsepower,LongType,true),StructField(Wheelbase,DoubleType,true),StructField(Width,DoubleType,true),StructField(Length,DoubleType,true),StructField(Curb_weight,DoubleType,true),StructField(Fuel_capacity,DoubleType,true),StructField(DAY(Latest_Launch),LongType,true),StructField(YEAR(Latest_Launch),LongType,true),StructField(MONTH(Latest_Launch),LongType,true),StructField(WEEKDAY(Latest_Launch),LongType,true)))

We define the function, passing it the arguments:

In [652]:
@pandas_udf(target_schema, PandasUDFType.GROUPED_MAP)   
def generate_features(df_sp):
    es = ft.EntitySet('name', {'dataprueba': (df_sp, 'Id')})
    return ft.calculate_feature_matrix(feats, es).reset_index()

We apply the function to the Spark dataframe, grouped by the *Id* column:

In [653]:
data_ft = Ventas_coches2.groupby("Id").apply(generate_features)


In [654]:
data_ft.show(5)

+---+------------+--------------+------------------+------------+----------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+--------------------+----------------------+
| Id|Manufacturer|         Model|Sales_in_thousands|Vehicle_type|Horsepower|         Wheelbase|            Width|            Length|       Curb_weight|     Fuel_capacity|DAY(Latest_Launch)|YEAR(Latest_Launch)|MONTH(Latest_Launch)|WEEKDAY(Latest_Launch)|
+---+------------+--------------+------------------+------------+----------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+--------------------+----------------------+
|  0|       Acura|       Integra| 16.91900062561035|   Passenger|       140|101.19999694824219|67.30000305175781|172.39999389648438|2.6389999389648438|13.199999809265137|                 2|               2012|                   2|        

The only new variables we obtain are those derived from the date column, which was the only one that could generate new features.

### Next steps:

The next question is how to group our dataframe so that the generated features are aggregations rather than transformations.  
We follow the same steps as before:

In [720]:
Ventas_coches_prueba = Ventas_coches2.toPandas().head(10)

In [721]:
es = ft.EntitySet(id="name")
es = es.entity_from_dataframe(entity_id="dataprueba", index='Id', dataframe=Ventas_coches_prueba)

We add a new entity to our EntitySet. Since we have a single table, this new entity is built from the column we want to group by, in our case 'Manufacturer'.

In [722]:
es = es.normalize_entity(base_entity_id="dataprueba", new_entity_id="Manufacturer", index="Manufacturer")


In [723]:
es

Entityset: name
  Entities:
    dataprueba [Rows: 10, Columns: 12]
    Manufacturer [Rows: 5, Columns: 1]
  Relationships:
    dataprueba.Manufacturer -> Manufacturer.Manufacturer
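
Conceptually, normalizing on a column splits the flat table into a parent table holding the unique values of that column and a relationship back to the child rows. The following is a plain-Python sketch of that idea, not what `normalize_entity` does internally; the helper `normalize` is hypothetical, and the rows are the first five of the sample shown earlier, reduced to three columns.

```python
rows = [
    {"Id": 0, "Manufacturer": "Acura", "Model": "Integra"},
    {"Id": 1, "Manufacturer": "Acura", "Model": "TL"},
    {"Id": 2, "Manufacturer": "Acura", "Model": "CL"},
    {"Id": 3, "Manufacturer": "Acura", "Model": "RL"},
    {"Id": 4, "Manufacturer": "Audi", "Model": "A4"},
]


def normalize(rows, column):
    """Return the parent table (unique values) and a parent -> child Ids mapping."""
    parent = []
    children = {}
    for row in rows:
        value = row[column]
        if value not in children:
            parent.append({column: value})
            children[value] = []
        children[value].append(row["Id"])
    return parent, children


parent, children = normalize(rows, "Manufacturer")
print(parent)
print(children)
```

The parent table has one row per manufacturer, which is what lets aggregation features be computed over each manufacturer's child rows.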

We generate the new feature matrix and keep the schema to pass it as an argument later:

In [724]:
feature_matrix, feats = ft.dfs(entityset=es, target_entity='Manufacturer')

In [725]:
target_schema = spark.createDataFrame(feature_matrix.reset_index()).schema


In [726]:
feature_matrix

Manufacturer,SUM(dataprueba.Sales_in_thousands),SUM(dataprueba.Horsepower),SUM(dataprueba.Wheelbase),SUM(dataprueba.Width),SUM(dataprueba.Length),SUM(dataprueba.Curb_weight),SUM(dataprueba.Fuel_capacity),STD(dataprueba.Sales_in_thousands),STD(dataprueba.Horsepower),STD(dataprueba.Wheelbase),...,MODE(dataprueba.Model),MODE(dataprueba.Vehicle_type),NUM_UNIQUE(dataprueba.DAY(Latest_Launch)),NUM_UNIQUE(dataprueba.YEAR(Latest_Launch)),NUM_UNIQUE(dataprueba.MONTH(Latest_Launch)),NUM_UNIQUE(dataprueba.WEEKDAY(Latest_Launch)),MODE(dataprueba.DAY(Latest_Launch)),MODE(dataprueba.YEAR(Latest_Launch)),MODE(dataprueba.MONTH(Latest_Launch)),MODE(dataprueba.WEEKDAY(Latest_Launch))
Acura,79.004997,800,430.799988,279.600006,753.900024,13.476,65.599998,13.53838,40.620192,5.497272,...,CL,Passenger,4,2,4,3,1,2011,2,6
Audi,39.177002,350,211.299988,144.299988,370.0,6.559,34.900002,1.143391,35.355339,4.31335,...,A4,Passenger,2,1,2,2,8,2011,8,2
BMW,17.527,193,111.400002,70.900002,188.0,3.472,18.5,,,,...,528i,Passenger,1,1,1,1,4,2011,4,0
Buick,130.910995,415,218.0,145.399994,390.799988,6.911,35.0,36.918751,45.961941,0.0,...,Century,Passenger,2,1,2,2,9,2011,2,2
Chevrolet,17.947001,345,104.5,73.599998,179.699997,3.21,19.1,,,,...,Corvette,Passenger,1,1,1,1,5,2012,12,2
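
The aggregated values can be checked by hand. Taking the Horsepower of the four Acura rows from the sample shown earlier (140, 225, 225, 210), the standard library reproduces the SUM and STD columns of the Acura row. The figures are consistent with a sample standard deviation (divisor n - 1), which would also explain why manufacturers with a single row, such as BMW, show an empty STD. The helper `std_or_nan` is hypothetical and only illustrates that behaviour.

```python
import math
import statistics

# Horsepower of the four Acura rows in the sample
acura_horsepower = [140, 225, 225, 210]

total = sum(acura_horsepower)
spread = statistics.stdev(acura_horsepower)  # sample standard deviation (n - 1)
print(total, round(spread, 6))  # 800 40.620192


def std_or_nan(values):
    """Sample standard deviation, or NaN when the group has fewer than two rows."""
    return statistics.stdev(values) if len(values) > 1 else math.nan


single_row = std_or_nan([193])  # BMW has one row in the sample
print(single_row)
```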


We follow the same steps as before, remembering to add the new entity:

In [727]:
@pandas_udf(target_schema, PandasUDFType.GROUPED_MAP)   
def generate_features(df_sp):
    es = ft.EntitySet('name', {'dataprueba': (df_sp, 'Id')})
    es = es.normalize_entity(base_entity_id="dataprueba", new_entity_id="Manufacturer", index="Manufacturer")
    return ft.calculate_feature_matrix(feats, es).reset_index()

We apply the function to the Spark dataframe grouped by the *Manufacturer* column, which is the one we want to aggregate on:

In [729]:
data_ft = Ventas_coches2.groupby("Manufacturer").apply(generate_features)


The result is as expected, with one row per manufacturer computed over the full filtered dataframe rather than the 10-row sample:

In [730]:
data_ft.toPandas()

Unnamed: 0,Manufacturer,SUM(dataprueba.Sales_in_thousands),SUM(dataprueba.Horsepower),SUM(dataprueba.Wheelbase),SUM(dataprueba.Width),SUM(dataprueba.Length),SUM(dataprueba.Curb_weight),SUM(dataprueba.Fuel_capacity),STD(dataprueba.Sales_in_thousands),STD(dataprueba.Horsepower),...,MODE(dataprueba.Model),MODE(dataprueba.Vehicle_type),NUM_UNIQUE(dataprueba.DAY(Latest_Launch)),NUM_UNIQUE(dataprueba.YEAR(Latest_Launch)),NUM_UNIQUE(dataprueba.MONTH(Latest_Launch)),NUM_UNIQUE(dataprueba.WEEKDAY(Latest_Launch)),MODE(dataprueba.DAY(Latest_Launch)),MODE(dataprueba.YEAR(Latest_Launch)),MODE(dataprueba.MONTH(Latest_Launch)),MODE(dataprueba.WEEKDAY(Latest_Launch))
0,Volkswagen,5.596,115,98.900002,68.300003,163.300003,2.762,14.6,,,...,GTI,Passenger,1,1,1,1,4,2011,1,1
1,Oldsmobile,38.554001,215,109.0,73.599998,195.899994,3.455,18.0,,,...,Intrigue,Passenger,1,1,1,1,4,2011,1,1
2,Lexus,88.007996,655,318.299988,212.899994,559.5,10.911,55.5,19.801897,7.637626,...,ES300,Passenger,3,1,3,2,1,2012,4,4
3,Jaguar,15.467,240,114.5,71.599998,191.300003,3.65,18.4,,,...,S-Type,Passenger,1,1,1,1,11,2012,3,6
4,Saturn,58.460999,274,213.0,138.0,380.799988,5.985,26.200001,29.356951,0.0,...,LS,Passenger,2,2,2,2,8,2011,4,3
5,Jeep,293.152985,505,300.700012,208.399994,501.0,10.119,59.5,52.873352,41.932485,...,Cherokee,Car,3,2,3,3,3,2012,4,1
6,Chevrolet,50.246002,465,201.600006,140.299988,354.0,5.608,32.299999,10.148396,159.099026,...,Corvette,Passenger,2,2,2,1,5,2011,11,2
7,Hyundai,41.183998,92,96.099998,65.699997,166.699997,2.24,11.9,,,...,Accent,Passenger,1,1,1,1,9,2012,10,1
8,Saab,21.306,355,209.0,138.0,371.399994,6.27,35.400002,2.06758,10.606602,...,3-Sep,Passenger,2,2,2,1,6,2011,9,1
9,Honda,88.884003,415,224.5,146.0,379.399994,8.145,41.099998,44.670765,3.535534,...,Odyssey,Car,2,1,2,2,2,2012,8,0
