In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
import pyspark.sql.functions as func
import pandas as pd

import re

In [2]:
# `sc` is only predefined when the notebook runs inside a pyspark shell.
# getOrCreate() returns that existing context (or starts one), so the notebook
# also works after Restart Kernel -> Run All on a plain Python kernel.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)

### Importação de tabela do Hive para leitura

In [3]:
# Load the full modeling table from Hive.
query_base_final = "select * from tb_base_final_para_ml_doencas "
df_modelo_final = hive_context.sql(query_base_final)

In [4]:
# Limit on the Spark side first: toPandas() collects every row to the driver,
# so calling .head() afterwards would still pull the whole table into memory.
df_modelo_final.limit(5).toPandas()

Unnamed: 0,no_bairro_residencia,ds_semana_notificacao,resposta,populacao,temp_maxima,temp_minima,temp_comp_media,estacao_arcoverde,estacao_cabrobo,estacao_garanhuns,estacao_ouricuri,estacao_petrolina,estacao_recife,estacao_seminfo,estacao_surubim,ds_tipo_base
0,VILA TAMANDARE,201617,1,33147.348105,31.096273,23.243363,26.897865,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,original
1,PORTO DA MADEIRA,201627,2,7713.0,28.75,19.099999,24.679999,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,original
2,PORTO DA MADEIRA,201613,4,7713.0,32.025,23.8,27.820001,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,original
3,PORTO DA MADEIRA,201612,1,7713.0,30.4,23.700001,26.5,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,original
4,PORTO DA MADEIRA,201604,1,7713.0,31.6,25.200001,28.1,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,original


In [5]:
# Show the column names and Spark types of the table loaded from Hive.
df_modelo_final.printSchema()

root
 |-- no_bairro_residencia: string (nullable = true)
 |-- ds_semana_notificacao: integer (nullable = true)
 |-- resposta: long (nullable = true)
 |-- populacao: double (nullable = true)
 |-- temp_maxima: double (nullable = true)
 |-- temp_minima: double (nullable = true)
 |-- temp_comp_media: double (nullable = true)
 |-- estacao_arcoverde: double (nullable = true)
 |-- estacao_cabrobo: double (nullable = true)
 |-- estacao_garanhuns: double (nullable = true)
 |-- estacao_ouricuri: double (nullable = true)
 |-- estacao_petrolina: double (nullable = true)
 |-- estacao_recife: double (nullable = true)
 |-- estacao_seminfo: double (nullable = true)
 |-- estacao_surubim: double (nullable = true)
 |-- ds_tipo_base: string (nullable = true)



In [6]:
# Expose the table to Spark SQL. createOrReplaceTempView supersedes
# registerTempTable, which has been deprecated since Spark 2.0.
df_modelo_final.createOrReplaceTempView("tb_base_pre_modelo")

In [7]:
# Keep the response plus the numeric predictors. The neighborhood name, the
# notification week and the base-type flag are left out of the model input.
# Selecting through the DataFrame API avoids depending on a temp view that was
# registered through a different context object (hive_context vs sqlContext).
df_base_pre_modelo = df_modelo_final.select(
    "resposta",
    "populacao",
    "temp_maxima",
    "temp_minima",
    "temp_comp_media",
    "estacao_arcoverde",
    "estacao_cabrobo",
    "estacao_garanhuns",
    "estacao_ouricuri",
    "estacao_petrolina",
    "estacao_recife",
    "estacao_seminfo",
    "estacao_surubim",
)

In [8]:
# Limit before converting so only 5 rows are collected to the driver.
df_base_pre_modelo.limit(5).toPandas()

Unnamed: 0,resposta,populacao,temp_maxima,temp_minima,temp_comp_media,estacao_arcoverde,estacao_cabrobo,estacao_garanhuns,estacao_ouricuri,estacao_petrolina,estacao_recife,estacao_seminfo,estacao_surubim
0,1,33147.348105,31.096273,23.243363,26.897865,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
1,2,7713.0,28.75,19.099999,24.679999,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0
2,4,7713.0,32.025,23.8,27.820001,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0
3,1,7713.0,30.4,23.700001,26.5,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0
4,1,7713.0,31.6,25.200001,28.1,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0


