Usaremos Regressão Linear para prever o consumo de combustível de automóveis.

A variável consumo no dataset1.csv será a variável target (dependente) e as demais variáveis serão candidatas a features (variáveis preditoras ou independentes).

Este é, portanto, será um problema de Regressão Linear Múltipla (quando temos mais de 1 variável preditora ou independente).

In [1]:
from platform import python_version
print('Versão usada:', python_version())

Versão usada: 3.9.12


In [2]:
#Importando e inicializando o findspark
import findspark
findspark.init()

In [3]:
# Importações
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

## Carregando os dados

In [4]:
# Criando o Spark Context
sc = SparkContext(appName= 'Note1')

23/04/07 14:57:00 WARN Utils: Your hostname, mor-Inspiron-3501 resolves to a loopback address: 127.0.1.1; using 192.168.0.217 instead (on interface wlp0s20f3)
23/04/07 14:57:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/07 14:57:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/07 14:57:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
sc.setLogLevel('ERROR')

In [6]:
# Criando o Spark Session
spark_session = SparkSession.builder.master('local').getOrCreate()

In [7]:
# Carregando os dados e gerando um RDD
dados = sc.textFile('dataset1.csv')
type(dados)

pyspark.rdd.RDD

In [8]:
# Colocando o RDD em cache. Esse processo otimiza a performance
dados.cache()

dataset1.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
# Número de registros
dados.count()

                                                                                

399

In [10]:
# Visualizando as primeiras linhas
dados.take(5)

['consumo,numero_cilindros,capacidade,horsepower,peso,aceleracao,ano,nome',
 '30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200']

In [11]:
# Removendo a primeira linha do arquivo (cabeçalho)
# Isso é requerido pelo spark.

dados = dados.filter(lambda x: 'horsepower' not in x)
dados.count()

398

In [12]:
dados.take(5)

['30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200',
 '27,4,97,60,1834,19,71,volkswagen model 111']

## Limpeza dos Dados

Vamos verificar se há valores ausentes. RDDs são ótimos para processamento, mas ruins para exploração, então converteremos o RDD para DataFrame Spark e então para DataFrame Pandas.

In [13]:
# Converte RDD para DataFrame Spark
# É necessário converter para string, pois ao usar a função textfile, o spark não importou o schemma dos dados.
df_spark = dados.map(lambda x: str(x)).map(lambda w: w.split(',')).toDF()

In [14]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [15]:
df_pandas = df_spark.toPandas()
df_pandas.head()

Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8
0,30,4,79,70,2074,19.5,71,peugeot 304
1,30,4,88,76,2065,14.5,71,fiat 124b
2,31,4,71,65,1773,19.0,71,toyota corolla 1200
3,35,4,72,69,1613,18.0,71,datsun 1200
4,27,4,97,60,1834,19.0,71,volkswagen model 111


In [16]:
type(df_pandas)

pandas.core.frame.DataFrame

In [17]:
# Procurando valores nulos
df_pandas.isnull().sum()

_1    0
_2    0
_3    0
_4    0
_5    0
_6    0
_7    0
_8    0
dtype: int64

In [18]:
# Boa notícia! Não há valor nulo, mas será que há valores ausentes?

total = np.sum(df_pandas.apply(lambda x: x.str.contains('\?')).values)
total

6

In [19]:
# Vejamos quais linhas e colunas têm o caracter especial
np.where(df_pandas.apply(lambda x: x.str.contains('\?')).values)

(array([ 48, 126, 330, 336, 354, 374]), array([3, 3, 3, 3, 3, 3]))

In [20]:
# Visualizando uma linha
df_pandas.iloc[330]

_1                    40.9
_2                       4
_3                      85
_4                       ?
_5                    1835
_6                    17.3
_7                      80
_8    renault lecar deluxe
Name: 330, dtype: object

In [21]:
# Usando um valor padrão para average HP (que será usado para preencher os valores ausentes)
mediaHP = sc.broadcast(75.0)

