# Mini-Projeto 1 - Analisando Dados do Uber com Spark 

Dataset: https://github.com/fivethirtyeight/uber-tlc-foil-response

Esse conjunto de dados contém dados de mais de 4,5 milhões de captações Uber na cidade de Nova York de abril a setembro de 2014 e 14,3 milhões de captações Uber de janeiro a junho de 2015. Dados em nível de viagem sobre 10 outras empresas de veículos de aluguel (FHV) bem como dados agregados para 329 empresas de FHV, também estão incluídos. Todos os arquivos foram recebidos em 3 de agosto, 15 de setembro e 22 de setembro de 2015.

1- Quantos são e quais são as bases de carros do Uber (onde os carros ficam esperando passageiros)?

2- Qual o total de veículos que passaram pela base B02617?

3- Qual o total de corridas por base? Apresente de forma decrescente.

In [4]:
# Importando o módulo pandas
import pandas as pd

In [5]:
# Carregando o dado
df_pandas = pd.read_csv('data/uber.csv')

In [6]:
# Shape do dado
df_pandas.shape

(354, 4)

In [7]:
# Primeiras cinco linhas do dados
df_pandas.head()

Unnamed: 0,dispatching_base_number,date,active_vehicles,trips
0,B02512,1/1/2015,190,1132
1,B02765,1/1/2015,225,1765
2,B02764,1/1/2015,3427,29421
3,B02682,1/1/2015,945,7679
4,B02617,1/1/2015,1228,9537


In [8]:
# Transformando o dataframe pandas em um DataFrame Spark
df_uber = sqlContext.createDataFrame(df_pandas)

In [9]:
# Tipo de dado
type(df_uber)

pyspark.sql.dataframe.DataFrame

In [10]:
# Criando um RDD
uber = sc.textFile('data/uber.csv')

In [11]:
type(uber)

pyspark.rdd.RDD

In [12]:
# Total de registros
uber.count()

355

In [13]:
# Primeiros 10 registros
uber.take(10)

['dispatching_base_number,date,active_vehicles,trips',
 'B02512,1/1/2015,190,1132',
 'B02765,1/1/2015,225,1765',
 'B02764,1/1/2015,3427,29421',
 'B02682,1/1/2015,945,7679',
 'B02617,1/1/2015,1228,9537',
 'B02598,1/1/2015,870,6903',
 'B02598,1/2/2015,785,4768',
 'B02617,1/2/2015,1137,7065',
 'B02512,1/2/2015,175,875']

In [14]:
# Dividindo o arquivo em colunas separadas pelo caracter ','
uber_linhas = uber.map(lambda line: line.split(','))

In [15]:
type(uber_linhas)

pyspark.rdd.PipelinedRDD

In [16]:
uber_linhas.take(5)

[['dispatching_base_number', 'date', 'active_vehicles', 'trips'],
 ['B02512', '1/1/2015', '190', '1132'],
 ['B02765', '1/1/2015', '225', '1765'],
 ['B02764', '1/1/2015', '3427', '29421'],
 ['B02682', '1/1/2015', '945', '7679']]

In [17]:
# Numero de base de carros do Uber
uber_linhas.map(lambda linha: linha[0]).distinct().count() - 1 

6

In [18]:
# Bases de carros Uber
uber_linhas.map(lambda linha: linha[0]).distinct().collect()

['dispatching_base_number',
 'B02765',
 'B02682',
 'B02598',
 'B02512',
 'B02764',
 'B02617']

In [28]:
# Total de veículos da base B02617
B02617 = uber_linhas.filter(lambda x: 'B02617' in x)
B02617.map(lambda linha: int(linha[2])).reduce(lambda x,y: x + y)

79758

In [22]:
# Total de corridas base B02765
base_B02765 = uber_linhas.filter(lambda x: 'B02765' in x).map(lambda linha: int(linha[3])).reduce(lambda x,y: x + y)
print(base_B02765)

193670


In [23]:
# Total de corridas base B02617
base_B02617 = uber_linhas.filter(lambda x: 'B02617' in x).map(lambda linha: int(linha[3])).reduce(lambda x,y: x + y)
print(base_B02617)

725025


In [24]:
# Total de corridas base B02617
base_B02682 = uber_linhas.filter(lambda x: 'B02682' in x).map(lambda linha: int(linha[3])).reduce(lambda x,y: x + y)
print(base_B02682)

662509


In [27]:
# Total de corridas base B02617
base_B02598 = uber_linhas.filter(lambda x: 'B02598' in x).map(lambda linha: int(linha[3])).reduce(lambda x,y: x + y)
print(base_B02598)

540791


In [26]:
# Total de corridas base B02617
base_B02512 = uber_linhas.filter(lambda x: 'B02512' in x).map(lambda linha: int(linha[3])).reduce(lambda x,y: x + y)
print(base_B02512)

93786


In [25]:
# Total de corridas base B02617
base_B02764 = uber_linhas.filter(lambda x: 'B02764' in x).map(lambda linha: int(linha[3])).reduce(lambda x,y: x + y)
print(base_B02764)

1914449


#### Método mais simples para calcular o total de corridas por base

In [35]:
# Carregando novamente o arquivo
new_uber = sc.textFile("data/uber.csv").filter(lambda line: "base" not in line).map(lambda line:line.split(","))

In [39]:
# Primeiras cinco linhas
new_uber.take(5)

[['B02512', '1/1/2015', '190', '1132'],
 ['B02765', '1/1/2015', '225', '1765'],
 ['B02764', '1/1/2015', '3427', '29421'],
 ['B02682', '1/1/2015', '945', '7679'],
 ['B02617', '1/1/2015', '1228', '9537']]

In [40]:
# Total por base
new_uber.map(lambda kp: (kp[0], int(kp[3])) ).reduceByKey(lambda k,v: k + v).collect()

[('B02765', 193670),
 ('B02682', 662509),
 ('B02598', 540791),
 ('B02512', 93786),
 ('B02764', 1914449),
 ('B02617', 725025)]

In [42]:
# Total por base em ordem decrescente
new_uber.map(lambda kp: (kp[0], int(kp[3])) ).reduceByKey(lambda k,v: k + v).takeOrdered(10, key = lambda x: -x[1])

[('B02764', 1914449),
 ('B02617', 725025),
 ('B02682', 662509),
 ('B02598', 540791),
 ('B02765', 193670),
 ('B02512', 93786)]