In [0]:
# Importando as bibliotecas
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml import Pipeline
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
# O Dataset 'Power Plant' descreve uma usina geradora de energia movida a gás que se utiliza de vários sensores para controle da operação.
# O problema é de regressão considerando que o 'target' estamos tentando prever é numérico.

In [0]:
%sql
create database ada_857

In [0]:
%sql
show databases

databaseName
ada_857
default


In [0]:
%sql
use ada_857

In [0]:
# O arquivo é .tsv que significa Tab-separated-variable, ou variável separada por tab. 
display(dbutils.fs.ls("/databricks-datasets/power-plant/data"))

path,name,size,modificationTime
dbfs:/databricks-datasets/power-plant/data/Sheet1.tsv,Sheet1.tsv,308693,1436833478000
dbfs:/databricks-datasets/power-plant/data/Sheet2.tsv,Sheet2.tsv,308693,1436833478000
dbfs:/databricks-datasets/power-plant/data/Sheet3.tsv,Sheet3.tsv,308693,1436833478000
dbfs:/databricks-datasets/power-plant/data/Sheet4.tsv,Sheet4.tsv,308693,1436833478000
dbfs:/databricks-datasets/power-plant/data/Sheet5.tsv,Sheet5.tsv,308693,1436833479000


In [0]:
# Visualização dos dados
print(dbutils.fs.head("/databricks-datasets/power-plant/data/Sheet1.tsv"))

[Truncated to first 65536 bytes]
AT	V	AP	RH	PE
14.96	41.76	1024.07	73.17	463.26
25.18	62.96	1020.04	59.08	444.37
5.11	39.4	1012.16	92.14	488.56
20.86	57.32	1010.24	76.64	446.48
10.82	37.5	1009.23	96.62	473.9
26.27	59.44	1012.23	58.77	443.67
15.89	43.96	1014.02	75.24	467.35
9.48	44.71	1019.12	66.43	478.42
14.64	45	1021.78	41.25	475.98
11.74	43.56	1015.14	70.72	477.5
17.99	43.72	1008.64	75.04	453.02
20.14	46.93	1014.66	64.22	453.99
24.34	73.5	1011.31	84.15	440.29
25.71	58.59	1012.77	61.83	451.28
26.19	69.34	1009.48	87.59	433.99
21.42	43.79	1015.76	43.08	462.19
18.21	45	1022.86	48.84	467.54
11.04	41.74	1022.6	77.51	477.2
14.45	52.75	1023.97	63.59	459.85
13.97	38.47	1015.15	55.28	464.3
17.76	42.42	1009.09	66.26	468.27
5.41	40.07	1019.16	64.77	495.24
7.76	42.28	1008.52	83.31	483.8
27.23	63.9	1014.3	47.19	443.61
27.36	48.6	1003.18	54.93	436.06
27.47	70.72	1009.97	74.62	443.25
14.6	39.31	1011.11	72.52	464.16
7.91	39.96	1023.57	88.44	475.52
5.81	35.79	1012.14	92.28	484.41
30.53	65.18	1012.69	4

#### Definindo as features dos dados
AT = Atmospheric Temperature in C\
V = Exhaust Vacuum Speed\
AP = Atmospheric Pressure\
RH = Relative Humidity\
PE = Power Output. This is the value we are trying to predict given the measurements above.

In [0]:
# Usando delimitador = '\t' e inferschema='true' pois os dados são todos do tipo 'double' 
# e o pacote consegue inferir dinamicamente o tipo de cada coluna
df = sqlContext.read.format('com.databricks.spark.csv').options(delimiter='\t',header='true',inferschema='true').load("/databricks-datasets/power-plant/data")

In [0]:
dbutils.fs.mkdirs('ada_857/datasets')

Out[38]: True

In [0]:
%sql
show tables

database,tableName,isTemporary
ada_857,power,False
,power_plant,True


In [0]:
df = spark.read.load("/databricks-datasets/power-plant/data/Sheet1.tsv", format='csv', sep='\t', header=True, inferSchema=True)

In [0]:
df.describe

Out[49]: <bound method DataFrame.describe of DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]>

In [0]:
dbutils.fs.ls('ada_857/datasets/')

Out[44]: []

In [0]:
%sql
create table power1(
  at float,
  v float,
  ap float,
  rh float,
  pe float
)
row format delimited
fields terminated by '\t'
lines terminated by '\n' 
stored as textfile
location '/ada_857/datasets'
tblproperties("skip.header.line.count"="1")

In [0]:
# Visualizando o tipo dos dados
df.printSchema()

root
 |-- AT: double (nullable = true)
 |-- V: double (nullable = true)
 |-- AP: double (nullable = true)
 |-- RH: double (nullable = true)
 |-- PE: double (nullable = true)



In [0]:
# Examinando os dados utilizando o método display
display(df)

AT,V,AP,RH,PE
14.96,41.76,1024.07,73.17,463.26
25.18,62.96,1020.04,59.08,444.37
5.11,39.4,1012.16,92.14,488.56
20.86,57.32,1010.24,76.64,446.48
10.82,37.5,1009.23,96.62,473.9
26.27,59.44,1012.23,58.77,443.67
15.89,43.96,1014.02,75.24,467.35
9.48,44.71,1019.12,66.43,478.42
14.64,45.0,1021.78,41.25,475.98
11.74,43.56,1015.14,70.72,477.5


In [0]:
sqlContext.sql("DROP TABLE IF EXISTS power1")
sqlContext.registerDataFrameAsTable(df, "power1")

In [0]:
%sql
SELECT * FROM power1

