In [68]:
%matplotlib inline
import findspark
import os

In [69]:
findspark.init(os.environ['SPARK_HOME'])

In [70]:
%matplotlib inline
import pyspark 
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
#from pyspark.ml.linalg import *
from pyspark.sql.functions import *  
#from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, SQLTransformer, Normalizer, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
#from pyspark.ml.feature import SQLTransformer
from pyspark.ml.linalg import Vectors
#from pyspark.ml.feature import Normalizer
#from pyspark.ml.linalg import Vectors
#from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [71]:
# creamos la sesión de spark
spark = SparkSession\
.builder\
.appName("spark_Parte2")\
.getOrCreate()

In [72]:
profecoSchema = StructType([StructField("producto", StringType(), True), \
                     StructField("presentacion", StringType(), True), \
                     StructField("marca", StringType(), True), \
                     StructField("categoria", StringType(), True), \
                     StructField("catalogo", StringType(), True), \
                     StructField("precio", DoubleType(), True), \
                     StructField("fecharegistro", TimestampType(), True), \
                     StructField("cadenacomercial", StringType(), True), \
                     StructField("giro", StringType(), True), \
                     StructField("nombrecomercial", StringType(), True), \
                     StructField("direccion", StringType(), True), \
                     StructField("estado", StringType(), True), \
                     StructField("municipio", StringType(), True), \
                     StructField("latitud", DoubleType(), True), \
                     StructField("longitud", DoubleType(), True)] )

In [73]:
profecoDf = spark.read.format("csv")\
        .option("delimiter", "|")\
        .option("header","true")\
        .schema(profecoSchema) \
        .option("inferSchema", "true")\
        .load("profeco_final_bash.csv")

In [74]:
profecoDF = spark.read.format('parquet')\
        .load("profecoFinal.parquet")

In [75]:
profecoDf.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- fecharegistro: timestamp (nullable = true)
 |-- cadenacomercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombrecomercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: double (nullable = true)
 |-- longitud: double (nullable = true)



In [76]:
profecoDF.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- fecharegistro: timestamp (nullable = true)
 |-- cadenacomercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombrecomercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: double (nullable = true)
 |-- longitud: double (nullable = true)



In [89]:
# Manipulación de variables
sqlTrans = SQLTransformer( \
    statement="SELECT * FROM  __THIS__ \
          WHERE categoria LIKE '%medicamentos%' \
          AND estado='distrito federal' \
          AND MONTH(fecharegistro)=4 \
          AND YEAR(fecharegistro)=2016")

In [90]:
# Formato de label / predictor
#assemblerPrecio = VectorAssembler(inputCols=["precio"],outputCol="precioVec")

In [91]:
profecoDF.count()

45992

In [92]:
# Transformación de variables
#scalerPrecio = Normalizer(inputCol='precioVec', outputCol='scaled_precio')

In [140]:
# Selección de variables
formula = SQLTransformer( \
#    statement="SELECT producto, marca, precio AS label ,cadenacomercial, municipio FROM  __THIS__ GROUP BY producto")
    statement="SELECT producto,marca,cadenacomercial, MEAN(precio) AS label FROM  __THIS__ GROUP BY producto,marca,cadenacomercial")

In [141]:
# Formato de String a categótico
productoIndexer = StringIndexer(inputCol="producto", outputCol="productoIndex")
marcaIndexer = StringIndexer(inputCol="marca", outputCol="marcaIndex")
cadenacomercialIndexer = StringIndexer(inputCol="cadenacomercial", outputCol="cadenacomercialIndex")
#municipioIndexer = StringIndexer(inputCol="municipio", outputCol="municipioIndex")

In [142]:
assembler = VectorAssembler(
#    inputCols=["productoIndex", "marcaIndex","cadenacomercialIndex","municipioIndex"],
    inputCols=["productoIndex", "marcaIndex","cadenacomercialIndex"],
    outputCol="Features")

In [143]:
# Train a random forest model
rf = RandomForestRegressor(labelCol='label',featuresCol='Features',maxBins=400)

In [144]:
#pipeline = Pipeline(stages=[sqlTrans,formula,productoIndexer,marcaIndexer,cadenacomercialIndexer,municipioIndexer,assembler,rf])
pipeline = Pipeline(stages=[sqlTrans,formula,productoIndexer,marcaIndexer,cadenacomercialIndexer,assembler,rf])

In [145]:
paramGrid = ParamGridBuilder()\
    .addGrid(rf.maxDepth, [2,5,7])\
    .addGrid(rf.numTrees, [64,96,128])\
    .build()

In [146]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [147]:
model = tvs.fit(profecoDF)

In [148]:
model.estimator

Param(parent='TrainValidationSplitModel_41578b3a6dc0', name='estimator', doc='estimator to be cross-validated')

In [149]:
model.uid

'TrainValidationSplitModel_41578b3a6dc0'

In [150]:
model.bestModel

PipelineModel_aba44237a75b

In [151]:
model.explainParams()

"estimator: estimator to be cross-validated (current: Pipeline_d897a6b29ad8)\nestimatorParamMaps: estimator param maps (current: [{Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2, Param(parent='RandomForestRegressor_da68f211ac8f', name='numTrees', doc='Number of trees to train (>= 1).'): 64}, {Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2, Param(parent='RandomForestRegressor_da68f211ac8f', name='numTrees', doc='Number of trees to train (>= 1).'): 96}, {Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2, Param(parent='RandomForestRegressor_da68f211ac8f', name='numTre

In [152]:
model.extractParamMap()

{Param(parent='TrainValidationSplitModel_41578b3a6dc0', name='seed', doc='random seed.'): -3892216695014581501,
 Param(parent='TrainValidationSplitModel_41578b3a6dc0', name='estimator', doc='estimator to be cross-validated'): Pipeline_d897a6b29ad8,
 Param(parent='TrainValidationSplitModel_41578b3a6dc0', name='estimatorParamMaps', doc='estimator param maps'): [{Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
   Param(parent='RandomForestRegressor_da68f211ac8f', name='numTrees', doc='Number of trees to train (>= 1).'): 64},
  {Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
   Param(parent='RandomForestRegressor_da68f211ac8f', name='numTrees', doc='Number of trees to train (>= 1).'): 96},
  {Param(parent=

In [153]:
model.bestModel

PipelineModel_aba44237a75b

In [154]:
model.getEstimatorParamMaps()

[{Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
  Param(parent='RandomForestRegressor_da68f211ac8f', name='numTrees', doc='Number of trees to train (>= 1).'): 64},
 {Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
  Param(parent='RandomForestRegressor_da68f211ac8f', name='numTrees', doc='Number of trees to train (>= 1).'): 96},
 {Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
  Param(parent='RandomForestRegressor_da68f211ac8f', name='numTrees', doc='Number of trees to train (>= 1).'): 128},
 {Param(parent='RandomForestRegressor_da68f211ac8f', name='maxDepth',

In [155]:
model.validationMetrics

[261.93832789769846,
 266.62892239546653,
 267.23915944851495,
 239.14360227575764,
 242.0394652082212,
 242.43839226996073,
 236.90668383364613,
 240.50209037453038,
 240.33611929282813]