In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from datetime import datetime
from collections import namedtuple
import dateutil.parser

In [2]:
#copiado os arquivos do s3 de s3://ifood-data-architect-test-source/ para o pessoal s3://databricks-dev-ifood-bucket
#aws s3 cp s3://ifood-data-architect-test-source/order.json.gz s3://databricks-dev-ifood-bucket/
#aws s3 cp s3://ifood-data-architect-test-source/restaurant.csv.gz s3://databricks-dev-ifood-bucket/
#aws s3 cp s3://ifood-data-architect-test-source/consumer.csv.gz s3://databricks-dev-ifood-bucket/


In [3]:
#TODO apagar as accesskey antes de subir
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'coloqueAccessKey')
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'coloque SecretKey')

consumerPAth = 's3n://databricks-dev-ifood-bucket/consumer.csv.gz'
restaurantPath = 's3n://databricks-dev-ifood-bucket/restaurant.csv.gz'
orderPath = 's3n://databricks-dev-ifood-bucket/order.json.gz'


In [4]:
consumer = sc.textFile(consumerPAth)

In [5]:
consumer.take(2)

In [6]:
#remover o header
consumerNoHeader = consumer.filter(lambda x: 'customer_phone' not in x)

In [7]:
consumerNoHeader.take(5)

In [8]:
#criar uma namedtuple para o consumer para criar a estrutura do dado
fieldsConsumer = ('customer_id','language','created_at','active','customer_name','customer_phone_area','customer_phone_number')

Consumer = namedtuple('Consumer', fieldsConsumer)

def parseConsumer(row):
  row[2] = dateutil.parser.parse(row[2]).strftime('%Y-%m-%d %H:%M:%S')
  row[3] = int(row[3] == 'true')
  row[5] = int(row[5])
  row[6] = int(row[6])
  return Consumer(*row[:7])


In [9]:
#splitar o rdd por , e mapear pela estrutura
consumerParsed = consumerNoHeader.map(lambda x: x.split(',')).map(parseConsumer)

In [10]:
consumerParsed.take(5)

In [11]:
consumerDF = consumerParsed.toDF()
consumerDF.show(5)

In [12]:
#cria tabela temporaria para poder ser acessado via linha de comando sql
consumerDF.createOrReplaceTempView('consumer')
consumerDF.printSchema()

In [13]:
%sql select * from consumer limit 5;

customer_id,language,created_at,active,customer_name,customer_phone_area,customer_phone_number
00039466-560f-4e57-85a2-d4753cd901be,pt-br,2018-04-05 14:49:18,1,NUNO,46,816135924
001a1267-31a3-4f5b-a028-d7e323864b08,pt-br,2018-01-14 21:40:02,1,ADRIELLY,59,231330577
003ae1d5-67b8-4a04-b055-0e4e9622771a,pt-br,2018-01-07 03:47:15,1,PAULA,62,347597883
004629bf-c3fc-42f5-a133-fd34d2bd17fa,pt-br,2018-01-10 22:17:08,1,HELTON,13,719366842
00467336-6561-4406-b6f2-987b06e77401,pt-br,2018-04-06 00:16:20,1,WENDER,76,543232158


In [14]:
restaurant = sc.textFile(restaurantPath)

In [15]:
restaurant.take(5)

In [16]:
#remover o header
restaurantNoHeader = restaurant.filter(lambda x:'takeout_time' not in x)

In [17]:
restaurantNoHeader.take(5)

In [18]:
#mesma logica para o restaurante
fieldsRestaurant = ('id','created_at','enabled','price_range','average_ticket','takeout_time','delivery_time','minimum_order_value','merchant_zip_code','merchant_city','merchant_state','merchant_country')

Restaurant = namedtuple('Restaurant', fieldsRestaurant)