In [9]:
# Confirm the selected columns: `resposta` (long) plus the double-typed predictors.
df_base_pre_modelo.printSchema()

root
 |-- resposta: long (nullable = true)
 |-- populacao: double (nullable = true)
 |-- temp_maxima: double (nullable = true)
 |-- temp_minima: double (nullable = true)
 |-- temp_comp_media: double (nullable = true)
 |-- estacao_arcoverde: double (nullable = true)
 |-- estacao_cabrobo: double (nullable = true)
 |-- estacao_garanhuns: double (nullable = true)
 |-- estacao_ouricuri: double (nullable = true)
 |-- estacao_petrolina: double (nullable = true)
 |-- estacao_recife: double (nullable = true)
 |-- estacao_seminfo: double (nullable = true)
 |-- estacao_surubim: double (nullable = true)



### Criando e testando o modelo Boosting

In [10]:
### Let's do the same using the ML package
## Importing Functions

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

In [11]:
## Splitting the DataFrame into Train and Test
# A fixed seed makes the split, and every metric computed from it,
# reproducible across kernel restarts.
(Train, Test) = df_base_pre_modelo.randomSplit([0.7, 0.3], seed=42)

In [12]:
# The split keeps the same schema as df_base_pre_modelo.
Train.printSchema()

root
 |-- resposta: long (nullable = true)
 |-- populacao: double (nullable = true)
 |-- temp_maxima: double (nullable = true)
 |-- temp_minima: double (nullable = true)
 |-- temp_comp_media: double (nullable = true)
 |-- estacao_arcoverde: double (nullable = true)
 |-- estacao_cabrobo: double (nullable = true)
 |-- estacao_garanhuns: double (nullable = true)
 |-- estacao_ouricuri: double (nullable = true)
 |-- estacao_petrolina: double (nullable = true)
 |-- estacao_recife: double (nullable = true)
 |-- estacao_seminfo: double (nullable = true)
 |-- estacao_surubim: double (nullable = true)



In [13]:
from pyspark.ml.feature import VectorAssembler

# Predictor columns, in the order they are packed into the "features" vector.
assemblerInputs = [
    "populacao",
    "temp_maxima",
    "temp_minima",
    "temp_comp_media",
    "estacao_arcoverde",
    "estacao_cabrobo",
    "estacao_garanhuns",
    "estacao_ouricuri",
    "estacao_petrolina",
    "estacao_recife",
    "estacao_seminfo",
    "estacao_surubim",
]
assembler = VectorAssembler(outputCol="features", inputCols=assemblerInputs)
stages = [assembler]

In [14]:
# Wrap the assembler stage in a Pipeline. Fitting on Train yields a
# PipelineModel that is then applied to both splits.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(Train)

In [15]:
# Append the assembled "features" column to each split (train first, then test).
X_train, X_test = [pipelineModel.transform(split) for split in (Train, Test)]

In [16]:
# Limit before converting so only 5 rows are collected to the driver.
X_train.limit(5).toPandas()

Unnamed: 0,resposta,populacao,temp_maxima,temp_minima,temp_comp_media,estacao_arcoverde,estacao_cabrobo,estacao_garanhuns,estacao_ouricuri,estacao_petrolina,estacao_recife,estacao_seminfo,estacao_surubim,features
0,1,7713.0,28.200001,19.4,23.620001,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,"(7713.0, 28.200000762939453, 19.39999961853027..."
1,1,7713.0,28.9,22.0,24.84,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,"(7713.0, 28.899999618530273, 22.0, 24.84000015..."
2,1,7713.0,29.1,21.4,24.959999,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,"(7713.0, 29.100000381469727, 21.39999961853027..."
3,1,7713.0,29.700001,24.4,26.299999,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,"(7713.0, 29.700000762939453, 24.39999961853027..."
4,1,7713.0,30.200001,23.0,25.98,0.0,0.0,0.0,0.0,0.0,69.0,0.0,0.0,"(7713.0, 30.200000762939453, 23.0, 25.97999954..."