AT,V,AP,RH,PE
14.96,41.76,1024.07,73.17,463.26
25.18,62.96,1020.04,59.08,444.37
5.11,39.4,1012.16,92.14,488.56
20.86,57.32,1010.24,76.64,446.48
10.82,37.5,1009.23,96.62,473.9
26.27,59.44,1012.23,58.77,443.67
15.89,43.96,1014.02,75.24,467.35
9.48,44.71,1019.12,66.43,478.42
14.64,45.0,1021.78,41.25,475.98
11.74,43.56,1015.14,70.72,477.5


In [0]:
%sql
desc power1

col_name,data_type,comment
AT,double,
V,double,
AP,double,
RH,double,
PE,double,


In [0]:
# Usando método 'describe' para analisar os dados
df = sqlContext.table("power1")
display(df.describe())

summary,AT,V,AP,RH,PE
count,9568.0,9568.0,9568.0,9568.0,9568.0
mean,19.65123118729102,54.30580372073601,1013.2590781772604,73.30897784280926,454.3650094063554
stddev,7.452473229611082,12.707892998326784,5.938783705811581,14.600268756728964,17.066994999803402
min,1.81,25.36,992.89,25.56,420.26
max,37.11,81.56,1033.3,100.16,495.76


In [0]:
%sql
select AP as Pressure, PE as Power from power1

In [0]:
df.head()

Out[59]: Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26)

In [0]:
# Utilizando o VectorAssembler
from pyspark.ml.feature import VectorAssembler

datasetDF = sqlContext.table("power1")

vectorizer = VectorAssembler()
vectorizer.setInputCols(["AT", "V", "AP", "RH"])
vectorizer.setOutputCol("features")

Out[60]: VectorAssembler_7b93c9da7a1e

In [0]:
# Split dos dados sendo 20% teste e 80% treino
seed = 22
(split20DF, split80DF) = df.randomSplit([0.2,0.8],seed)

testSetDF = split20DF
trainingSetDF = split80DF

In [0]:
# Modelo de Regressão Linear
lr = LinearRegression()

In [0]:
# Configurando os parametros para o método
lr.setPredictionCol("Predicted_PE")\
  .setLabelCol("PE")\
  .setMaxIter(100)\
  .setRegParam(0.1)

Out[64]: LinearRegression_0ebeb8c6394e

In [0]:
# Usando o pipeline API do Spark
lrPipeline = Pipeline()
lrPipeline.setStages([vectorizer, lr])

Out[65]: Pipeline_783201e480a2

In [0]:
# Treinando o dataset
lrModel = lrPipeline.fit(trainingSetDF)

In [0]:
# Aplicando o modelo de regressão linear para os dados de teste para predizer o resultado do power output(Predicted_PE)
predictionsAndLabelsDF = lrModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

display(predictionsAndLabelsDF)

AT,V,AP,RH,PE,Predicted_PE
2.58,39.42,1028.68,69.03,488.69,492.72225974259095
3.26,41.31,996.32,100.0,489.38,483.6250190098736
3.6,35.19,1018.73,99.1,488.98,486.5969310063951
3.63,38.44,1016.16,87.38,487.87,487.205464745973
3.85,35.47,1016.78,85.31,489.78,487.90544241609297
3.92,41.31,999.22,95.26,487.35,483.3091671623199
4.04,35.47,1017.51,87.35,486.86,487.3057631710416
4.11,38.44,1015.9,81.79,488.05,487.08769807215646
4.15,39.9,1008.84,96.68,491.22,483.8466633502798
4.32,35.47,1017.8,88.51,488.03,486.6258618547966


In [0]:
# Utilizando métricas de avaliação
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Criando RMSE usando colunas 'PE' e 'Predicted_PE'
regEval = RegressionEvaluator(predictionCol="Predicted_PE", labelCol="PE", metricName="rmse")
rmse = regEval.evaluate(predictionsAndLabelsDF)
print("Root Mean Squared Error: %.2f" % rmse)

Root Mean Squared Error: 4.56


In [0]:
# Utilizando 'R squared' para verificar a proporção da variância do modelo, quanto mais próximo de 1 melhor  
r2 = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("r2: {0:.2f}".format(r2))

r2: 0.93


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
# Tentativa de melhorar o modelo fazendo 'tuning' de alguns parametros
crossval = CrossValidator(estimator=lrPipeline, evaluator=regEval, numFolds=3)

In [0]:
# Tunando parametro de regularização entre 0.01 à 0.10
regParam = [x / 100.0 for x in range(1, 11)]

In [0]:
# Criando grid de parametro e adicionando ao CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, regParam)
             .build())
crossval.setEstimatorParamMaps(paramGrid)

Out[74]: CrossValidator_485b9c9a8b1c

In [0]:
# Retornando o melhor modelo
cvModel = crossval.fit(trainingSetDF).bestModel

In [0]:
# Usando o cvModel para verificar as metricas para o dataset de teste
# Executar o RMSE e regEval criado previamente no predictionsAndLabelsDF Dataframe
# Avaliando a métrica r2 no dataset de teste
predictionsAndLabelsDF = cvModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")
rmseNew = regEval.evaluate(predictionsAndLabelsDF)
r2New = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("Antigo Root Mean Squared Error: {0:2.2f}".format(rmse))
print("Novo Root Mean Squared Error: {0:2.2f}".format(rmseNew))
print("Antigo r2: {0:2.2f}".format(r2))
print("Novo r2: {0:2.2f}".format(r2New))

Antigo Root Mean Squared Error: 4.56
Novo Root Mean Squared Error: 4.55
Antigo r2: 0.93
Novo r2: 0.93