def parseRestaurant(row):
  row[1] = dateutil.parser.parse(row[1]).strftime('%Y-%m-%d %H:%M:%S')
  row[2] = int(row[2] == 'true')
  row[3] = int(row[3])
  row[4] = float(row[4])
  row[5] = int(row[5])
  row[6] = int(row[6])
  row[7] = float(row[7])
  row[8] = int(row[8])
  return Restaurant(*row[:12])

In [19]:
restaurantParsed = restaurantNoHeader.map(lambda x: x.split(',')).map(parseRestaurant)

In [20]:
restaurantParsed.take(5)

In [21]:
restaurantDF = restaurantParsed.toDF()
restaurantDF.show(5)

In [22]:
restaurantDF.createOrReplaceTempView('restaurant')
restaurantDF.printSchema()

In [23]:
%sql select * from restaurant limit 5;

id,created_at,enabled,price_range,average_ticket,takeout_time,delivery_time,minimum_order_value,merchant_zip_code,merchant_city,merchant_state,merchant_country
02c94103-61f3-4906-a4a9-55611db9f28c,2017-01-23 12:52:30,0,3,60.0,0,50,30.0,14025,RIBEIRAO PRETO,SP,BR
15e7f5fd-090d-47b9-9f14-b6f7fce3c95d,2017-01-20 13:14:48,1,3,60.0,0,0,30.0,50180,SAO PAULO,SP,BR
33ca5d3d-b99f-404d-84d9-8df8f38a2261,2017-01-23 12:46:33,1,5,100.0,0,45,10.0,23090,RIO DE JANEIRO,RJ,BR
4927035f-a343-4a65-a9be-945818e2efff,2017-01-20 13:15:04,1,3,80.0,0,0,18.9,40255,SALVADOR,BA,BR
52feaad8-4961-4afc-8d60-3f29ffd0a7a7,2017-01-20 13:14:27,1,3,60.0,0,0,25.0,64600,BARUERI,SP,BR


In [24]:
order = sqlContext.read.json(orderPath)

In [25]:
order.take(5)

In [26]:
order.printSchema()

In [27]:
#criar estrutura para order
orderDF = order.withColumn("cpf", order["cpf"].cast(IntegerType()))\
.withColumn("customer_id", order["customer_id"].cast(StringType()))\
.withColumn("customer_name", order["customer_name"].cast(StringType()))\
.withColumn("delivery_address_city", order["delivery_address_city"].cast(StringType()))\
.withColumn("delivery_address_country", order["delivery_address_country"].cast(StringType()))\
.withColumn("delivery_address_district", order["delivery_address_district"].cast(StringType()))\
.withColumn("delivery_address_external_id", order["delivery_address_external_id"].cast(IntegerType()))\
.withColumn("delivery_address_latitude", order["delivery_address_latitude"].cast(FloatType()))\
.withColumn("delivery_address_longitude", order["delivery_address_longitude"].cast(FloatType()))\
.withColumn("delivery_address_state", order["delivery_address_state"].cast(StringType()))\
.withColumn("delivery_address_zip_code", order["delivery_address_zip_code"].cast(IntegerType()))\
.withColumn("items", order["items"].cast(StringType()))\
.withColumn("merchant_id", order["merchant_id"].cast(StringType()))\
.withColumn("merchant_latitude", order["merchant_latitude"].cast(FloatType()))\
.withColumn("merchant_timezone", order["merchant_timezone"].cast(StringType()))\
.withColumn("order_created_at", order["order_created_at"].cast(StringType()))\
.withColumn("order_id", order["order_id"].cast(StringType()))\
.withColumn("order_scheduled", order["order_scheduled"].cast(BooleanType()))\
.withColumn("order_scheduled_date", order["order_scheduled_date"].cast(StringType()))\
.withColumn("origin_platform", order["origin_platform"].cast(StringType()))

In [28]:
orderDF.printSchema()

In [29]:
orderDF.createOrReplaceTempView('order')

In [30]:
%sql select * from order limit 5;

