# **Previsão de Tarifas em Corridas de Táxi com PySpark**

-  O desafio que enfrentaremos é semelhante ao apresentado no projeto de [Previsão de Tarifas em Corridas de Táxi](https://medium.com/@omarca2015c/previsão-de-tarifas-em-corridas-de-táxi-911506cc1117), porém desta vez estaremos utilizando o conjunto completo de dados, que contém aproximadamente 55 milhões de linhas.

## **1. Problema de Negócio**

- Imagine que você trabalha para uma empresa de táxi e que uma das maiores reclamações de seus clientes é que eles não sabem quanto custará uma corrida até que ela termine.Isso porque a distância é apenas um dos vários fatores a partir dos quais as tarifas de táxi são calculadas.

- Nesse contexto, faremos uso da ferramenta Pyspark para realizar as previsões necessárias.

## **2. Preparando o ambiente para utilizar o PySpark**

### **Instalando o Java**

- O Apache Spark depende de outros sistemas, portanto, antes do Spark é preciso instalar as dependências. Primeiro, deve-se instalar o java

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

### **Instalando o Apache Spark**

- Em seguida, é preciso fazer o download do Spark, e como este roda sobre o  Hadoop Distributed File System (HDFS), também é preciso fazer  o download do Hadoop. Após os downloads, basta descompactar esses arquivos que o Spark já estará disponível no seu notebook Colab.

In [2]:
# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

### **Configurando o ambiente**

- Precisamos dizer para o sistema onde encontrar o Java e o Spark, previamente instalados.

In [3]:
# Importando a biblioteca os
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

- A seguir, vamos precisar da biblioteca findspark que vai nos permitir importar pacotes necessários para o funcionamento do pyspark.

In [4]:
# instalando a findspark
!pip install -q findspark

In [5]:
#importando a findspark
import findspark

# iniciando o findspark
findspark.init()

### **Iniciando o PySpark**

In [6]:
# importando o pacote necessário para iniciar uma seção Spark
from pyspark.sql import SparkSession

# iniciando o spark context
sc = SparkSession.builder.master('local[*]').getOrCreate()

# Verificando se a sessão foi criada
sc

## **3. Ingestão dos dados para o Apache Spark**

### **3.1 Habilitar o uso da API Kaggle no Google Colab**

In [7]:
# Instalando a biblioteca Kaggle
!pip install -q kaggle

- Faça o download do arquivo de configuração da API Kaggle chamado "kaggle.json" em sua conta Kaggle.

In [8]:
# Fazendo o upload do Token da API Kaggle
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"omarmarca","key":"9621522919e711e3ed6531006be9c535"}'}

In [9]:
# Movendo o arquivo kaggle.json para o diretório adequado
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [10]:
# Fazendo o download do  Conjunto de Dados
!kaggle competitions download -c new-york-city-taxi-fare-prediction

Downloading new-york-city-taxi-fare-prediction.zip to /content
100% 1.56G/1.56G [00:18<00:00, 172MB/s]
100% 1.56G/1.56G [00:18<00:00, 92.8MB/s]


In [11]:
# Descompactando o Conjunto de Dados
!unzip new-york-city-taxi-fare-prediction.zip

Archive:  new-york-city-taxi-fare-prediction.zip
  inflating: GCP-Coupons-Instructions.rtf  
  inflating: sample_submission.csv   
  inflating: test.csv                
  inflating: train.csv               


### **3.2 Importar bibliotecas e conjunto de dados**

In [12]:
# importando as bibliotecas necessárias
from pyspark import *
from pyspark.sql import *

# carregando um conjunto de dados
taxi = (sc.read
      .format('csv')
      .option('inferSchema',True)
      .option('delimiter',',')
      .option('header',True)
      .load('/content/train.csv')
)

# Espiando o dataset
taxi.show(10)