In [17]:
### Boosting - the final model

## Importing package and creating Boosting model
from pyspark.ml.regression import GBTRegressor

# Fixed seed so GBT row subsampling and the CV fold assignment are reproducible.
SEED = 42

dBoost = GBTRegressor(labelCol='resposta', featuresCol='features', seed=SEED)

## Pipeline with only the boosting stage (features were assembled beforehand)
pipeline = Pipeline(stages=[dBoost])

## Evaluation function based on R2
evaluator = RegressionEvaluator(labelCol='resposta', predictionCol='prediction', metricName='r2')

## Grid over max depth, subsampling rate and number of trees
# subsamplingRate is a fraction, so the grid values are floats (1.0, not 1).
paramGrid = (ParamGridBuilder()
             .addGrid(dBoost.maxDepth, [3, 6])
             .addGrid(dBoost.subsamplingRate, [0.5, 1.0])
             .addGrid(dBoost.maxIter, [10])
             .build())

## 3-fold cross validation over the grid, scored with the R2 evaluator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=SEED)

## Fitting: CV selects the best grid point and refits it on all of X_train
CV_model = crossval.fit(X_train)

## Printing best model
print(CV_model.bestModel.stages[0])

## Scoring the held-out test split with the refit best model
transformed_data = CV_model.transform(X_test)
r2_test = evaluator.evaluate(transformed_data)
print('Test R2:', r2_test)

GBTRegressionModel (uid=GBTRegressor_46e3a836d06721d64377) with 10 trees
r2 R2: 0.4842284517381257


In [18]:
# Best PipelineModel chosen by cross-validation (its only stage is the GBT model).
modelo = CV_model.bestModel

In [20]:
# Save the model
from pyspark.ml import Pipeline, PipelineModel

# NOTE: this PipelineModel holds only the GBT stage, not the VectorAssembler.
# Whoever loads it must build the "features" column before calling transform().
# overwrite() makes the cell re-runnable; a plain save() fails if the path exists.
modelo.write().overwrite().save("/user/labdata/model/")

### Testando o modelo prevendo uma base simulada

In [25]:
# Load the sample (simulated) 2017 CSV, letting Spark infer the column types.
csv_options = {'header': 'true', 'delimiter': ',', 'quote': "\"", 'inferSchema': 'true'}
df_teste_2017 = (sqlContext.read
                 .format('csv')
                 .options(**csv_options)
                 .load('/user/labdata/Dados_2017_teste.csv'))

In [26]:
# Limit before converting so only 5 rows are collected to the driver.
df_teste_2017.limit(5).toPandas()

Unnamed: 0,populacao,temp_maxima,temp_minima,temp_comp_media,estacao_recife,estacao_ouricuri,estacao_cabrobo,estacao_petrolina,estacao_seminfo,estacao_garanhuns,estacao_arcoverde,estacao_surubim,resposta
0,5773.0,31.3,24.7,27.56,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.862713
1,5773.0,30.7,20.65,25.02,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.608811
2,5773.0,31.8,23.8,27.82,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.210589
3,36265.0,30.864103,23.779487,27.09174,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,23.041336
4,36265.0,31.054545,22.813636,26.629091,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,11.031927


In [27]:
# Show the inferred schema; note `resposta` is read as double here (long in training).
df_teste_2017.printSchema()

root
 |-- populacao: double (nullable = true)
 |-- temp_maxima: double (nullable = true)
 |-- temp_minima: double (nullable = true)
 |-- temp_comp_media: double (nullable = true)
 |-- estacao_recife: double (nullable = true)
 |-- estacao_ouricuri: double (nullable = true)
 |-- estacao_cabrobo: double (nullable = true)
 |-- estacao_petrolina: double (nullable = true)
 |-- estacao_seminfo: double (nullable = true)
 |-- estacao_garanhuns: double (nullable = true)
 |-- estacao_arcoverde: double (nullable = true)
 |-- estacao_surubim: double (nullable = true)
 |-- resposta: double (nullable = true)