In [25]:
# Função para limpeza dos dados

def limpa_dados(inputStr):
    
    # Variável global
    global mediaHP
    
    # Lista de atributos
    attList = inputStr.split(',')
    
    # Substitui o caracter ? por um valor na coluna de índice 3
    
    hpValue = attList[3]
    if hpValue == '?':
        hpValue = mediaHP.value
    
    # Cria uma linha usando a função Row, limpando e convertendo os dados de string para float
    linhas = Row(consumo = float(attList[0]), 
                 numero_cilindros = float(attList[1]), 
                 capacidade = float(attList[2]), 
                 hosrsepower = float(hpValue), 
                 peso = float(attList[4]), 
                 aceleracao = float(attList[5]), 
                 ano = float(attList[6]), 
                 nome = attList[7]) 
    return linhas

In [26]:
# Executa a função no RDD
dados2 = dados.map(limpa_dados)
dados2.cache()
dados2.take(5)

[Row(consumo=30.0, numero_cilindros=4.0, capacidade=79.0, hosrsepower=70.0, peso=2074.0, aceleracao=19.5, ano=71.0, nome='peugeot 304'),
 Row(consumo=30.0, numero_cilindros=4.0, capacidade=88.0, hosrsepower=76.0, peso=2065.0, aceleracao=14.5, ano=71.0, nome='fiat 124b'),
 Row(consumo=31.0, numero_cilindros=4.0, capacidade=71.0, hosrsepower=65.0, peso=1773.0, aceleracao=19.0, ano=71.0, nome='toyota corolla 1200'),
 Row(consumo=35.0, numero_cilindros=4.0, capacidade=72.0, hosrsepower=69.0, peso=1613.0, aceleracao=18.0, ano=71.0, nome='datsun 1200'),
 Row(consumo=27.0, numero_cilindros=4.0, capacidade=97.0, hosrsepower=60.0, peso=1834.0, aceleracao=19.0, ano=71.0, nome='volkswagen model 111')]

## Análise Exploratória de Dados

In [28]:
# Cria um Dataframe

df_carros = spark_session.createDataFrame(dados2)

In [29]:
# Estatísticas descritivas de duas variáveis (como exemplo)

df_carros.select("consumo", "numero_cilindros").describe().show()

+-------+-----------------+-----------------+
|summary|          consumo| numero_cilindros|
+-------+-----------------+-----------------+
|  count|              398|              398|
|   mean|23.51457286432161|5.454773869346734|
| stddev|7.815984312565782|1.701004244533212|
|    min|              9.0|              3.0|
|    max|             46.6|              8.0|
+-------+-----------------+-----------------+



In [33]:
# Encontrando a correlação entre a variável target com as variáveis preditoras (exceto o nome)
# Calculamos a correlação somente das variáveis numéricas!!

for i in df_carros.columns:
    
    if not(isinstance(df_carros.select(i).take(1)[0][0],str)):
           print("Correlação da Variável Target com:", i, df_carros.stat.corr('consumo', i))

Correlação da Variável Target com: consumo 1.0
Correlação da Variável Target com: numero_cilindros -0.7753962854205546
Correlação da Variável Target com: capacidade -0.8042028248058979
Correlação da Variável Target com: hosrsepower -0.7747041523498721
Correlação da Variável Target com: peso -0.8317409332443347
Correlação da Variável Target com: aceleracao 0.42028891210164976
Correlação da Variável Target com: ano 0.5792671330833099


## Pré-Processamento dos Dados

In [34]:
# Convertendo para um LabeledPoint (target, Vector[features])
# Remove colunas não relevantes ou com baixa correlação

def transformaVar(row):
    obj = (row['consumo'], Vectors.dense([row["peso"], row["capacidade"], row["numero_cilindros"]]))
    return obj

In [35]:
dados3 = dados2.map(transformaVar)
dados3.take(5)

