# <font color='blue'>Data Science Academy Big Data Real-Time Analytics com Python e Spark</font>

# <font color='blue'>Chapter 9</font>

## <font color='blue'>Spark MLlib - Linear Regression</font>

<strong> Description </strong>
<ul style="list-style-type:square">
  <li>A method for assessing the relationship between variables.</li>
  <li>Estimates the value of a dependent variable from the values of the independent variables.</li>
  <li>Used when the dependent and independent variables are continuous and show some correlation.</li>
  <li>R-squared measures how close the data are to the regression line. On the training data of a least-squares fit with an intercept it lies between 0 and 1, and higher is better; on held-out data it can be negative when the model predicts worse than the mean of the labels.</li>
  <li>The input and output data are used to build the model. Fitting the linear equation yields the coefficient values.</li>
  <li>The linear equation represents the model.</li>
</ul>
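To make "fitting the linear equation yields the coefficient values" concrete, here is a minimal sketch for the one-predictor case, using the closed-form ordinary least squares solution in plain Python. The data and the function name `fit_simple_ols` are hypothetical, and this is not the solver Spark uses internally; it only illustrates what the fitted slope and intercept mean.

```python
def fit_simple_ols(xs, ys):
    """Closed-form least-squares fit of y = intercept + slope * x."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Slope is the covariance of x and y divided by the variance of x
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Hypothetical data lying exactly on the line y = 2x + 1
xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]
slope, intercept = fit_simple_ols(xs, ys)
print(slope, intercept)  # 2.0 1.0
```

With several predictors the idea is the same, except that one coefficient is estimated per feature.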

<dl>
  <dt>Advantages</dt>
  <dd>- Low computational cost</dd>
  <dd>- Fast</dd>
  <dd>- Excellent for linear relationships</dd>
  <br />
  <dt>Disadvantages</dt>
  <dd>- Numeric variables only</dd>
  <dd>- Sensitive to outliers</dd>
  <br />
  <dt>Application</dt>
  <dd>- One of the oldest models; it can be used to solve a wide range of problems</dd>
</dl>

## We will use Linear Regression to predict MPG (Miles Per Gallon)

MPG is the target variable and the remaining variables are the candidate features (predictor variables).

In [28]:
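# Spark Session - used when working with DataFrames in Spark
from pyspark.sql import SparkSession
spSession = SparkSession.builder.getOrCreate()
# SparkContext used by the RDD operations below (some notebook environments predefine it as sc)
sc = spSession.sparkContext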

In [29]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [33]:

# @hidden_cell
credentials_1 = {
  'auth_url':'https://identity.open.softlayer.com',
  'project':'object_storage_f0d6ce32_5e0f_4bc0_8812_229b8d429dbe',
  'project_id':'9a0cc60102244d368e96a83f25d4ca89',
  'region':'dallas',
  'user_id':'0caf8026c98a4342ac027a05416e6dee',
  'domain_id':'3be46074545f4c09b1f10df3ace95998',
  'domain_name':'1351407',
  'username':'member_327b95c3eecf105b8bdb0125b81968cfcc557dbd',
  'password':"""D[Cvr1bgf9DM^I{C""",
  'container':'CursoSpark',
  'tenantId':'undefined',
  'filename':'carros.csv'
}


In [34]:

from pyspark.sql import SparkSession

# @hidden_cell
# This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_f0d6ce325e0f4bc08812229b8d429dbe(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', '9a0cc60102244d368e96a83f25d4ca89')
    hconf.set(prefix + '.username', '0caf8026c98a4342ac027a05416e6dee')
    hconf.set(prefix + '.password', 'D[Cvr1bgf9DM^I{C')
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', False)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_f0d6ce325e0f4bc08812229b8d429dbe(name)


In [36]:
# Read the file from Object Storage through the Hadoop Swift connector
fileNameOut = 'swift://' + credentials_1['container'] + '.' + name + '/' + credentials_1['filename']
carrosRDD = sc.textFile(fileNameOut)

In [37]:
# Cache the RDD so that later actions reuse it instead of re-reading the file.
carrosRDD.cache()

swift://CursoSpark.keystone/carros.csv MapPartitionsRDD[49] at textFile at NativeMethodAccessorImpl.java:-2

In [38]:
carrosRDD.count()

399

In [39]:
carrosRDD.take(5)

[u'MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME',
 u'18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 u'15,8,350,165,3693,11.5,70,buick skylark 320',
 u'18,8,318,150,3436,11,70,plymouth satellite',
 u'16,8,304,150,3433,12,70,amc rebel sst']

In [40]:
# Remove the first line of the file (the header), identified by the column name DISPLACEMENT
carrosRDD2 = carrosRDD.filter(lambda x: "DISPLACEMENT" not in x)
carrosRDD2.count()

398
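The filter above identifies the header by a substring, which would also drop any data row that happened to contain the word DISPLACEMENT. A minimal sketch of an alternative, comparing each line with the first line, is shown below on a small in-memory list. The list `lines` and the helper `drop_header` are hypothetical stand-ins for the RDD, not part of the notebook.

```python
# Hypothetical in-memory stand-in for the first lines of carros.csv
lines = [
    "MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME",
    "18,8,307,130,3504,12,70,chevrolet chevelle malibu",
    "15,8,350,165,3693,11.5,70,buick skylark 320",
]


def drop_header(rows):
    """Return the rows without the header, taken to be the first line."""
    header = rows[0]
    return [row for row in rows if row != header]


data_rows = drop_header(lines)
print(len(data_rows))  # 2
```

On an RDD the same idea could presumably be written with `first()` to fetch the header and `filter` to exclude it; that adaptation is an assumption, not something this notebook does.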

## Data Cleaning

In [41]:
# Use a fixed default value for horsepower (HP), which will fill in the missing values
mediaHP = sc.broadcast(75.0)
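The value 75.0 is a fixed default rather than a statistic computed from the file. As a sketch of how such a value could instead be derived from the data, the snippet below averages the non-missing HORSEPOWER entries of a few sample lines. The first four lines come from the output shown earlier; the last line and the helper `mean_horsepower` are hypothetical.

```python
sample_lines = [
    "18,8,307,130,3504,12,70,chevrolet chevelle malibu",
    "15,8,350,165,3693,11.5,70,buick skylark 320",
    "18,8,318,150,3436,11,70,plymouth satellite",
    "16,8,304,150,3433,12,70,amc rebel sst",
    "25,4,98,?,2046,19,71,hypothetical car",  # made-up row with a missing HP value
]


def mean_horsepower(rows, missing_marker="?"):
    """Average the HORSEPOWER column (index 3), skipping missing entries."""
    values = []
    for row in rows:
        hp = row.split(",")[3]
        if hp != missing_marker:
            values.append(float(hp))
    return sum(values) / len(values)


media_hp_estimate = mean_horsepower(sample_lines)
print(media_hp_estimate)  # 148.75
```

Whether a mean, a median, or a fixed default is the better fill value depends on the data; the notebook keeps the fixed default.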

In [42]:
# Data-cleaning function
def limpaDados(inputStr):
    attList = inputStr.split(",")

    # Replace the "?" placeholder in HORSEPOWER with the broadcast default value
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = mediaHP.value

    # Build a Row, converting the numeric fields from string to float
    linhas = Row(MPG = float(attList[0]), CYLINDERS = float(attList[1]), DISPLACEMENT = float(attList[2]),
                 HORSEPOWER = float(hpValue), WEIGHT = float(attList[4]), ACCELERATION = float(attList[5]),
                 MODELYEAR = float(attList[6]), NAME = attList[7])
    return linhas
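To see what the cleaning step does to a single line without starting Spark, here is a minimal plain-Python sketch of the same logic that returns a dictionary instead of a `Row`. The name `parse_car_line` and the second sample line are hypothetical; the default of 75.0 mirrors the broadcast value above.

```python
DEFAULT_HP = 75.0  # same default as the notebook's broadcast variable


def parse_car_line(line, default_hp=DEFAULT_HP):
    """Split one CSV line and convert the numeric fields to float."""
    fields = line.split(",")
    # A "?" in the HORSEPOWER column marks a missing value
    hp = default_hp if fields[3] == "?" else float(fields[3])
    return {
        "MPG": float(fields[0]),
        "CYLINDERS": float(fields[1]),
        "DISPLACEMENT": float(fields[2]),
        "HORSEPOWER": hp,
        "WEIGHT": float(fields[4]),
        "ACCELERATION": float(fields[5]),
        "MODELYEAR": float(fields[6]),
        "NAME": fields[7],
    }


complete = parse_car_line("15,8,350,165,3693,11.5,70,buick skylark 320")
missing = parse_car_line("25,4,98,?,2046,19,71,hypothetical car")  # made-up row
print(complete["HORSEPOWER"], missing["HORSEPOWER"])  # 165.0 75.0
```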

In [44]:
# Apply the function to every element of the RDD
carrosRDD3 = carrosRDD2.map(limpaDados)
carrosRDD3.cache()
carrosRDD3.take(5)

[Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, MODELYEAR=70.0, MPG=18.0, NAME=u'chevrolet chevelle malibu', WEIGHT=3504.0),
 Row(ACCELERATION=11.5, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, MODELYEAR=70.0, MPG=15.0, NAME=u'buick skylark 320', WEIGHT=3693.0),
 Row(ACCELERATION=11.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, MODELYEAR=70.0, MPG=18.0, NAME=u'plymouth satellite', WEIGHT=3436.0),
 Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=304.0, HORSEPOWER=150.0, MODELYEAR=70.0, MPG=16.0, NAME=u'amc rebel sst', WEIGHT=3433.0),
 Row(ACCELERATION=10.5, CYLINDERS=8.0, DISPLACEMENT=302.0, HORSEPOWER=140.0, MODELYEAR=70.0, MPG=17.0, NAME=u'ford torino', WEIGHT=3449.0)]

## Exploratory Data Analysis

In [21]:
# Create a DataFrame
carrosDF = spSession.createDataFrame(carrosRDD3)


In [12]:
# Descriptive statistics
carrosDF.select("MPG","CYLINDERS").describe().show()

+-------+-----------------+------------------+
|summary|              MPG|         CYLINDERS|
+-------+-----------------+------------------+
|  count|              398|               398|
|   mean|23.51457286432161| 5.454773869346734|
| stddev|7.815984312565782|1.7010042445332125|
|    min|              9.0|               3.0|
|    max|             46.6|               8.0|
+-------+-----------------+------------------+



In [13]:
# Correlation between the target variable and each numeric column
for nome, tipo in carrosDF.dtypes:
    if tipo != "string":
        print("Correlation of MPG with", nome, carrosDF.stat.corr("MPG", nome))

Correlation of MPG with ACCELERATION 0.4202889121016499
Correlation of MPG with CYLINDERS -0.7753962854205548
Correlation of MPG with DISPLACEMENT -0.8042028248058979
Correlation of MPG with HORSEPOWER -0.7747041523498721
Correlation of MPG with MODELYEAR 0.5792671330833091
Correlation of MPG with MPG 1.0
Correlation of MPG with WEIGHT -0.8317409332443347
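The values above are Pearson correlation coefficients. As a rough illustration of what is being computed, the sketch below implements the formula in plain Python and applies it to the MPG and WEIGHT values of the first five rows shown earlier. The function name `pearson` is chosen here for illustration, and five rows are far too few to reproduce the full-dataset figure.

```python
import math


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# MPG and WEIGHT of the first five cars in the dataset
mpg = [18.0, 15.0, 18.0, 16.0, 17.0]
weight = [3504.0, 3693.0, 3436.0, 3433.0, 3449.0]

r = pearson(mpg, weight)
print(round(r, 3))
```

Even on this tiny sample the coefficient is negative, in line with the full-dataset result that heavier cars tend to have lower MPG.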


## Data Pre-Processing

In [14]:
# Convert each row into a labeled tuple: (target, Vector[features])
# Keeps three predictors (ACCELERATION, DISPLACEMENT, WEIGHT); the remaining columns are left out of the model
def transformaVar(row):
    obj = (row["MPG"], Vectors.dense([row["ACCELERATION"], row["DISPLACEMENT"], row["WEIGHT"]]))
    return obj

In [15]:
# Apply the function to the RDD, convert the result to a DataFrame and display it with select()
carrosRDD4 = carrosRDD3.map(transformaVar)
carrosDF = spSession.createDataFrame(carrosRDD4,["label", "features"])
carrosDF.select("label","features").show(10)

+-----+-------------------+
|label|           features|
+-----+-------------------+
| 18.0|[12.0,307.0,3504.0]|
| 15.0|[11.5,350.0,3693.0]|
| 18.0|[11.0,318.0,3436.0]|
| 16.0|[12.0,304.0,3433.0]|
| 17.0|[10.5,302.0,3449.0]|
| 15.0|[10.0,429.0,4341.0]|
| 14.0| [9.0,454.0,4354.0]|
| 14.0| [8.5,440.0,4312.0]|
| 14.0|[10.0,455.0,4425.0]|
| 15.0| [8.5,390.0,3850.0]|
+-----+-------------------+
only showing top 10 rows



In [16]:
carrosRDD4.take(5)

[(18.0, DenseVector([12.0, 307.0, 3504.0])),
 (15.0, DenseVector([11.5, 350.0, 3693.0])),
 (18.0, DenseVector([11.0, 318.0, 3436.0])),
 (16.0, DenseVector([12.0, 304.0, 3433.0])),
 (17.0, DenseVector([10.5, 302.0, 3449.0]))]

## Machine Learning

In [17]:
# Training and test data (random split of roughly 70% / 30%)
(dados_treino, dados_teste) = carrosDF.randomSplit([0.7, 0.3])

In [18]:
dados_treino.count()

287

In [19]:
dados_teste.count()

111
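The counts 287 and 111 are not exactly 70% and 30% of 398 rows. As far as I understand, `randomSplit` assigns each row to a split independently at random, so the proportions are only approximate and change between runs unless a `seed` is passed. The plain-Python sketch below imitates that behaviour; the helper `bernoulli_split` and the seed value are hypothetical.

```python
import random


def bernoulli_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)  # fixed seed makes the split reproducible
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(398))  # stand-in for the 398 cars
train, test = bernoulli_split(rows)
print(len(train), len(test))
```

Running it with different seeds gives slightly different sizes, while the two parts always add up to 398.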

In [20]:
# Build (fit) the model on the training data
linearReg = LinearRegression(maxIter = 10)
modelo = linearReg.fit(dados_treino)

In [21]:
print(modelo)

LinearRegression_4b718f39d15e4655d3b4


In [22]:
# Print the fitted model parameters
print("Coefficients: " + str(modelo.coefficients))
print("Intercept: " + str(modelo.intercept))

Coefficients: [0.20683227091,-0.0139156296053,-0.00571177718642]
Intercept: 40.070468428318414


In [23]:
# Predictions on the test data
predictions = modelo.transform(dados_teste)
predictions.select("prediction", "features").show()

+------------------+-------------------+
|        prediction|           features|
+------------------+-------------------+
| 9.913154602969094|[11.0,429.0,4633.0]|
| 7.735003087949785|[11.0,455.0,4951.0]|
|11.488830476938418|[12.0,400.0,4464.0]|
|11.832141254222886|[12.5,400.0,4422.0]|
|14.650893383592912|[16.0,302.0,4294.0]|
|14.290973955959721|[13.0,351.0,4129.0]|
| 15.78001282820481|[14.5,302.0,4042.0]|
|14.730981744824629|[15.5,304.0,4257.0]|
|18.506122537199946|[11.0,318.0,3399.0]|
| 16.48497603247754|[11.5,350.0,3693.0]|
|16.657319167099608|[12.5,318.0,3777.0]|
|14.573343113326626|[13.0,350.0,4082.0]|
|14.819335205272605|[13.5,318.0,4135.0]|
|12.735359151499622|[14.0,350.0,4440.0]|
|21.053220938567524|[17.0,250.0,3336.0]|
|21.332219412311503|[21.0,250.0,3432.0]|
|12.034140356336867| [9.5,400.0,4278.0]|
|21.591336286289643|[18.0,250.0,3278.0]|
|18.159162343353028|[18.5,250.0,3897.0]|
|20.782954972507465|[15.5,250.0,3329.0]|
+------------------+-------------------+
only showing top 20 rows
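Each prediction in the table is the intercept plus the dot product of the coefficients with the feature vector. The sketch below redoes that arithmetic in plain Python, using the coefficients and intercept printed earlier and the table row with features `[11.5, 350.0, 3693.0]`. The function name `predict` is hypothetical.

```python
# Values copied from the printed model parameters (coefficients are rounded in the printout)
coefficients = [0.20683227091, -0.0139156296053, -0.00571177718642]
intercept = 40.070468428318414


def predict(features):
    """Linear model: intercept + sum of coefficient * feature."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


features = [11.5, 350.0, 3693.0]  # ACCELERATION, DISPLACEMENT, WEIGHT
print(predict(features))  # approximately 16.485, matching the table row
```

The tiny difference from the table value comes from the rounding of the printed coefficients.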

In [24]:
# Coefficient of determination (R2)
avaliador = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")
avaliador.evaluate(predictions)

0.7057954428371136
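An R2 of about 0.71 means the model accounts for roughly 71% of the variance of MPG in the test set. As a sketch of the calculation behind the metric, the function below computes R2 as one minus the ratio of the residual sum of squares to the total sum of squares. The name `r_squared` and the example predictions are hypothetical; the labels are the MPG values of the first five rows shown earlier.

```python
def r_squared(labels, predictions):
    """R2 = 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


labels = [18.0, 15.0, 18.0, 16.0, 17.0]
mean_label = sum(labels) / len(labels)

perfect = r_squared(labels, labels)               # every prediction exact
baseline = r_squared(labels, [mean_label] * 5)    # always predict the mean
poor = r_squared(labels, [30.0] * 5)              # far worse than the mean
print(perfect, baseline, poor < 0)  # 1.0 0.0 True
```

The three cases mark the reference points of the scale: 1 for a perfect fit, 0 for a model no better than the mean, and negative values for a model that is worse than the mean.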

# End

### Thank you - Data Science Academy - <a href=http://facebook.com/dsacademy>facebook.com/dsacademybr</a>