+--------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+
|                 key|fare_amount|     pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+--------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+
|2009-06-15 17:26:...|        4.5|2009-06-15 17:26:...|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|
|2010-01-05 16:52:...|       16.9|2010-01-05 16:52:...|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|
|2011-08-18 00:35:...|        5.7|2011-08-18 00:35:...|      -73.982738|       40.76127|       -73.991242|       40.750562|              2|
|2012-04-21 04:30:...|        7.7|2012-04-21 04:30:...|       -73.98713|      40.733143|       -73.991567|       40.758092|              1|
|2010-03-09 07:51:..

### Metadados:

>  - key: deve ser usado apenas como um campo de ID exclusivo
>  - fare_amount: valor em dólares do custo da corrida de táxi (target)
>  - pickup_datetime: data/hora que indica quando a corrida de táxi começou
>  - pickup_longitude: coordenada de longitude de onde a corrida de táxi começou
>  - pickup_latitude: coordenada de latitude de onde a corrida de táxi começou
>  - dropoff_longitude: coordenada de longitude de onde a corrida de táxi terminou  
>  - dropoff_latitude: coordenada de latitude de onde a corrida de táxi terminou
>  - passenger_count: número de passageiros na corrida de táxi

## **4. Análises preliminares**

In [13]:
# Verificando o Schema
taxi.printSchema()

root
 |-- key: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [14]:
# Contando o número de linhas do dataset
taxi.count()

55423856

- 8 variáveis e 55423856 observações no dataset e a variável fare_amount é a variável alvo.  A variable pickup_datetime precisa ser convertida para o formato date.

