# Analisando dados do Uber Utilizando o Spark

DataSet: https://github.com/fivethirtyeight/uber-tlc-foil-response

In [1]:
from pandas import read_csv

In [2]:
uberFile = read_csv('C:/Users/skite/OneDrive/Documentos/GitHub/spark_dados_uber/uber.csv')

In [3]:
uberFile.head()

Unnamed: 0,dispatching_base_number,date,active_vehicles,trips
0,B02512,1/1/2015,190,1132
1,B02765,1/1/2015,225,1765
2,B02764,1/1/2015,3427,29421
3,B02682,1/1/2015,945,7679
4,B02617,1/1/2015,1228,9537


In [4]:
uberFile.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 354 entries, 0 to 353
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   dispatching_base_number  354 non-null    object
 1   date                     354 non-null    object
 2   active_vehicles          354 non-null    int64 
 3   trips                    354 non-null    int64 
dtypes: int64(2), object(2)
memory usage: 11.2+ KB


**Vamos transformar o Dataframe (Pandas) e um Dataframe (Spark)**

In [5]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\opt\\spark\\spark-3.1.1-bin-hadoop2.7'

In [6]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.rdd import RDD

conf = pyspark.SparkConf().setAppName('SparkApp').setMaster('local[*]')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sc._conf.get('spark.driver.memory')

In [11]:
sqlContext = SQLContext(sc)

uberDF = sqlContext.createDataFrame(uberFile)

In [12]:
type(uberDF)

pyspark.sql.dataframe.DataFrame

**Após verificamos que a comunicação entre pandas e spark está funcionando vamos criar criar uma RDD apartir do arquivo .csv**

In [15]:
uberRDD = sc.textFile('C:/Users/skite/OneDrive/Documentos/GitHub/spark_dados_uber/uber.csv')
type(uberRDD)

pyspark.rdd.RDD

In [16]:
uberRDD.count()

355

In [17]:
uberRDD.first()

'dispatching_base_number,date,active_vehicles,trips'

**Dividindo o arquivo em coulunas, seperadas pelo caracter ','**

In [18]:
uberLinhas = uberRDD.map(lambda line: line.split(','))

In [19]:
type(uberLinhas)

pyspark.rdd.PipelinedRDD

**Note que após a transformação da split nossa RDD agora é uma Pipeline**

In [23]:
uberLinhas.take(10)

[['dispatching_base_number', 'date', 'active_vehicles', 'trips'],
 ['B02512', '1/1/2015', '190', '1132'],
 ['B02765', '1/1/2015', '225', '1765'],
 ['B02764', '1/1/2015', '3427', '29421'],
 ['B02682', '1/1/2015', '945', '7679'],
 ['B02617', '1/1/2015', '1228', '9537'],
 ['B02598', '1/1/2015', '870', '6903'],
 ['B02598', '1/2/2015', '785', '4768'],
 ['B02617', '1/2/2015', '1137', '7065'],
 ['B02512', '1/2/2015', '175', '875']]

In [25]:
uberLinhas.map(lambda linha: linha[0]).distinct().count()

7

In [27]:
uberLinhas.map(lambda linha: linha[0]).distinct().collect()

['dispatching_base_number',
 'B02765',
 'B02682',
 'B02598',
 'B02512',
 'B02764',
 'B02617']

**Testando o filtro de um elementos na coluna base**

In [28]:
uberLinhas.filter(lambda linha: 'B02617' in linha).count()

59

In [29]:
B02617_RDD = uberLinhas.filter(lambda linha: 'B02617' in linha)

**Fazendo um filtro na coluna trips**

In [30]:
B02617_RDD.filter(lambda linha: int(linha[3]) > 15000).count()

6

In [31]:
B02617_RDD.filter(lambda linha: int(linha[3]) > 15000).collect()

[['B02617', '1/31/2015', '1394', '15756'],
 ['B02617', '2/6/2015', '1526', '15417'],
 ['B02617', '2/13/2015', '1590', '16996'],
 ['B02617', '2/14/2015', '1486', '16999'],
 ['B02617', '2/20/2015', '1574', '16856'],
 ['B02617', '2/21/2015', '1443', '16098']]

**Agora vamos realizar algumas operações de MapReduce que é muito comum no Hadoop, mas que o Spark consegue fazer com mais rapidez e eficiencia.**

In [32]:
uberRDD2 = sc.textFile('C:/Users/skite/OneDrive/Documentos/GitHub/spark_dados_uber/uber.csv').filter(lambda line: 'base' not in line).map(lambda line:line.split(','))

**Criando um novo DataFrame com ele vamos remover a primeira linha que é o cabeçario.**

In [34]:
uberRDD2.map(lambda kp: (kp[0], int(kp[3]))).reduceByKey(lambda k,v: k + v).collect()

[('B02765', 193670),
 ('B02682', 662509),
 ('B02598', 540791),
 ('B02512', 93786),
 ('B02764', 1914449),
 ('B02617', 725025)]

**Note que agora para cada uma das base tem a sua quantidade de viagens (trips)**

In [35]:
uberRDD2.map(lambda kp: (kp[0], int(kp[3]))).reduceByKey(lambda k,v: k + v).takeOrdered(10, key = lambda x: -x[1])

[('B02764', 1914449),
 ('B02617', 725025),
 ('B02682', 662509),
 ('B02598', 540791),
 ('B02765', 193670),
 ('B02512', 93786)]

**Podemos usar a função TakeOrdered para fazer um Map reduce a diferença entre elas é que o take ordered nos retorna o resultado de forma ordenada, ou seja, em ordem crescente.**