cpf,customer_id,customer_name,delivery_address_city,delivery_address_country,delivery_address_district,delivery_address_external_id,delivery_address_latitude,delivery_address_longitude,delivery_address_state,delivery_address_zip_code,items,merchant_id,merchant_latitude,merchant_longitude,merchant_timezone,order_created_at,order_id,order_scheduled,order_scheduled_date,order_total_amount,origin_platform
,977b9a89-825f-464b-8ef6-0f453d7334c1,GUSTAVO,FRANCA,BR,JARDIM ESPRAIADO,6736655,-47.39,-20.55,SP,14403,"[{""name"": ""Parmegiana de Filé de Frango (2 pessoas)"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 1, ""unitPrice"": {""value"": ""2800"", ""currency"": ""BRL""}, ""externalId"": ""0bcd6764fd5e466d9c04b18ac0eb69e6"", ""totalValue"": {""value"": ""2800"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [{""name"": ""COM Arroz branco"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 2, ""unitPrice"": {""value"": ""0"", ""currency"": ""BRL""}, ""categoryId"": ""13HDH"", ""externalId"": ""384bd2b4eb7d454d8e0274e7d590ab4f"", ""totalValue"": {""value"": ""0"", ""currency"": ""BRL""}, ""categoryName"": ""PERSONALIZAR"", ""integrationId"": null}], ""integrationId"": ""PMFR"", ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}, {""name"": ""Lasanha Frango (2 pessoas)"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 3, ""unitPrice"": {""value"": ""1800"", ""currency"": ""BRL""}, ""externalId"": ""a361f5eec6a44ac0817892e81bf22e80"", ""totalValue"": {""value"": ""1800"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [], ""integrationId"": ""LF"", ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}]",eb4197f9-964c-4f87-8307-709e498aab87,-47.39,-20.55,America/Sao_Paulo,2019-01-17T22:50:06.000Z,dd4f8f0a-c2cb-45c6-a002-c3be6b305e5f,False,,46.0,ANDROID
,e969cc0d-388b-4025-9351-0db0f718d81c,MICHELLE,SANTOS,BR,CAMPO GRANDE,8759216,-46.34,-23.96,SP,11070,"[{""name"": ""Filé Mignon à Cubana"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 1, ""unitPrice"": {""value"": ""0"", ""currency"": ""BRL""}, ""externalId"": ""e0e81b2027c241cca5bf60a4768800ec"", ""totalValue"": {""value"": ""0"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [{""name"": ""334 - 1/2 porção "", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 2, ""unitPrice"": {""value"": ""7350"", ""currency"": ""BRL""}, ""categoryId"": ""1J3T0"", ""externalId"": ""b869c10747d1417d8968d7e3e1bd7de4"", ""totalValue"": {""value"": ""7350"", ""currency"": ""BRL""}, ""categoryName"": ""escolha sua porção "", ""integrationId"": null}], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}, {""name"": ""603- Pudim de leite"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 4, ""unitPrice"": {""value"": ""800"", ""currency"": ""BRL""}, ""externalId"": ""4436f76edc164052aaf3c21a6567b570"", ""totalValue"": {""value"": ""800"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}, {""name"": ""601-Torta Holandesa"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 5, ""unitPrice"": {""value"": ""800"", ""currency"": ""BRL""}, ""externalId"": ""c0b788e876c54f198307dc142fed7aff"", ""totalValue"": {""value"": ""800"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}, {""name"": ""513 - Guarnição Completa II"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 3, ""unitPrice"": {""value"": ""1500"", ""currency"": ""BRL""}, ""externalId"": ""385fb04a358340878b14116df9970e46"", ""totalValue"": {""value"": ""1500"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}]",927d46f9-4bb3-48f7-be1d-584deaf18adc,-46.34,-23.96,America/Sao_Paulo,2019-01-17T17:51:26.000Z,8dd80f0b-db00-4b88-b7e2-02ca706fc5a5,False,,104.5,ANDROID
,e08dcc8b-f998-405e-b3f2-7107ea8958cf,VICTOR,GUARULHOS,BR,JARDIM ROSSI,8765930,-46.53,-23.44,SP,71304,"[{""name"": ""GRANDE 2 SABORES"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 1, ""unitPrice"": {""value"": ""0"", ""currency"": ""BRL""}, ""externalId"": ""87a24efdd88c4b7b8bf1132cabfd4195"", ""totalValue"": {""value"": ""0"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [{""name"": ""TRADICIONAL"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 2, ""unitPrice"": {""value"": ""0"", ""currency"": ""BRL""}, ""categoryId"": ""0025"", ""externalId"": ""2b89a4d4d6f14541a8830e5808d1f09f"", ""totalValue"": {""value"": ""0"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha a sua Preferência"", ""integrationId"": null}, {""name"": ""1/2 21 - PIZZA FRANGO COM BACON E CATUPIRY "", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 3, ""unitPrice"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR"", ""externalId"": ""c7f63cc58a3643c7aa3f24858c7029f4"", ""totalValue"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha um sabor"", ""integrationId"": null}, {""name"": ""1/2 21 - PIZZA FRANGO COM BACON E CATUPIRY "", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 3, ""unitPrice"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR2"", ""externalId"": ""c7f63cc58a3643c7aa3f24858c7029f4"", ""totalValue"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha o segundo sabor"", ""integrationId"": null}, {""name"": ""1/2 29 - PIZZA À MODA DA CASA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 4, ""unitPrice"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR"", ""externalId"": ""0dcd4a195d6243ec806eb5a29682c797"", ""totalValue"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha um sabor"", ""integrationId"": null}, {""name"": ""1/2 29 - PIZZA À MODA DA CASA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 4, ""unitPrice"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR2"", ""externalId"": ""0dcd4a195d6243ec806eb5a29682c797"", ""totalValue"": {""value"": ""1750"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha o segundo sabor"", ""integrationId"": null}], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}]",71ad62c5-5947-4518-9846-976fbdd2f881,-46.53,-23.44,America/Sao_Paulo,2019-01-17T22:53:47.000Z,430f9887-a563-45ee-8001-1cb29597d9dd,False,,35.0,IOS
,6b40a7cc-de6a-4456-ada3-66d67453b88e,ANNIE,SAO PAULO,BR,PARQUE SAO JORGE,7834087,-46.57,-23.53,SP,30870,"[{""name"": ""CALABRESA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 1, ""unitPrice"": {""value"": ""2040"", ""currency"": ""BRL""}, ""externalId"": ""5b598957c69a4aa68e02a20e3ce617a1"", ""totalValue"": {""value"": ""2040"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}, {""name"": ""CHEESE SALADA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 2, ""unitPrice"": {""value"": ""2040"", ""currency"": ""BRL""}, ""externalId"": ""3a64b961d342467ea23792556c99f7c2"", ""totalValue"": {""value"": ""2040"", ""currency"": ""BRL""}, ""customerNote"": ""Sem tomate"", ""garnishItems"": [], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}]",27d71c8f-fa45-41a6-a964-dc8aefbd4540,-46.57,-23.53,America/Sao_Paulo,2019-01-17T23:56:53.000Z,1294917e-2247-4891-a183-4a4d4a3fa356,False,,40.8,IOS
,647469d8-e0f8-4fce-8fa9-eafd1802ccf1,DANIEL,VITORIA,BR,JARDIM CAMBURI,7211683,-40.27,-20.25,ES,29090,"[{""name"": ""GRANDE (35CM) 8 PDÇ 2 SABORES"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 1, ""unitPrice"": {""value"": ""0"", ""currency"": ""BRL""}, ""externalId"": ""83127d50665f4a269febd427eb49306b"", ""totalValue"": {""value"": ""0"", ""currency"": ""BRL""}, ""customerNote"": null, ""garnishItems"": [{""name"": ""BORDA CATUPIRY GRÁTIS "", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 2, ""unitPrice"": {""value"": ""0"", ""currency"": ""BRL""}, ""categoryId"": ""0025"", ""externalId"": ""e9a71c078a6045eabb09bd4d38b991c6"", ""totalValue"": {""value"": ""0"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha a sua Preferência"", ""integrationId"": null}, {""name"": ""1/2 MISTA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 3, ""unitPrice"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR"", ""externalId"": ""46510932dfd448af910ae16e112a8d3a"", ""totalValue"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha um sabor"", ""integrationId"": null}, {""name"": ""1/2 MISTA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 3, ""unitPrice"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR2"", ""externalId"": ""46510932dfd448af910ae16e112a8d3a"", ""totalValue"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha o segundo sabor"", ""integrationId"": null}, {""name"": ""1/2 À MODA CAPIXABA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 4, ""unitPrice"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR"", ""externalId"": ""5be89603526f4c42a6d9ac9e05f8a19a"", ""totalValue"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha um sabor"", ""integrationId"": null}, {""name"": ""1/2 À MODA CAPIXABA"", ""addition"": {""value"": ""0"", ""currency"": ""BRL""}, ""discount"": {""value"": ""0"", ""currency"": ""BRL""}, ""quantity"": 1.00, ""sequence"": 4, ""unitPrice"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryId"": ""SABOR2"", ""externalId"": ""5be89603526f4c42a6d9ac9e05f8a19a"", ""totalValue"": {""value"": ""2425"", ""currency"": ""BRL""}, ""categoryName"": ""Escolha o segundo sabor"", ""integrationId"": null}], ""integrationId"": null, ""totalAddition"": {""value"": ""0"", ""currency"": ""BRL""}, ""totalDiscount"": {""value"": ""0"", ""currency"": ""BRL""}}]",06764c46-8a87-490e-9b8d-e9310d0d6822,-40.27,-20.25,America/Sao_Paulo,2019-01-17T23:40:53.000Z,34755d74-781a-461b-8564-45a4e8d99dbd,False,,48.5,ANDROID


In [31]:
#kafka - ler os producers, adicionado maxOffsetsPerTrigger com 10, pois estava estourando a memoria
#topico produzindo as mesangens de-order-status-events
kafkaDF = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "a49784be7f36511e9a6b60a341003dc2-1378330561.us-east-1.elb.amazonaws.com:9092, a4996369ef36511e9a6b60a341003dc2-1583999828.us-east-1.elb.amazonaws.com:9092") \
  .option("subscribe", "de-order-status-events") \
  .option("startingOffsets", "earliest") \
  .option("maxOffsetsPerTrigger", 10) \
  .load()

In [32]:
kafkaDF.printSchema()

In [33]:
#ler a chave e valor por serem do tipo binary e cast para string
kafkaValuesDF = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [34]:
#gravar em disco as mensagens, tempo por 10 segundos
query = kafkaValuesDF.writeStream \
.option("path", "dbfs:/saida/") \
.option("checkpointLocation", "dbfs:/ifood/") \
 .format("parquet") \
 .start() 

import time
time.sleep(5)
query.stop()

In [35]:
%fs ls /saida

path,name,size
dbfs:/saida/_spark_metadata/,_spark_metadata/,0
dbfs:/saida/part-00000-0825c241-9cb9-49ab-9154-ed13567914f5-c000.snappy.parquet,part-00000-0825c241-9cb9-49ab-9154-ed13567914f5-c000.snappy.parquet,1674
dbfs:/saida/part-00000-10abd6e6-945f-43cc-834a-55bafef33db6-c000.snappy.parquet,part-00000-10abd6e6-945f-43cc-834a-55bafef33db6-c000.snappy.parquet,1868
dbfs:/saida/part-00000-1f952a03-5d02-4826-92b3-c3620ee435a8-c000.snappy.parquet,part-00000-1f952a03-5d02-4826-92b3-c3620ee435a8-c000.snappy.parquet,1829
dbfs:/saida/part-00000-4727ade8-5a7d-46c7-ae91-d16f19c4c06c-c000.snappy.parquet,part-00000-4727ade8-5a7d-46c7-ae91-d16f19c4c06c-c000.snappy.parquet,1824
dbfs:/saida/part-00000-60896883-83bb-4626-ad71-4921300ed9c1-c000.snappy.parquet,part-00000-60896883-83bb-4626-ad71-4921300ed9c1-c000.snappy.parquet,1712
dbfs:/saida/part-00000-68fb181b-fbc9-4a96-8413-b5f1f01df6f5-c000.snappy.parquet,part-00000-68fb181b-fbc9-4a96-8413-b5f1f01df6f5-c000.snappy.parquet,2031
dbfs:/saida/part-00000-79a6360b-a355-420c-9307-c8e145f08367-c000.snappy.parquet,part-00000-79a6360b-a355-420c-9307-c8e145f08367-c000.snappy.parquet,1829
dbfs:/saida/part-00000-9e0e530d-2a69-4ee3-ad0f-875812f8ad16-c000.snappy.parquet,part-00000-9e0e530d-2a69-4ee3-ad0f-875812f8ad16-c000.snappy.parquet,1684
dbfs:/saida/part-00000-c4c9abd0-56a2-49d3-a8f6-4aef711f0857-c000.snappy.parquet,part-00000-c4c9abd0-56a2-49d3-a8f6-4aef711f0857-c000.snappy.parquet,1829


In [36]:
%fs rm -r /saida/_spark_metadata

In [37]:
#carrega os arquivos em um dataframe
parquetDF = sqlContext.read.parquet("/saida")

display(parquetDF)

key,value
,"{""created_at"":""2019-01-06T16:20:27.000Z"",""order_id"":""0013fc5c-4c10-4402-886c-1b8166e4632e"",""status_id"":""d0a3ffd5-4e48-4cc4-9739-d5764678c19f"",""value"":""CONCLUDED""}"
,"{""created_at"":""2019-01-05T21:40:22.000Z"",""order_id"":""001a3efd-debc-414e-83ca-e7ba6945aed6"",""status_id"":""59ed8c63-41c2-4bc7-acdd-de7ce85e0cf3"",""value"":""CONCLUDED""}"
,"{""created_at"":""2019-01-05T19:38:32.000Z"",""order_id"":""001a3efd-debc-414e-83ca-e7ba6945aed6"",""status_id"":""b06594e2-76a8-4e0b-b3bc-526145e86e64"",""value"":""PLACED""}"
,"{""created_at"":""2019-01-05T19:38:31.000Z"",""order_id"":""001a3efd-debc-414e-83ca-e7ba6945aed6"",""status_id"":""259d7d3f-9a24-4f4f-b7d0-df5b333707ad"",""value"":""REGISTERED""}"
,"{""created_at"":""2019-01-30T04:00:07.000Z"",""order_id"":""001d98c0-2741-472d-a2bc-16b5cee1a0e1"",""status_id"":""dd7356e0-2a11-4053-ab35-a4b077fcd24a"",""value"":""CONCLUDED""}"
,"{""created_at"":""2019-01-30T01:59:31.000Z"",""order_id"":""001d98c0-2741-472d-a2bc-16b5cee1a0e1"",""status_id"":""803b899c-8249-4064-9fc4-51845630b76b"",""value"":""REGISTERED""}"
,"{""created_at"":""2019-01-30T01:59:32.000Z"",""order_id"":""001d98c0-2741-472d-a2bc-16b5cee1a0e1"",""status_id"":""0df5ba65-cb07-44c7-8715-f83ed6cb1f33"",""value"":""PLACED""}"
,"{""created_at"":""2019-01-16T00:07:50.000Z"",""order_id"":""001f93c8-5228-4085-9909-32a855c02a98"",""status_id"":""39fd2072-2269-4c98-b21c-fb4f93b197ee"",""value"":""PLACED""}"
,"{""created_at"":""2019-01-16T00:07:49.000Z"",""order_id"":""001f93c8-5228-4085-9909-32a855c02a98"",""status_id"":""7a02de81-1999-488b-b0aa-eb49850570ba"",""value"":""REGISTERED""}"
,"{""created_at"":""2019-01-16T02:10:10.000Z"",""order_id"":""001f93c8-5228-4085-9909-32a855c02a98"",""status_id"":""71956d3b-4d9d-44b4-91ff-6686813c7084"",""value"":""CONCLUDED""}"


In [38]:
#criar schema com alguns campos do order events
schema = StructType() \
.add('created_at', TimestampType()) \
.add('order_id', StringType()) \
.add('status_id', StringType()) \
.add('value', StringType())

In [39]:
#cria um DF com a estrutura do schema
orderEvents = parquetDF\
        .select(from_json(col("value"), schema).alias("orderEvents"))

In [40]:
#remove o format json do dado
orderEventsDF = orderEvents.select("orderEvents.*")

orderEventsDF.show(5)

In [41]:
orderEventsDF.createOrReplaceTempView('orderEvents')

In [42]:
%sql select * from orderEvents limit 5

created_at,order_id,status_id,value
2019-01-06T16:20:27.000+0000,0013fc5c-4c10-4402-886c-1b8166e4632e,d0a3ffd5-4e48-4cc4-9739-d5764678c19f,CONCLUDED
2019-01-05T21:40:22.000+0000,001a3efd-debc-414e-83ca-e7ba6945aed6,59ed8c63-41c2-4bc7-acdd-de7ce85e0cf3,CONCLUDED
2019-01-05T19:38:32.000+0000,001a3efd-debc-414e-83ca-e7ba6945aed6,b06594e2-76a8-4e0b-b3bc-526145e86e64,PLACED
2019-01-05T19:38:31.000+0000,001a3efd-debc-414e-83ca-e7ba6945aed6,259d7d3f-9a24-4f4f-b7d0-df5b333707ad,REGISTERED
2019-01-30T04:00:07.000+0000,001d98c0-2741-472d-a2bc-16b5cee1a0e1,dd7356e0-2a11-4053-ab35-a4b077fcd24a,CONCLUDED


In [43]:
#Até esse ponto, acredito que foi entrega o ETL building
#analise pode ser feito em SQL
#dado está estruturado. Coluna items foi deixado o json dentro ao invés de fazer o explode em varias colunas, pois náo será usada
#para anaálise
#ETL esta utilizando pyspark para auto scaling

In [44]:

%sql select delivery_address_city,delivery_address_state, substr(order_created_at,1,10), count(delivery_address_city) from order group by delivery_address_city,delivery_address_state, substr(order_created_at,1,10) order by 4 desc limit 10;

delivery_address_city,delivery_address_state,"substring(order_created_at, 1, 10)",count(delivery_address_city)
SAO PAULO,SP,2019-01-29,31137
SAO PAULO,SP,2019-01-15,28490
SAO PAULO,SP,2019-01-22,27405
SAO PAULO,SP,2019-01-10,26762
SAO PAULO,SP,2019-01-28,25577
SAO PAULO,SP,2019-01-30,25477
SAO PAULO,SP,2019-01-09,25133
SAO PAULO,SP,2019-01-21,24440
SAO PAULO,SP,2019-01-14,24016
SAO PAULO,SP,2019-01-25,23247


In [45]:
#Select para trazer o count de ordens por cidade e estado
#Count of orders per day for each city and state in our database
ordersCityAndStateSql = sqlContext.sql(""" select delivery_address_city,delivery_address_state, substr(order_created_at,1,10) as order_created_at, count(delivery_address_city) as quantidade from order group by delivery_address_city,delivery_address_state, substr(order_created_at,1,10) order by 4 desc """)

In [46]:
ordersCityAndStateSql.show(5)

In [47]:
%sql  select r.customer_id, r.merchant_id from (
select customer_id, merchant_id, rank() over (partition by customer_id order by count_merchant desc ) as rank from (         
      select customer_id, merchant_id, count(merchant_id) as count_merchant from order  group by customer_id, merchant_id order by 3 desc )) r where rank <= 10 limit 20       
                

customer_id,merchant_id
00039466-560f-4e57-85a2-d4753cd901be,d0a9153a-44dd-4414-9e8d-eb18b79f4b46
00039466-560f-4e57-85a2-d4753cd901be,a8636ed6-de84-4c8a-8814-cc596a6baa53
00039466-560f-4e57-85a2-d4753cd901be,0852aa91-5cc8-4218-82d3-6cb7a10b7c3b
00039466-560f-4e57-85a2-d4753cd901be,e9509550-65f4-4b18-8991-484a848e3b01
00039466-560f-4e57-85a2-d4753cd901be,606123db-a3d4-4158-8040-e3a1fa8bd16b
00039466-560f-4e57-85a2-d4753cd901be,8e8ac07c-e366-4e45-809f-2745a2e6a6e4
00039466-560f-4e57-85a2-d4753cd901be,3f5b1137-ed98-4105-aa92-893b04741684
001a1267-31a3-4f5b-a028-d7e323864b08,e36b52b5-f1a8-4b55-b077-814634068c84
001a1267-31a3-4f5b-a028-d7e323864b08,e35819a6-3968-4c1f-88a9-26178de9b9c8
001a1267-31a3-4f5b-a028-d7e323864b08,be11e8c3-6901-4f8a-affb-f1c243d682d2


In [48]:
%sql select customer_id, merchant_id, count(merchant_id) from order where customer_id ='001a1267-31a3-4f5b-a028-d7e323864b08' group by customer_id, merchant_id order by 3 desc

customer_id,merchant_id,count(merchant_id)
001a1267-31a3-4f5b-a028-d7e323864b08,e36b52b5-f1a8-4b55-b077-814634068c84,3
001a1267-31a3-4f5b-a028-d7e323864b08,be11e8c3-6901-4f8a-affb-f1c243d682d2,2
001a1267-31a3-4f5b-a028-d7e323864b08,16bb7fb2-0f6b-4810-badf-5f6ed776f37a,2
001a1267-31a3-4f5b-a028-d7e323864b08,ef7495ef-776c-46b2-a750-35e54578f9a5,2
001a1267-31a3-4f5b-a028-d7e323864b08,e35819a6-3968-4c1f-88a9-26178de9b9c8,2
001a1267-31a3-4f5b-a028-d7e323864b08,8ab3be07-b346-41cc-bb63-89e806516432,1
001a1267-31a3-4f5b-a028-d7e323864b08,95630aa6-b916-42f5-8ab2-ac0e5590f3f0,1
001a1267-31a3-4f5b-a028-d7e323864b08,b746a05a-592f-4c4c-af53-408f7fa1d75e,1
001a1267-31a3-4f5b-a028-d7e323864b08,78b24fd3-a9bf-4d8c-8b11-384d7c652413,1
001a1267-31a3-4f5b-a028-d7e323864b08,c2deb290-3fa0-4a6d-8d7b-111ea10644fa,1


In [49]:
#Select para trazer os 10 restaurantes por cliente
#Top 10 restaurants per customer
top10RestaurantCustomerSql = sqlContext.sql(""" select r.customer_id, r.merchant_id from (
select customer_id, merchant_id, rank() over (partition by customer_id order by count_merchant desc ) as rank from (         
      select customer_id, merchant_id, count(merchant_id) as count_merchant from order  group by customer_id, merchant_id order by 3 desc )) r where rank <= 10  """)

In [50]:
top10RestaurantCustomerSql.show(5)

In [51]:
ordersCityAndState.take(5)

In [52]:
ordersCityAndStateSql.write.format('json').save('/arquivos/pedidos.JSON')

In [53]:
top10RestaurantCustomerSql.write.format('json').save('/arquivos/top10.JSON')