In [15]:
# Resumo dos atributos numéricos
taxi.describe('fare_amount', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count').show()

+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|       fare_amount|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|   passenger_count|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|          55423856|          55423856|         55423856|          55423480|          55423480|          55423856|
|   mean|11.345045601663852|-72.50968444358729| 39.9197917868882|-72.51120972971809|39.920681444828844|1.6853799201556816|
| stddev|  20.7108321982325| 12.84888338140265|9.642353041994934|12.782196517830771| 9.633345796415124|1.3276643570959683|
|    min|            -300.0|      -3442.059565|     -3492.263768|      -3442.024565|      -3547.886698|                 0|
|    max|          93963.36|       3457.625683|      3408.789565|        3457.62235|       3537.132528|               208|
+-------+-------

- A variável fare_amount e passenger_count apresentam  outliers. A variable dropoff_longitude e dropoff_latitude tem missing values.

## **5. Pré-processamento**

### **Descarte de variáveis não importantes e conversão da variável pickup_datetime para date**

In [16]:
from pyspark.sql.functions import *

taxi1 = taxi.drop("key")
taxi1 = taxi1.withColumn("pickup_datetime", to_timestamp(taxi1.pickup_datetime))
taxi1.show(5)

+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|        4.5|2009-06-15 17:26:21|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|
|       16.9|2010-01-05 16:52:16|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|
|        5.7|2011-08-18 00:35:00|      -73.982738|       40.76127|       -73.991242|       40.750562|              2|
|        7.7|2012-04-21 04:30:42|       -73.98713|      40.733143|       -73.991567|       40.758092|              1|
|        5.3|2010-03-09 07:51:00|      -73.968095|      40.768008|       -73.956655|       40.783762|              1|
+-----------+-------------------+----------------+------

In [17]:
taxi1.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



### **Remoção das linhas com zero passageiros**

In [18]:
taxi1.groupBy("passenger_count").count().show()

+---------------+--------+
|passenger_count|   count|
+---------------+--------+
|             34|       1|
|              1|38337524|
|              6| 1174647|
|              3| 2432712|
|              5| 3929346|
|              9|      23|
|              4| 1178852|
|              8|       9|
|             49|       1|
|              7|      15|
|             51|       1|
|            129|       2|
|              2| 8175243|
|              0|  195416|
|            208|      64|
+---------------+--------+



- A maioria das entradas no conjunto de dados apresenta uma contagem de passageiros igual a 1. Vamos excluir todas as linhas que tenham mais de um passageiro no conjunto de dados.

In [19]:
taxi2=taxi1.filter("passenger_count==1")
taxi2.show(7)

+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|        4.5|2009-06-15 17:26:21|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|
|       16.9|2010-01-05 16:52:16|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|
|        7.7|2012-04-21 04:30:42|       -73.98713|      40.733143|       -73.991567|       40.758092|              1|
|        5.3|2010-03-09 07:51:00|      -73.968095|      40.768008|       -73.956655|       40.783762|              1|
|       12.1|2011-01-06 09:50:45|      -74.000964|       40.73163|       -73.972892|       40.758233|              1|
|        7.5|2012-11-20 20:35:00|      -73.980002|      

In [20]:
# Contando o número de linhas do dataset taxi2
taxi2.count()

38337524

### **Detecção de Missing Values e Dados Duplicados**

In [21]:
# Valores faltantes
taxi2.describe('dropoff_longitude', 'dropoff_latitude').show()

+-------+------------------+------------------+
|summary| dropoff_longitude|  dropoff_latitude|
+-------+------------------+------------------+
|  count|          38337524|          38337524|
|   mean| -72.4957075071462| 39.91499944802372|
| stddev|13.562142258782064|10.597292548089198|
|    min|      -3442.024565|      -3493.651853|
|    max|        3457.62235|        3407.39148|
+-------+------------------+------------------+



- O DataFrame taxi2 sem Dados Faltantes.

In [22]:
# Dados Duplicados
taxi3=taxi2.distinct()
taxi3.show(10)

+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|        6.1|2009-12-10 15:37:00|      -73.969622|      40.756973|       -73.981152|       40.759712|              1|
|       10.5|2012-02-03 13:46:35|      -73.988006|      40.737723|       -73.965699|       40.766095|              1|
|        6.5|2010-05-05 19:51:00|      -73.951573|      40.773733|       -73.959827|       40.761932|              1|
|       10.1|2010-11-21 01:41:00|        -74.0045|      40.742143|        -73.99433|       40.720412|              1|
|       11.0|2014-03-24 20:29:00|       -73.98442|       40.76023|        -73.99426|        40.73891|              1|
|       65.5|2014-07-31 14:49:37|      -73.990904|      

In [23]:
# Contando o número de linhas
taxi3.count()

38336063

- O DataFrame taxi2 apresentava 1461 registros duplicados.

### **Criando novas colunas de dados a partir das informações contidas em outras colunas**

- Dia: dia da semana (1=segunda-feira, 2=terça-feira e assim por diante).
- hora: Hora do dia em que o passageiro foi pego (0–23 horas).
- distancia: A distância (pelo ar, não na rua) em milhas percorridas pelo passeio.Para calcular distâncias, este código assume que a maioria dos passeios são curtos e que, portanto, é seguro ignorar a curvatura do Terra.
$d =  \sqrt{ x^2 + y^2} $
- Um grau de latitude equivale a aproximadamente 69 milhas e um grau de longitude é igual 54,6 milhas (é possível calcular distância usando a fórmula de haversine).

In [24]:
from pyspark.sql import functions as f
taxi3 = taxi3.withColumn("Dia", f.dayofweek("pickup_datetime"))
taxi3 = taxi3.withColumn("hora", f.hour("pickup_datetime"))
taxi3.show(5)

+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+---+----+
|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|Dia|hora|
+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+---+----+
|        6.1|2009-12-10 15:37:00|      -73.969622|      40.756973|       -73.981152|       40.759712|              1|  5|  15|
|       10.5|2012-02-03 13:46:35|      -73.988006|      40.737723|       -73.965699|       40.766095|              1|  6|  13|
|        6.5|2010-05-05 19:51:00|      -73.951573|      40.773733|       -73.959827|       40.761932|              1|  4|  19|
|       10.1|2010-11-21 01:41:00|        -74.0045|      40.742143|        -73.99433|       40.720412|              1|  1|   1|
|       11.0|2014-03-24 20:29:00|       -73.98442|       40.76023|        -73.99426|        40.73891|          

In [25]:
from pyspark.sql.functions import col
taxi4=taxi3.withColumn("distancia", (((col("dropoff_longitude")-col("pickup_longitude"))*54.6)**2+((col("dropoff_latitude")-col("pickup_latitude"))*69)**2)**0.5)
taxi4.show(5)

+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+---+----+------------------+
|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|Dia|hora|         distancia|
+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+---+----+------------------+
|        6.1|2009-12-10 15:37:00|      -73.969622|      40.756973|       -73.981152|       40.759712|              1|  5|  15|0.6572942199080255|
|       10.5|2012-02-03 13:46:35|      -73.988006|      40.737723|       -73.965699|       40.766095|              1|  6|  13|2.3056226748650466|
|        6.5|2010-05-05 19:51:00|      -73.951573|      40.773733|       -73.959827|       40.761932|              1|  4|  19|0.9306642848631145|
|       10.1|2010-11-21 01:41:00|        -74.0045|      40.742143|        -73.99433|       40.720412|              1|  1|   

### **Seleção das colunas que são necessárias**

In [26]:
taxi_df=taxi4.select("fare_amount", "Dia", "hora", "distancia")
taxi_df.show(5)

+-----------+---+----+------------------+
|fare_amount|Dia|hora|         distancia|
+-----------+---+----+------------------+
|        6.1|  5|  15|0.6572942199080255|
|       10.5|  6|  13|2.3056226748650466|
|        6.5|  4|  19|0.9306642848631145|
|       10.1|  1|   1|1.5989544753504845|
|       11.0|  2|  20|1.5661190797945121|
+-----------+---+----+------------------+
only showing top 5 rows



### **Detecção e tratamento de outliers**

In [27]:
# Target fare_amount
taxi_df.select('fare_amount').summary("min", "2.5%","25%", "50%","75%", "97.5%","max").show()

+-------+-----------+
|summary|fare_amount|
+-------+-----------+
|    min|     -300.0|
|   2.5%|        3.7|
|    25%|        6.0|
|    50%|        8.5|
|    75%|       12.5|
|  97.5%|      41.33|
|    max|   75747.02|
+-------+-----------+



- O limite superior será determinado pelo valor do quantil 97.5%, enquanto o limite inferior será definido pelo valor do quantil 2.5%.

In [28]:
# Feature distancia
taxi_df.select('distancia').summary("min", "2.5%","25%", "50%","75%", "97.5%","max").show()

+-------+------------------+
|summary|         distancia|
+-------+------------------+
|    min|               0.0|
|   2.5%|               0.0|
|    25%|  0.76078843451035|
|    50%|1.3226228575568202|
|    75%| 2.402275223117913|
|  97.5%| 9.241994421497436|
|    max|493784.04604772385|
+-------+------------------+



### **Com base na detecção de outliers, estão sendo aplicados limites razoáveis às tarifas e distâncias**

In [29]:
taxi_df1=taxi_df.filter((taxi_df.distancia>0) & (taxi_df.distancia<10) & (taxi_df.fare_amount>3) & (taxi_df.fare_amount<45))
taxi_df1.show(10)

+-----------+---+----+------------------+
|fare_amount|Dia|hora|         distancia|
+-----------+---+----+------------------+
|        6.1|  5|  15|0.6572942199080255|
|       10.5|  6|  13|2.3056226748650466|
|        6.5|  4|  19|0.9306642848631145|
|       10.1|  1|   1|1.5989544753504845|
|       11.0|  2|  20|1.5661190797945121|
|        8.9|  4|   7|1.6967539076200955|
|        9.7|  4|  10| 1.261164204383147|
|       10.5|  3|  13|1.6259967224184602|
|        4.9|  7|   6|0.4631813834372527|
|        5.3|  2|  22|1.4574025511446298|
+-----------+---+----+------------------+
only showing top 10 rows



In [30]:
# Removendo as linhas duplicadas do DataFrame chamado "taxi_df1".
tarifas=taxi_df1.distinct()
tarifas.show(5)

+-----------+---+----+------------------+
|fare_amount|Dia|hora|         distancia|
+-----------+---+----+------------------+
|        9.0|  3|   9|0.9866485608659631|
|        6.5|  6|  19|0.6022288812104362|
|       15.3|  5|   9|1.3171878319224541|
|        5.7|  7|  22|0.5860206781177603|
|        7.3|  5|  17|0.7172431424677226|
+-----------+---+----+------------------+
only showing top 5 rows



In [31]:
# Contando o número de linhas
tarifas.count()

35897732

### **# Analisar  a influência das variáveis de entrada, nos valores da variable alvo fare_amount**

In [32]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

vector_col="corr-features"
assembler=VectorAssembler(inputCols=tarifas.columns, outputCol=vector_col)
df_vector=assembler.transform(tarifas).select(vector_col)
matrix= Correlation.corr(df_vector, vector_col).collect()[0][0]
corrmatrix = matrix.toArray().tolist()
print(corrmatrix)

[[1.0, 0.0005149824712395955, -0.014327301999635517, 0.8762515296410246], [0.0005149824712395955, 1.0, 0.0115453568962633, -0.015460258279181089], [-0.014327301999635517, 0.0115453568962633, 1.0, -0.03517501294358808], [0.8762515296410246, -0.015460258279181089, -0.03517501294358808, 1.0]]


- Há uma correlação significativa de aproximadamente 0.87 entre a distância percorrida e o valor da tarifa.
- A correlação entre o dia da semana, a hora do dia e o valor da tarifa é fraca, mas optaremos por manter essas colunas.







### **Criando uma coluna com todas as preditoras em novo dataset**

In [33]:
# criando uma coluna com todas as preditoras
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Definindo as preditoras
def_preditoras = VectorAssembler(
    inputCols=[
    'Dia',
    'hora',
    'distancia'],
    outputCol= 'preditoras'
)

In [34]:
tarifas1 = def_preditoras.transform(tarifas)
tarifas1.show()

+-----------+---+----+------------------+--------------------+
|fare_amount|Dia|hora|         distancia|          preditoras|
+-----------+---+----+------------------+--------------------+
|        9.0|  3|   9|0.9866485608659631|[3.0,9.0,0.986648...|
|        6.5|  6|  19|0.6022288812104362|[6.0,19.0,0.60222...|
|       15.3|  5|   9|1.3171878319224541|[5.0,9.0,1.317187...|
|        5.7|  7|  22|0.5860206781177603|[7.0,22.0,0.58602...|
|        7.3|  5|  17|0.7172431424677226|[5.0,17.0,0.71724...|
|       12.9|  5|   9|3.2199467569510154|[5.0,9.0,3.219946...|
|        6.1|  5|   2|1.5567447425707963|[5.0,2.0,1.556744...|
|        5.0|  5|   6|0.6312142104932869|[5.0,6.0,0.631214...|
|       18.1|  3|  15|  5.22557259896926|[3.0,15.0,5.22557...|
|        7.0|  3|   8|0.6090583001039429|[3.0,8.0,0.609058...|
|       12.9|  2|  14|2.8208689400829297|[2.0,14.0,2.82086...|
|        6.1|  5|  21|0.9024410043654625|[5.0,21.0,0.90244...|
|        7.3|  7|   0|0.7105404761469086|[7.0,0.0,0.710

In [35]:
tarifas1.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- Dia: integer (nullable = true)
 |-- hora: integer (nullable = true)
 |-- distancia: double (nullable = true)
 |-- preditoras: vector (nullable = true)



In [36]:
tarifas1.select( 'preditoras').head(5)

[Row(preditoras=DenseVector([3.0, 9.0, 0.9866])),
 Row(preditoras=DenseVector([6.0, 19.0, 0.6022])),
 Row(preditoras=DenseVector([5.0, 9.0, 1.3172])),
 Row(preditoras=DenseVector([7.0, 22.0, 0.586])),
 Row(preditoras=DenseVector([5.0, 17.0, 0.7172]))]

### **Pegando somente os atributos que serao utilizados na modelagem**

In [37]:
tarifas_ml= tarifas1.select('fare_amount', 'preditoras')

# Espiando dataset final
tarifas_ml.show()

+-----------+--------------------+
|fare_amount|          preditoras|
+-----------+--------------------+
|        9.0|[3.0,9.0,0.986648...|
|        6.5|[6.0,19.0,0.60222...|
|       15.3|[5.0,9.0,1.317187...|
|        5.7|[7.0,22.0,0.58602...|
|        7.3|[5.0,17.0,0.71724...|
|       12.9|[5.0,9.0,3.219946...|
|        6.1|[5.0,2.0,1.556744...|
|        5.0|[5.0,6.0,0.631214...|
|       18.1|[3.0,15.0,5.22557...|
|        7.0|[3.0,8.0,0.609058...|
|       12.9|[2.0,14.0,2.82086...|
|        6.1|[5.0,21.0,0.90244...|
|        7.3|[7.0,0.0,0.710540...|
|        6.5|[4.0,16.0,0.73470...|
|        6.5|[5.0,11.0,0.86636...|
|       11.0|[5.0,21.0,2.37945...|
|        4.5|[1.0,15.0,0.58824...|
|       16.9|[1.0,13.0,4.70990...|
|        4.1|[3.0,15.0,0.51708...|
|        4.5|[5.0,21.0,0.37101...|
+-----------+--------------------+
only showing top 20 rows



## **6. Treinamento dos modelos**

### **6.1 Dividindo o dataset entre treino e teste**

In [38]:
taxi_treino, taxi_teste = tarifas_ml.randomSplit([.8,.2], seed=7)

### **6.2 Regressão Linear**

In [39]:
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

In [40]:
# Definindo o modelo de regressão
rl = LinearRegression(
    featuresCol='preditoras',
    labelCol= 'fare_amount',
    predictionCol='fare_amount_pred'
)

In [41]:
# Treinando o modeo
modelo_rl = rl.fit(taxi_treino)

In [42]:
# Verificando as métricas da qualidade do ajuste do modelo através
# do atributo LinearRegressionModel.summary

print("RMSE: {0}".format(modelo_rl.summary.rootMeanSquaredError))
print("MAE: {0}".format(modelo_rl.summary.meanAbsoluteError))
print("R2: {0}".format(modelo_rl.summary.r2))

RMSE: 3.1037888787562005
MAE: 1.9938771038378
R2: 0.7683425245440496


In [43]:
# Ajustando o modelo aos dados de teste
modelo_rl_pred = modelo_rl.transform(taxi_teste)

In [44]:
# Selecionando a coluna das tarifas preditas e observadas
predita_observada = modelo_rl_pred.select('fare_amount_pred','fare_amount')
predita_observada.show()

+------------------+-----------+
|  fare_amount_pred|fare_amount|
+------------------+-----------+
| 4.393108197439274|        3.3|
|3.8962150716554684|        3.3|
| 4.257345336755085|        3.3|
| 4.377392410883911|        3.3|
| 5.040105872213397|        3.3|
| 5.399141238331736|        3.3|
| 3.931512083231464|        3.3|
| 4.233510659159725|        3.3|
| 4.426253884814156|        3.3|
| 4.522401494231135|        3.3|
|  4.05249246165571|        3.3|
| 3.636638294888405|        3.3|
| 4.138561524449731|        3.3|
| 4.823895102853837|        3.3|
| 5.516573108757559|        3.3|
|4.9613018922573735|        3.3|
| 4.583508119401037|        3.3|
| 4.670403517123431|        3.3|
| 4.742615367285026|        3.3|
| 4.846394509520472|        3.3|
+------------------+-----------+
only showing top 20 rows



In [45]:
# Verificando as métricas da qualidade do ajuste do modelo através
# do RegressionEvaluator do pacote pyspark.ml

from pyspark.mllib.evaluation import RegressionMetrics

# mllib is old so the methods are available in rdd
metricas_modelo_rl = RegressionMetrics(predita_observada.rdd)

print("RMSE: {0}".format(metricas_modelo_rl.rootMeanSquaredError))
print("MAE: {0}".format(metricas_modelo_rl.meanAbsoluteError))
print("R2: {0}".format(metricas_modelo_rl.r2))

RMSE: 3.104511574284895
MAE: 1.9938971603810265
R2: 0.7680392918679897


- O modelo está errando mais ou menos 3.10 dólares e 1.99 dólares em termos absolutos.

### **6.3 Árvores de Decisão**

In [46]:
from pyspark.ml.regression import DecisionTreeRegressor

In [47]:
# definindo a árvore
dt= DecisionTreeRegressor(
    featuresCol='preditoras',
    labelCol='fare_amount',
    predictionCol='fare_amount_pred',
    maxDepth=5)

In [48]:
# Treinando a árvore
modelo_dt = dt.fit(taxi_treino)

In [49]:
# Ajustando a árvore aos dados de teste
modelo_dt_pred = modelo_dt.transform(taxi_teste)

In [50]:
# Selecionando a coluna das tarifas preditas e observadas
predita_observada = modelo_dt_pred.select('fare_amount_pred','fare_amount')
predita_observada.show(10)

+-----------------+-----------+
| fare_amount_pred|fare_amount|
+-----------------+-----------+
|5.234503798511104|        3.3|
|5.929158660861576|        3.3|
|5.929158660861576|        3.3|
|5.234503798511104|        3.3|
|5.234503798511104|        3.3|
|5.234503798511104|        3.3|
|5.929158660861576|        3.3|
|5.929158660861576|        3.3|
|5.234503798511104|        3.3|
|5.234503798511104|        3.3|
+-----------------+-----------+
only showing top 10 rows



In [51]:
# Calculando as métricas
metricas_modelo_dt = RegressionMetrics(predita_observada.rdd)

# Calculando as métricas
print("RMSE: {0}".format(metricas_modelo_dt.rootMeanSquaredError))
print("MAE: {0}".format(metricas_modelo_dt.meanAbsoluteError))
print("R2: {0}".format(metricas_modelo_dt.r2))

RMSE: 3.0534098496063793
MAE: 1.9792927414948807
R2: 0.7756128080894685


- O modelo está errando mais ou menos 3.05 dólares e 1.97 dólares em termos absolutos. No entanto, em comparação com o modelo anterior, o desempenho deste modelo é um pouco melhor.

## **7. Utilizando o modelo para prever os valores das tarifas**

- Prevendo o valor da tarifa para uma viagem de 3 milhas feita às 16:00 no dia sábado.
- Prevendo o valor da tarifa para uma viagem de 8 milhas feita às 11:00 a.m. na segunda-feira.
- Prevendo o valor da tarifa para uma viagem de 2 milhas feita às 17:00  na sexta-feira.
- Prevendo o valor da tarifa para uma viagem de 2 milhas feita às 17:00  no dia sábado.

In [52]:
# Criando o DataFrame
df = sc.createDataFrame([
    Row(Dia=6, hora=16, distancia= 3),
    Row(Dia=1, hora=11, distancia= 8),
    Row(Dia=5, hora=17, distancia= 2),
    Row(Dia=6, hora=17, distancia= 2)
])
df.show()

+---+----+---------+
|Dia|hora|distancia|
+---+----+---------+
|  6|  16|        3|
|  1|  11|        8|
|  5|  17|        2|
|  6|  17|        2|
+---+----+---------+



In [53]:
 # Fazendo as previsaoes
 def_preditoras1 = VectorAssembler(
    inputCols=[
    'Dia',
    'hora',
    'distancia'],
    outputCol= 'preditoras'
)

v1 = def_preditoras1.transform(df)

v2= v1.select( 'preditoras')

pred=modelo_dt.transform(v2)
pred.show()

+--------------+------------------+
|    preditoras|  fare_amount_pred|
+--------------+------------------+
|[6.0,16.0,3.0]| 13.83717306520015|
|[1.0,11.0,8.0]| 31.74552998228606|
|[5.0,17.0,2.0]|10.747678537128628|
|[6.0,17.0,2.0]|10.747678537128628|
+--------------+------------------+



## **8. Conclusão**

- Com base nas informações de hora do dia, dia da semana e distância percorrida fornecidas, o modelo é capaz de realizar uma previsão estimada do valor da tarifa.

- Explorar alternativas nos algoritmos de aprendizado, como a incorporação do Random Forest, e a otimização de hiperparâmetros são estratégias fundamentais para aprimorar a precisão do modelo de previsão.