In [1]:
%load_ext watermark
%watermark

2019-05-25T16:58:26+01:00

CPython 3.6.8
IPython 7.5.0

compiler   : GCC 7.3.0
system     : Linux
release    : 4.15.0-50-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 8
interpreter: 64bit


In [2]:
import warnings
warnings.simplefilter("ignore")

Spark has several modules, among them a [Machine Learning](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html) module. Let's see how to use it.

We create a Spark session. I use 6 cores because with more I run out of memory on my machine.

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
     .master("local[6]") \
     .appName("Taxis") \
     .getOrCreate()
spark

We are going to replicate the pipeline we used with dask:

```
pipeline = make_pipeline(
    make_union(
        make_pipeline(
            ColumnSelector(cols=["vendor_id", "store_and_fwd_flag", "payment_type", "rate_code"]),
            OneHotEncoder()
        ),
        make_pipeline(
            ColumnSelector(cols=["pickup_longitude", "pickup_latitude", "passenger_count"]),
            StandardScaler()
        )  
    ),
    SGDRegressor()
)
```

In [3]:
variable_objetivo = "tip_amount"
variables_independientes = ["vendor_id", "store_and_fwd_flag", "payment_type",
               "rate_code", "pickup_longitude", "pickup_latitude", 
               "passenger_count"]

We read the 2014 taxi trips dataset.

In [6]:
taxi = (
    spark
    .read
    .parquet("../data/nyc_taxi_data_2014.parquet/")
    .select(variables_independientes + [variable_objetivo])
    .withColumnRenamed(variable_objetivo, "label")
)

In [7]:
taxi.head()

Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.943106, pickup_latitude=40.706701, passenger_count=1, label=1.0)

Spark has the concept of [Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html), which is similar to the one in scikit-learn. In particular it defines 2 types of objects:

- **Transformers**, which turn one dataset into another (encoders, fitted scalers, fitted models, etc.)
- **Estimators**, which are fit on a dataset and return a Transformer, such as the model that makes the predictions (linear regression, random forest, etc.)

The feature transformers used here work with **a single input column** and **a single output column**.

Spark estimators read **1 input column** (called `features` by default) plus a label column (called `label` by default), and write **1 output column** (called `prediction` by default).
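
To make the fit/transform contract concrete, here is a minimal plain-Python sketch. It is not Spark code: the classes `MeanCenterer` and `MeanCentererModel` are hypothetical and only mimic the pattern in which an estimator's `fit` returns a fitted transformer.

```python
class MeanCentererModel:
    """Fitted 'transformer': holds the learned mean and applies it."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        # Return a new list; the input is left untouched.
        return [v - self.mean for v in values]


class MeanCenterer:
    """'Estimator': learns a parameter from the data in fit()."""

    def fit(self, values):
        return MeanCentererModel(sum(values) / len(values))


train = [1.0, 2.0, 3.0]
model = MeanCenterer().fit(train)   # learns the mean (2.0)
centered = model.transform(train)   # applies it
print(centered)
```

A Spark `Pipeline` chains objects that follow this same pattern, which is why fitting a pipeline gives back a `PipelineModel` that only exposes `transform`.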

For the pipeline we are going to use the following feature stages:

- `StringIndexer`, which converts a column of strings to numbers
- `OneHotEncoder`, which one-hot encodes integer indices
- `StandardScaler`, which standardizes a vector of numbers
- `VectorAssembler`, which takes a set of columns and groups them into a single vector column
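
As a rough illustration of what the first two stages do, the plain-Python sketch below indexes a toy column by label frequency and then one-hot encodes the indices. The helper names `fit_string_index` and `one_hot` and the sample values are made up for this example, and the alphabetical tie-break is an assumption. Spark's own classes return sparse vectors and have more options (for instance `handleInvalid`), which are ignored here.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Assumption: ties in frequency are broken alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, size):
    """Dense one-hot vector of length `size`; an index >= size gives all zeros."""
    vector = [0.0] * size
    if index < size:
        vector[int(index)] = 1.0
    return vector


# Made-up sample of a categorical column.
payment_type = ["CRD", "CSH", "CRD", "CRD", "CSH", "NOC"]
mapping = fit_string_index(payment_type)
indexed = [mapping[v] for v in payment_type]
encoded = [one_hot(i, len(mapping)) for i in indexed]

print(mapping)
print(encoded[1])
```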

In [8]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler

In [9]:
variables_categoricas = ["vendor_id", "store_and_fwd_flag", "payment_type", "rate_code"]
variables_numericas = ["pickup_longitude", "pickup_latitude", "passenger_count"]

In the Spark version used here, StringIndexer and OneHotEncoder work with a single column, so we have to create a list of indexers and a list of encoders (one of each per column).

In [10]:
indexers = []
encoders = []
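for columna in variables_categoricas:
    # NOTE: the "_idx)" suffix contains a stray ")". It is harmless because the
    # indexer output and the encoder input use the same name, and it is kept
    # here so that the column names match the recorded outputs.
    indexers.append(
        StringIndexer(inputCol=columna, outputCol=f"{columna}_idx)", handleInvalid="keep"))
    encoders.append(
        OneHotEncoder(inputCol=f"{columna}_idx)", outputCol=f"{columna}_enc"))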

In [11]:
indexers

[StringIndexer_0355836cdcfd,
 StringIndexer_09e1ed4bde22,
 StringIndexer_b9584fc7d362,
 StringIndexer_884a41018097]

In [12]:
encoders

[OneHotEncoder_70fb743ff741,
 OneHotEncoder_fb43e9837251,
 OneHotEncoder_c3962db20440,
 OneHotEncoder_f841bd12d78d]

In [13]:
encoders[0].getInputCol()

'vendor_id_idx)'

In [14]:
encoders[0].getOutputCol()

'vendor_id_enc'

Now we create a [VectorAssembler](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler) that takes the outputs of the encoders and combines them into a single vector.

In [15]:
ensamblador_categorico = VectorAssembler(
                            inputCols=[enc.getOutputCol() for enc in encoders], 
                            outputCol="vector_categorico")

In [16]:
ensamblador_categorico

VectorAssembler_882866db619c

Now we create a pipeline for the categorical data, which is a list with the following steps (or `stages`, as Spark calls them):

1. indexers, which convert the strings of the categorical variables to numbers

2. encoders, which convert those numeric indices to a one-hot encoding

3. assembler, which takes the outputs of the different encoders and joins them into a single vector

In [17]:
from pyspark.ml.pipeline import Pipeline

In [18]:
pipeline_categorico = Pipeline(stages=indexers + encoders + [ensamblador_categorico])

In [19]:
type(pipeline_categorico)

pyspark.ml.pipeline.Pipeline

We can fit the pipeline and apply it to the data:

In [20]:
taxi_cat = pipeline_categorico.fit(taxi).transform(taxi)

In [21]:
taxi_cat

DataFrame[vendor_id: string, store_and_fwd_flag: string, payment_type: string, rate_code: bigint, pickup_longitude: double, pickup_latitude: double, passenger_count: bigint, label: double, vendor_id_idx): double, store_and_fwd_flag_idx): double, payment_type_idx): double, rate_code_idx): double, vendor_id_enc: vector, store_and_fwd_flag_enc: vector, payment_type_enc: vector, rate_code_enc: vector, vector_categorico: vector]

In [22]:
taxi_cat.head(2)

[Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.943106, pickup_latitude=40.706701, passenger_count=1, label=1.0, vendor_id_idx)=1.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=0.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {1: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {0: 1.0}), rate_code_enc=SparseVector(12, {0: 1.0}), vector_categorico=SparseVector(21, {1: 1.0, 2: 1.0, 4: 1.0, 9: 1.0})),
 Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-74.006878, pickup_latitude=40.739494, passenger_count=1, label=2.4, vendor_id_idx)=1.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=0.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {1: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {0: 1.0}), rate_code_enc=SparseVector(12, {0: 1.0}), vector_categorico=SparseVector(21, {1: 1.0, 2: 1.0, 4: 1.0, 9
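
The size and indices of `vector_categorico` follow from simple concatenation of the four encoded vectors. The sketch below reproduces that arithmetic in plain Python for the first row shown above; `assemble` is a hypothetical helper and not part of Spark, and each sparse vector is represented as a `(size, {index: value})` pair.

```python
def assemble(sparse_vectors):
    """Concatenate (size, {index: value}) sparse vectors into one."""
    total_size = 0
    merged = {}
    for size, entries in sparse_vectors:
        for index, value in entries.items():
            # Shift each index by the combined size of the vectors seen so far.
            merged[total_size + index] = value
        total_size += size
    return total_size, merged


# The four encoded columns of the first row in the output above.
row = [
    (2, {1: 1.0}),   # vendor_id_enc
    (2, {0: 1.0}),   # store_and_fwd_flag_enc
    (5, {0: 1.0}),   # payment_type_enc
    (12, {0: 1.0}),  # rate_code_enc
]
size, entries = assemble(row)
print(size, entries)
```

The result has size 2 + 2 + 5 + 12 = 21 with non-zero entries at positions 1, 2, 4 and 9, which matches the `vector_categorico` column in the output.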

**Numeric variables**

For the numeric variables we simply create a standard scaler.

Spark's [StandardScaler](https://spark.apache.org/docs/latest/ml-features.html#standardscaler) works on a column of vectors and standardizes each component of the vector. That is why we first put all the numeric variables into a single vector using a VectorAssembler.

In [23]:
ensamblador_numerico = VectorAssembler(inputCols=variables_numericas, 
                                       outputCol="vector_numerico")    

estandarizador = StandardScaler(inputCol="vector_numerico", outputCol="vector_numerico_std")    

pipeline_numerico = Pipeline(stages=[ensamblador_numerico, estandarizador])

In [24]:
taxi_num = pipeline_numerico.fit(taxi).transform(taxi)

In [25]:
taxi_num.head(2)

[Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.943106, pickup_latitude=40.706701, passenger_count=1, label=1.0, vector_numerico=DenseVector([-73.9431, 40.7067, 1.0]), vector_numerico_std=DenseVector([-8.6117, 8.5471, 0.719])),
 Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-74.006878, pickup_latitude=40.739494, passenger_count=1, label=2.4, vector_numerico=DenseVector([-74.0069, 40.7395, 1.0]), vector_numerico_std=DenseVector([-8.6192, 8.554, 0.719]))]
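
Note that the scaled longitude stays far from zero and keeps its sign. This is consistent with the defaults of Spark's `StandardScaler` (`withMean=False`, `withStd=True`), which divide by the standard deviation without subtracting the mean. The plain-Python sketch below shows that operation on made-up values; `scale_by_std` is a hypothetical helper.

```python
from statistics import stdev


def scale_by_std(values):
    """Divide by the sample standard deviation, without centering."""
    sigma = stdev(values)  # sample (n - 1) standard deviation
    return [v / sigma for v in values]


# Made-up values whose sample standard deviation is 10.
raw = [10.0, 20.0, 30.0]
scaled = scale_by_std(raw)
print(scaled)
```

To also center the data in Spark, `StandardScaler` accepts `withMean=True`, at the cost of producing dense vectors.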

Now that we have one pipeline for the categorical variables and another for the numeric variables, we create a pipeline that joins the output of both. By convention we call that output `features`:

In [26]:
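# NOTE: this assembles the raw "vector_numerico" column, so the standardized
# values are not passed on (see `features` in the recorded output). To feed the
# scaled values to the model, use "vector_numerico_std" as the input instead.
ensamblador_procesado = VectorAssembler(inputCols=["vector_numerico", "vector_categorico"],
                                         outputCol="features")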

pipeline_procesado = Pipeline(stages=[pipeline_numerico, pipeline_categorico, 
                                      ensamblador_procesado])

In [27]:
taxi_procesado = pipeline_procesado.fit(taxi).transform(taxi)

In [28]:
taxi_procesado.head(2)

[Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.943106, pickup_latitude=40.706701, passenger_count=1, label=1.0, vector_numerico=DenseVector([-73.9431, 40.7067, 1.0]), vector_numerico_std=DenseVector([-8.6117, 8.5471, 0.719]), vendor_id_idx)=1.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=0.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {1: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {0: 1.0}), rate_code_enc=SparseVector(12, {0: 1.0}), vector_categorico=SparseVector(21, {1: 1.0, 2: 1.0, 4: 1.0, 9: 1.0}), features=SparseVector(24, {0: -73.9431, 1: 40.7067, 2: 1.0, 4: 1.0, 5: 1.0, 7: 1.0, 12: 1.0})),
 Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-74.006878, pickup_latitude=40.739494, passenger_count=1, label=2.4, vector_numerico=DenseVector([-74.0069, 40.7395, 1.0]), vector_numerico_std=DenseVector([-8.6192, 8.554, 0.719]), vendor_i

Now we define the estimator. We will use a linear regression model, Spark's `LinearRegression`, in place of the `SGDRegressor` from the dask pipeline. Note that `LinearRegression` is not fit by stochastic gradient descent; its `solver` parameter chooses between L-BFGS and the normal equations.

Spark estimators, apart from the hyperparameters specific to each algorithm, share some common arguments:

- `featuresCol`, the name of the column with the independent variables (by default, "features")
- `labelCol`, the name of the column with the dependent variable (by default, "label")
- `predictionCol`, the name of the column for the predictions (by default, "prediction")

In [29]:
from pyspark.ml.regression import LinearRegression

In [30]:
lr = LinearRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8, 
                      featuresCol="features", labelCol="label",
                      predictionCol="prediccion")
pipeline = Pipeline(stages=[pipeline_procesado, lr])
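
The two regularization arguments above combine into a single elastic net penalty. As documented for Spark's `LinearRegression`, `regParam` is the overall strength and `elasticNetParam` mixes the L1 and L2 terms. The sketch below evaluates that penalty in plain Python; the function name and the coefficient values are hypothetical.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    mix = elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared
    return reg_param * mix


weights = [1.0, -2.0]  # hypothetical coefficients
penalty = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8)
print(round(penalty, 4))
```

With `elasticNetParam=0.8` most of the penalty comes from the L1 term, which tends to push some coefficients to exactly zero.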

We call `fit`; this takes about 4 minutes on my machine (2 min 32 s in the run recorded below).

In [33]:
%time modelo = pipeline.fit(taxi)

CPU times: user 144 ms, sys: 29.3 ms, total: 173 ms
Wall time: 2min 32s


In [34]:
modelo

PipelineModel_d2e6153fce94

To get predictions we now call `transform`, not `predict`.

In [35]:
modelo.transform(taxi).head()

Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.943106, pickup_latitude=40.706701, passenger_count=1, label=1.0, vector_numerico=DenseVector([-73.9431, 40.7067, 1.0]), vector_numerico_std=DenseVector([-8.6117, 8.5471, 0.719]), vendor_id_idx)=1.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=0.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {1: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {0: 1.0}), rate_code_enc=SparseVector(12, {0: 1.0}), vector_categorico=SparseVector(21, {1: 1.0, 2: 1.0, 4: 1.0, 9: 1.0}), features=SparseVector(24, {0: -73.9431, 1: 40.7067, 2: 1.0, 4: 1.0, 5: 1.0, 7: 1.0, 12: 1.0}), prediccion=2.1970520795492154)

We can save the output to another file.

In [39]:
%%time
(modelo
 .transform(taxi)
 .select(
    variables_categoricas+variables_numericas+["prediccion"]
 )
 .write
 .parquet("taxi_2014_prediccion")
)

CPU times: user 33.9 ms, sys: 15.1 ms, total: 49 ms
Wall time: 42.1 s


In [40]:
taxi_pred = spark.read.parquet("taxi_2014_prediccion/")

In [41]:
taxi_pred.head()

Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.943106, pickup_latitude=40.706701, passenger_count=1, prediccion=2.1970520795492154)

In [42]:
taxi_pred.select("prediccion").describe().collect()

[Row(summary='count', prediccion='14999999'),
 Row(summary='mean', prediccion='1.4559072103899229'),
 Row(summary='stddev', prediccion='1.0965019704659398'),
 Row(summary='min', prediccion='0.2470778372101039'),
 Row(summary='max', prediccion='5.683756534354175')]

# Cross-validation and hyperparameter search

Spark can [validate and tune models](https://spark.apache.org/docs/latest/ml-tuning.html). Unfortunately it has no equivalent of `RandomizedSearchCV`, but it does have grid search (`ParamGridBuilder`).

In [43]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [44]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [1e-3, 1.]) \
    .addGrid(lr.elasticNetParam, [1e-3, 1.]) \
    .build()
type(paramGrid)

list

We see that the grid will search 4 different combinations.

In [47]:
paramGrid

[{Param(parent='LinearRegression_2aa128e77d15', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
  Param(parent='LinearRegression_2aa128e77d15', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.001},
 {Param(parent='LinearRegression_2aa128e77d15', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
  Param(parent='LinearRegression_2aa128e77d15', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0},
 {Param(parent='LinearRegression_2aa128e77d15', name='regParam', doc='regularization parameter (>= 0).'): 1.0,
  Param(parent='LinearRegression_2aa128e77d15', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.001},
 {Param(

In [45]:
len(paramGrid)

4
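
The grid is just the Cartesian product of the candidate values, so 2 values for each of 2 parameters give 4 combinations. The plain-Python sketch below builds the same combinations with `itertools.product`; it uses plain strings as keys instead of Spark `Param` objects.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder call above.
grid = {
    "regParam": [1e-3, 1.0],
    "elasticNetParam": [1e-3, 1.0],
}

names = list(grid)
combinations = [dict(zip(names, values)) for values in product(*grid.values())]

for combo in combinations:
    print(combo)
print(len(combinations))
```

The number of combinations grows multiplicatively with each parameter added to the grid, which matters because every combination is fit once per fold.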

We create a regression evaluator, which will be in charge of scoring each fitted model.

In [48]:
evaluador = RegressionEvaluator(metricName='rmse', predictionCol='prediccion')
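
The `rmse` metric is the root mean squared error between labels and predictions, so lower is better. A minimal plain-Python version, with made-up numbers and a hypothetical helper name, looks like this:

```python
from math import sqrt


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return sqrt(sum(squared_errors) / len(squared_errors))


labels = [1.0, 3.0]       # made-up tip amounts
predictions = [2.0, 2.0]  # made-up predictions, each off by 1
print(rmse(labels, predictions))
```

Because the metric is expressed in the units of the label, an RMSE of 1.0 here would mean predictions are off by about one unit of tip amount.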

We create a [`CrossValidator`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) to take care of the cross-validation.

In [49]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluador,
                          numFolds=5)

In [50]:
type(crossval)

pyspark.ml.tuning.CrossValidator

Now we fit the search; this takes about 35 minutes on my machine (27 min 29 s in the run recorded below)!

In [51]:
%time cvModel = crossval.fit(taxi)

CPU times: user 4.1 s, sys: 1.13 s, total: 5.24 s
Wall time: 27min 29s
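
The long run time comes from the number of fits: every combination in the grid is trained once per fold, and `CrossValidator` then refits the best combination on the whole dataset. The plain-Python sketch below shows the fold logic on 10 row indices. The helper `k_fold_indices` is hypothetical and uses equal-sized folds as a simplification, whereas Spark assigns rows to folds at random.

```python
import random


def k_fold_indices(n_rows, num_folds, seed=0):
    """Yield (training, validation) index lists, one pair per fold."""
    rng = random.Random(seed)
    indices = list(range(n_rows))
    rng.shuffle(indices)
    folds = [indices[i::num_folds] for i in range(num_folds)]
    for i, validation in enumerate(folds):
        # Training data is every fold except the one held out for validation.
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield training, validation


num_combinations = 4
num_folds = 5
splits = list(k_fold_indices(n_rows=10, num_folds=num_folds))

fits_during_search = num_combinations * num_folds  # one fit per combination and fold
total_fits = fits_during_search + 1                # plus the final refit of the best one
print(len(splits), fits_during_search, total_fits)
```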


Once it has finished, we can see the average metric (RMSE averaged over the folds) for each combination in the grid:

In [52]:
cvModel.avgMetrics

[1.7172700256296567,
 1.7172883634559573,
 1.7372954404496626,
 2.1302285248602355]

And we can pair each result with its combination of hyperparameters:

In [53]:
list(zip(cvModel.avgMetrics, paramGrid))

[(1.7172700256296567,
  {Param(parent='LinearRegression_2aa128e77d15', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
   Param(parent='LinearRegression_2aa128e77d15', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.001}),
 (1.7172883634559573,
  {Param(parent='LinearRegression_2aa128e77d15', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
   Param(parent='LinearRegression_2aa128e77d15', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0}),
 (1.7372954404496626,
  {Param(parent='LinearRegression_2aa128e77d15', name='regParam', doc='regularization parameter (>= 0).'): 1.0,
   Param(parent='LinearRegression_2aa128e77d15', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty
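
Since RMSE is an error, the best combination is the one with the smallest average metric. The sketch below picks it from the values reported by `avgMetrics` above. The grid is written with plain string keys, and its fourth entry is inferred from the grid definition because the recorded output is cut off.

```python
# Average RMSE per combination, copied from the avgMetrics output above.
avg_metrics = [1.7172700256296567, 1.7172883634559573,
               1.7372954404496626, 2.1302285248602355]

# Same order as the parameter grid (fourth entry inferred from its definition).
grid = [
    {"regParam": 0.001, "elasticNetParam": 0.001},
    {"regParam": 0.001, "elasticNetParam": 1.0},
    {"regParam": 1.0, "elasticNetParam": 0.001},
    {"regParam": 1.0, "elasticNetParam": 1.0},
]

# Index of the smallest average error.
best_index = min(range(len(avg_metrics)), key=avg_metrics.__getitem__)
best_params = grid[best_index]
print(best_index, best_params)
```

The first two combinations differ only in the fifth decimal place, so with `regParam=0.001` the choice of `elasticNetParam` barely affects the error on this data.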

We can see the stages of the best pipeline:

In [54]:
cvModel.bestModel.stages

[PipelineModel_2e66bde88a1c, LinearRegression_2aa128e77d15]

And finally, we can see the hyperparameters of the best estimator:

In [56]:
cvModel.bestModel.stages[1].extractParamMap()

{Param(parent='LinearRegression_2aa128e77d15', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LinearRegression_2aa128e77d15', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.001,
 Param(parent='LinearRegression_2aa128e77d15', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0.'): 1.35,
 Param(parent='LinearRegression_2aa128e77d15', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LinearRegression_2aa128e77d15', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LinearRegression_2aa128e77d15', name='labelCol', doc='label column name'): 'label',
 Param(parent='LinearRegression_2aa128e77d15', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber. (Default squaredError)'): 'squaredError',
 Param(