In [28]:
# Register the raw 2017 base so it can be queried with Spark SQL.
# createOrReplaceTempView supersedes registerTempTable, which has been
# deprecated since Spark 2.0.
df_teste_2017.createOrReplaceTempView("tb_schema_modelo")

In [29]:
# Keep only the predictors, dropping the response variable, in the CSV's column order.
# Selecting through the DataFrame API needs no temp view, and re-running the
# cell is safe because the selected columns remain present afterwards.
df_teste_2017 = df_teste_2017.select(
    "populacao",
    "temp_maxima",
    "temp_minima",
    "temp_comp_media",
    "estacao_recife",
    "estacao_ouricuri",
    "estacao_cabrobo",
    "estacao_petrolina",
    "estacao_seminfo",
    "estacao_garanhuns",
    "estacao_arcoverde",
    "estacao_surubim",
)

In [30]:
# Limit before converting so only 3 rows are collected to the driver.
df_teste_2017.limit(3).toPandas()

Unnamed: 0,populacao,temp_maxima,temp_minima,temp_comp_media,estacao_recife,estacao_ouricuri,estacao_cabrobo,estacao_petrolina,estacao_seminfo,estacao_garanhuns,estacao_arcoverde,estacao_surubim
0,5773.0,31.3,24.7,27.56,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,5773.0,30.7,20.65,25.02,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,5773.0,31.8,23.8,27.82,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [33]:
# Build the feature vector expected by the saved model.
# The column order must match the assembler used during training.
from pyspark.ml.feature import VectorAssembler

assemblerInputs = ["populacao","temp_maxima","temp_minima","temp_comp_media","estacao_arcoverde","estacao_cabrobo","estacao_garanhuns","estacao_ouricuri","estacao_petrolina","estacao_recife","estacao_seminfo","estacao_surubim"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# VectorAssembler is a Transformer, so apply it directly. Fitting a new
# Pipeline here is unnecessary and would overwrite the training
# `pipeline` / `pipelineModel` variables.
df_pre_previsao = assembler.transform(df_teste_2017)

# NOTE(review): in the training base the estacao_* columns take values such as
# 69.0 (estacao_recife for PORTO DA MADEIRA), while this simulated base uses 1.0.
# Tree splits learned on the training scale may not transfer; confirm the encoding.

In [34]:
# Reload the saved PipelineModel from disk (it expects a "features" column).
modelo = PipelineModel.load("/user/labdata/model/")

In [39]:
# Score the assembled 2017 base; the result gains a "prediction" column.
base = modelo.transform(df_pre_previsao)

In [41]:
# Limit before converting so only 5 scored rows are collected to the driver.
base.limit(5).toPandas()

Unnamed: 0,populacao,temp_maxima,temp_minima,temp_comp_media,estacao_recife,estacao_ouricuri,estacao_cabrobo,estacao_petrolina,estacao_seminfo,estacao_garanhuns,estacao_arcoverde,estacao_surubim,features,prediction
0,5773.0,31.3,24.7,27.56,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"(5773.0, 31.3, 24.7, 27.56, 0.0, 0.0, 0.0, 0.0...",3.375366
1,5773.0,30.7,20.65,25.02,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"(5773.0, 30.7, 20.65, 25.02, 0.0, 0.0, 0.0, 0....",2.50942
2,5773.0,31.8,23.8,27.82,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"(5773.0, 31.8, 23.8, 27.82, 0.0, 0.0, 0.0, 0.0...",4.308073
3,36265.0,30.864103,23.779487,27.09174,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"(36265.0, 30.864102564102566, 23.7794871794871...",5.046598
4,36265.0,31.054545,22.813636,26.629091,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"(36265.0, 31.05454545454545, 22.81363636363636...",4.197925