[(30.0, DenseVector([2074.0, 79.0, 4.0])),
 (30.0, DenseVector([2065.0, 88.0, 4.0])),
 (31.0, DenseVector([1773.0, 71.0, 4.0])),
 (35.0, DenseVector([1613.0, 72.0, 4.0])),
 (27.0, DenseVector([1834.0, 97.0, 4.0]))]

In [36]:
# Converte o RDD para DataFrame do Spark
df_carros = spark_session.createDataFrame(dados3, ['label', 'features'])
df_carros.select('label', 'features').show(10)

+-----+------------------+
|label|          features|
+-----+------------------+
| 30.0| [2074.0,79.0,4.0]|
| 30.0| [2065.0,88.0,4.0]|
| 31.0| [1773.0,71.0,4.0]|
| 35.0| [1613.0,72.0,4.0]|
| 27.0| [1834.0,97.0,4.0]|
| 26.0| [1955.0,91.0,4.0]|
| 24.0|[2278.0,113.0,4.0]|
| 25.0| [2126.0,97.5,4.0]|
| 23.0| [2254.0,97.0,4.0]|
| 20.0|[2408.0,140.0,4.0]|
+-----+------------------+
only showing top 10 rows



In [39]:
# Divisão em dados de Treino e de Teste com split 70/30
(dados_treino, dados_teste) = df_carros.randomSplit([0.7, 0.3])
print(dados_treino.count())
print(dados_teste.count())

277
121


## Machine Learning

In [40]:
 # Cria o objeto
    
linear_reg = LinearRegression()

In [41]:
# Treina o objeto com dados e cria o modelo
modelo = linear_reg.fit(dados_treino)

In [42]:
print(modelo)

LinearRegressionModel: uid=LinearRegression_a30843bcf71d, numFeatures=3


In [43]:
# Imprimindo os coeficientes (o que o modelo aprendeu)
print('Coeficientes:' + str(modelo.coefficients))
print("Intercepto:" + str(modelo.intercept))

Coeficientes:[-0.006591941437983503,-0.010024369168943533,0.029607349972484476]
Intercepto:44.75288204137945


In [44]:
# Previsões com dados de teste
pred = modelo.transform(dados_teste)

In [46]:
# Visualiza as previsões
pred.select("features", 'prediction').show()

+------------------+------------------+
|          features|        prediction|
+------------------+------------------+
|[4499.0,350.0,8.0]|11.824067102541306|
|[4906.0,400.0,8.0]| 8.639928478834847|
|[4951.0,455.0,8.0]| 7.791950809833693|
|[4952.0,429.0,8.0]| 8.045992466788242|
|[3169.0,302.0,8.0]| 21.07251893516866|
|[3821.0,360.0,8.0]|16.193159705804685|
|[3988.0,350.0,8.0]|15.192549177350877|
|[4294.0,302.0,8.0]|13.656584817437217|
|[4422.0,400.0,8.0]| 11.83042813481886|
|[4502.0,350.0,8.0]| 11.80429127822736|
|[4654.0,360.0,8.0]| 10.70207248796443|
|[4735.0,440.0,8.0]| 9.366175697972281|
|[4746.0,400.0,8.0]| 9.694639108912206|
|[5140.0,400.0,8.0]| 7.097414182346704|
|[3086.0,455.0,8.0]|20.085921591672925|
|[4042.0,302.0,8.0]|15.317754059809058|
|[4354.0,454.0,8.0]|11.737364217478785|
|[4425.0,455.0,8.0]|11.259312006213015|
|[3336.0,250.0,6.0]|20.433717211865506|
|[3432.0,250.0,6.0]| 19.80089083381909|
+------------------+------------------+
only showing top 20 rows



In [47]:
# Coeficiente de determinação R2
avaliador = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')

In [48]:
# Resultado
avaliador.evaluate(pred)

0.7241433037002081

## Fim 