In [1]:
import os, socket
import pyspark

# Local Spark context. getOrCreate() returns an already running context instead of
# failing with "Cannot run multiple SparkContexts", so this cell can be re-executed.
conf = (
    pyspark.SparkConf()
    .setMaster('local[*]')
    .set("spark.sql.repl.eagerEval.enabled", "true")  # render DataFrames as tables in the notebook
    .set("spark.executor.memory", "8g")
    .set("spark.driver.memory", "8g")
)

sc = pyspark.SparkContext.getOrCreate(conf=conf)
sc

In [2]:
# Point the s3a filesystem at the MinIO service.
# Credentials come from the environment when set. The fallbacks are the MinIO default
# credentials this notebook previously hardcoded; do not commit real keys here.
hadoop_conf = sc._jsc.hadoopConfiguration()
minio_endpoint = "http://{}:9000".format(socket.gethostbyname_ex('minio-s3')[2][0])

for key, value in [
    ("fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "minioadmin")),
    ("fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "minioadmin")),
    ("fs.s3a.endpoint", minio_endpoint),
    ("fs.s3a.path.style.access", "true"),
    ("fs.s3a.connection.ssl.enabled", "false"),
]:
    hadoop_conf.set(key, value)

sqlc = pyspark.sql.SQLContext(sc)

# testing s3 connection (minio service)
sqlc.read.parquet('s3a://raw-data/restaurant.csv.parquet').limit(3)

id,created_at,enabled,price_range,average_ticket,takeout_time,delivery_time,minimum_order_value,merchant_zip_code,merchant_city,merchant_state,merchant_country
02c94103-61f3-490...,2017-01-23T12:52:...,False,3,60.0,0,50,30.0,14025,RIBEIRAO PRETO,SP,BR
15e7f5fd-090d-47b...,2017-01-20T13:14:...,True,3,60.0,0,0,30.0,50180,SAO PAULO,SP,BR
33ca5d3d-b99f-404...,2017-01-23T12:46:...,True,5,100.0,0,45,10.0,23090,RIO DE JANEIRO,RJ,BR


# Exploratory Analysis

## Format reading benchmarking


In [3]:
%%time
df_order_parquet = sqlc.read.parquet('s3a://raw-data/order.json.parquet/')

CPU times: user 2.24 ms, sys: 0 ns, total: 2.24 ms
Wall time: 146 ms


In [7]:
# Preview 3 rows; spark.sql.repl.eagerEval.enabled (set in the first cell) renders the DataFrame as a table.
df_order_parquet.limit(3)

cpf,customer_id,customer_name,delivery_address_city,delivery_address_country,delivery_address_district,delivery_address_external_id,delivery_address_latitude,delivery_address_longitude,delivery_address_state,delivery_address_zip_code,items,merchant_id,merchant_latitude,merchant_longitude,merchant_timezone,order_created_at,order_id,order_scheduled,order_total_amount,origin_platform,order_scheduled_date
80532101763,977b9a89-825f-464...,GUSTAVO,FRANCA,BR,JARDIM ESPRAIADO,6736655,-47.39,-20.55,SP,14403,"[{""name"": ""Parmeg...",eb4197f9-964c-4f8...,-47.39,-20.55,America/Sao_Paulo,2019-01-17T22:50:...,dd4f8f0a-c2cb-45c...,False,46.0,ANDROID,
43352103961,e969cc0d-388b-402...,MICHELLE,SANTOS,BR,CAMPO GRANDE,8759216,-46.34,-23.96,SP,11070,"[{""name"": ""Filé M...",927d46f9-4bb3-48f...,-46.34,-23.96,America/Sao_Paulo,2019-01-17T17:51:...,8dd80f0b-db00-4b8...,False,104.5,ANDROID,
38650991217,e08dcc8b-f998-405...,VICTOR,GUARULHOS,BR,JARDIM ROSSI,8765930,-46.53,-23.44,SP,71304,"[{""name"": ""GRANDE...",71ad62c5-5947-451...,-46.53,-23.44,America/Sao_Paulo,2019-01-17T22:53:...,430f9887-a563-45e...,False,35.0,IOS,


In [4]:
%%time
df_order_json = sqlc.read.json('s3a://raw-data/order.json/')

CPU times: user 8.15 ms, sys: 1.02 ms, total: 9.17 ms
Wall time: 42.6 s


In [5]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

## Loading & exploring main dataframes

In [6]:
def describe_dataframe(df):
    """Print row counts before and after dropping exact duplicate rows.

    Prints three lines: the raw row count, the row count after
    ``dropDuplicates()``, and their difference together with the percentage of
    rows that are *not* duplicates (deduplicated rows / raw rows * 100).

    Args:
        df: A Spark DataFrame (anything exposing ``count()`` and
            ``dropDuplicates()``).

    Returns:
        None; the statistics are printed.
    """
    raw_count = df.count()
    no_duplicates_count = df.dropDuplicates().count()
    diff_count = raw_count - no_duplicates_count
    # An empty frame has nothing duplicated: report 100 % instead of dividing by zero.
    percent_not_duplicates = (no_duplicates_count / raw_count * 100) if raw_count else 100.0
    print("Raw   rows: {}".format(raw_count))
    print("Nodup rows: {}".format(no_duplicates_count))
    print("Diff      : {} ({} % not dup.)".format(diff_count, percent_not_duplicates))

### Restaurant

In [7]:
df_restaurant = sqlc.read.format('parquet').load('s3a://raw-data/restaurant.csv.parquet/')

# Schema, duplicate statistics, then a 3-row preview.
df_restaurant.printSchema()
describe_dataframe(df_restaurant)
df_restaurant.limit(3)

root
 |-- id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- enabled: boolean (nullable = true)
 |-- price_range: integer (nullable = true)
 |-- average_ticket: double (nullable = true)
 |-- takeout_time: integer (nullable = true)
 |-- delivery_time: integer (nullable = true)
 |-- minimum_order_value: double (nullable = true)
 |-- merchant_zip_code: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- merchant_country: string (nullable = true)

Raw   rows: 7292
Nodup rows: 7292
Diff      : 0 (100.0 % not dup.)


id,created_at,enabled,price_range,average_ticket,takeout_time,delivery_time,minimum_order_value,merchant_zip_code,merchant_city,merchant_state,merchant_country
02c94103-61f3-490...,2017-01-23T12:52:...,False,3,60.0,0,50,30.0,14025,RIBEIRAO PRETO,SP,BR
15e7f5fd-090d-47b...,2017-01-20T13:14:...,True,3,60.0,0,0,30.0,50180,SAO PAULO,SP,BR
33ca5d3d-b99f-404...,2017-01-23T12:46:...,True,5,100.0,0,45,10.0,23090,RIO DE JANEIRO,RJ,BR


### Order Statuses

In [8]:
df_status = sqlc.read.format('parquet').load('s3a://raw-data/status.json.parquet/')

df_status.printSchema()
describe_dataframe(df_status)
# Status history of a single order, to see what the rows look like.
(df_status
    .where(df_status["order_id"] == "bb779eab-d791-482a-94f5-abd89ee52002")
    .limit(5)
    .show(5, truncate=False))

root
 |-- order_id: string (nullable = true)
 |-- status_id: string (nullable = true)
 |-- value: string (nullable = true)
 |-- created_at: string (nullable = true)

Raw   rows: 11075048
Nodup rows: 7340326
Diff      : 3734722 (66.27805134569168 % not dup.)
+------------------------------------+------------------------------------+----------+------------------------+
|order_id                            |status_id                           |value     |created_at              |
+------------------------------------+------------------------------------+----------+------------------------+
|bb779eab-d791-482a-94f5-abd89ee52002|fbd8d04d-d5bf-4fe9-9f09-02c06fc6a7c4|REGISTERED|2019-01-31T23:59:59.000Z|
|bb779eab-d791-482a-94f5-abd89ee52002|9828451d-aae0-4c0f-a373-e691e42bc73b|PLACED    |2019-01-02T00:00:00.000Z|
|bb779eab-d791-482a-94f5-abd89ee52002|751d2263-6718-4aa9-8e84-e1039ee772eb|CONCLUDED |2019-01-02T02:00:07.000Z|
+------------------------------------+--------------------------------

### Consumer

In [12]:
df_consumer = (
    sqlc.read
    .option('header', True)
    .csv('s3a://raw-data/consumer.csv/')
)

# Schema, duplicate statistics, then a 3-row preview.
df_consumer.printSchema()
describe_dataframe(df_consumer)
df_consumer.limit(3)

root
 |-- customer_id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- active: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_phone_area: string (nullable = true)
 |-- customer_phone_number: string (nullable = true)

Raw   rows: 809323
Nodup rows: 809323
Diff      : 0 (100.0 % not dup.)


customer_id,language,created_at,active,customer_name,customer_phone_area,customer_phone_number
00039466-560f-4e5...,pt-br,2018-04-05T14:49:...,True,NUNO,46,816135924
001a1267-31a3-4f5...,pt-br,2018-01-14T21:40:...,True,ADRIELLY,59,231330577
003ae1d5-67b8-4a0...,pt-br,2018-01-07T03:47:...,True,PAULA,62,347597883


### Order

In [13]:
df_order = sqlc.read.parquet('s3a://raw-data/order.json.parquet/')

# Schema and duplicate statistics. describe_dataframe used to be called twice here,
# which only re-ran the same two count jobs and printed identical output.
df_order.printSchema()
describe_dataframe(df_order)

root
 |-- cpf: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- delivery_address_city: string (nullable = true)
 |-- delivery_address_country: string (nullable = true)
 |-- delivery_address_district: string (nullable = true)
 |-- delivery_address_external_id: string (nullable = true)
 |-- delivery_address_latitude: string (nullable = true)
 |-- delivery_address_longitude: string (nullable = true)
 |-- delivery_address_state: string (nullable = true)
 |-- delivery_address_zip_code: string (nullable = true)
 |-- items: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- merchant_latitude: string (nullable = true)
 |-- merchant_longitude: string (nullable = true)
 |-- merchant_timezone: string (nullable = true)
 |-- order_created_at: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_scheduled: boolean (nullable = true)
 |-- order_total_amount: double (nullable = true)
 |-- o

In [14]:
# Ten rows with the highest order_id, so repeated ids end up next to each other.
df_order.orderBy(df_order["order_id"].desc()).limit(10)

cpf,customer_id,customer_name,delivery_address_city,delivery_address_country,delivery_address_district,delivery_address_external_id,delivery_address_latitude,delivery_address_longitude,delivery_address_state,delivery_address_zip_code,items,merchant_id,merchant_latitude,merchant_longitude,merchant_timezone,order_created_at,order_id,order_scheduled,order_total_amount,origin_platform,order_scheduled_date
84046272628,b3d8ab42-501f-4c0...,LUIZ,TAUBATE,BR,CENTRO,7768797,-45.56,-23.03,SP,12010,"[{""name"": ""Segund...",6f00d0fe-46a3-477...,-45.56,-23.03,America/Sao_Paulo,2019-01-23T15:51:...,fffffbfa-5827-432...,False,11.0,ANDROID,
67945725258,b3d8ab42-501f-4c0...,LUIZ,TAUBATE,BR,CENTRO,7768797,-45.56,-23.03,SP,12010,"[{""name"": ""Segund...",6f00d0fe-46a3-477...,-45.56,-23.03,America/Sao_Paulo,2018-12-24T15:51:...,fffffbfa-5827-432...,False,11.0,ANDROID,
4866727801,97863872-c491-439...,JOÃO,BELO HORIZONTE,BR,NOVA SUISSA,3928809,-43.98,-19.93,MG,30421,"[{""name"": ""Batata...",e72c1870-aaf7-49d...,-43.98,-19.93,America/Sao_Paulo,2018-12-07T22:53:...,fffff39a-2b12-44f...,False,33.8,DESKTOP,
56631150468,97863872-c491-439...,JOÃO,BELO HORIZONTE,BR,NOVA SUISSA,3928809,-43.98,-19.93,MG,30421,"[{""name"": ""Batata...",e72c1870-aaf7-49d...,-43.98,-19.93,America/Sao_Paulo,2019-01-06T22:53:...,fffff39a-2b12-44f...,False,33.8,DESKTOP,
79586140037,182651b4-efca-454...,NATHALIA,SAO PAULO,BR,JARDIM DAS ACACIAS,9398041,-46.69,-23.62,SP,47040,"[{""name"": ""Creme ...",08384e6a-56fe-48a...,-46.69,-23.62,America/Sao_Paulo,2019-01-24T23:30:...,ffffd242-fd05-4a1...,False,89.8,ANDROID,
81041553016,182651b4-efca-454...,NATHALIA,SAO PAULO,BR,JARDIM DAS ACACIAS,9398041,-46.69,-23.62,SP,47040,"[{""name"": ""Creme ...",08384e6a-56fe-48a...,-46.69,-23.62,America/Sao_Paulo,2018-12-25T23:30:...,ffffd242-fd05-4a1...,False,89.8,ANDROID,
89971888098,206346ef-464d-493...,ANDRÉ,SAO PAULO,BR,VILA CARLOS DE CA...,5996321,-46.54,-23.53,SP,36400,"[{""name"": ""Esfhir...",be092a99-71ce-410...,-46.54,-23.53,America/Sao_Paulo,2018-12-09T22:44:...,ffffcfcb-78de-45d...,False,26.3,ANDROID,
5401250148,206346ef-464d-493...,ANDRÉ,SAO PAULO,BR,VILA CARLOS DE CA...,5996321,-46.54,-23.53,SP,36400,"[{""name"": ""Esfhir...",be092a99-71ce-410...,-46.54,-23.53,America/Sao_Paulo,2019-01-08T22:44:...,ffffcfcb-78de-45d...,False,26.3,ANDROID,
93855608979,fee1264c-0f73-46f...,Laura,RIO DE JANEIRO,BR,LARANJEIRAS,8741496,-43.18,-22.93,RJ,22240,"[{""name"": ""ROL. C...",12f6ec30-77b6-49b...,-43.18,-22.93,America/Sao_Paulo,2019-01-30T22:36:...,ffffc3c9-8c9e-487...,False,79.1,IOS,
64264389175,d656999c-7ba5-40e...,JANILSON,NATAL,BR,NEOPOLIS,8834596,-35.21,-5.86,RN,59080,"[{""name"": ""Promoç...",7b57905d-8eca-493...,-35.21,-5.86,America/Fortaleza,2019-01-24T01:05:...,ffffc34d-874a-4ff...,False,18.5,DESKTOP,


In [15]:
# Every status event of one order, oldest first.
(df_status
    .where(df_status["order_id"] == "00001cdb-2399-417f-b630-f87919d25eaa")
    .orderBy("created_at")
    .limit(10)
    .show(10, truncate=False))

# The order rows carrying the same order_id.
(df_order
    .where(df_order["order_id"] == "00001cdb-2399-417f-b630-f87919d25eaa")
    .orderBy("order_created_at")
    .select("order_id", "order_created_at", "cpf")
    .show(5, truncate=False))


+------------------------------------+------------------------------------+----------+------------------------+
|order_id                            |status_id                           |value     |created_at              |
+------------------------------------+------------------------------------+----------+------------------------+
|00001cdb-2399-417f-b630-f87919d25eaa|dc1d5c26-5fc6-4d28-bf25-fc6789db218b|REGISTERED|2019-01-29T01:04:07.000Z|
|00001cdb-2399-417f-b630-f87919d25eaa|dc1d5c26-5fc6-4d28-bf25-fc6789db218b|REGISTERED|2019-01-29T01:04:07.000Z|
|00001cdb-2399-417f-b630-f87919d25eaa|a69f0793-1cac-4a7c-b767-e878d842b0e2|PLACED    |2019-01-29T01:04:08.000Z|
|00001cdb-2399-417f-b630-f87919d25eaa|a69f0793-1cac-4a7c-b767-e878d842b0e2|PLACED    |2019-01-29T01:04:08.000Z|
|00001cdb-2399-417f-b630-f87919d25eaa|aead61ab-c98e-45c9-a2a8-e53fcdf55afa|CONCLUDED |2019-01-29T03:05:06.000Z|
|00001cdb-2399-417f-b630-f87919d25eaa|aead61ab-c98e-45c9-a2a8-e53fcdf55afa|CONCLUDED |2019-01-29T03:05:0

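We have a data-quality problem to handle. Every status event of this order appears twice, and the same `order_id` appears to occur in several order rows: in the sorted preview above, consecutive pairs share all fields except `cpf` and `order_created_at`. The root cause is not yet known.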


In [None]:
# NOTE(review): scratch note left from exploration. The original line
#     df_status.dropDuplicates() [data, data2] [cpf1,cpf2]
# is not valid Python and stopped Restart & Run All here. It presumably records that
# status rows are exact duplicates (removable with dropDuplicates), while order rows
# sharing an order_id differ in fields such as cpf. Confirm with the author.
df_status.dropDuplicates()

# Dataframe Sanitization

In [9]:
from pyspark.sql.functions import col

# Convert the ISO-8601 string columns (e.g. "2019-01-29T01:04:07.000Z") to timestamps,
# then drop exact duplicate rows.
# A plain cast reads the trailing "Z" as UTC and keeps the milliseconds. The previous
# unix_timestamp(..., "...'Z'") form matched "Z" as literal text, so values were
# interpreted in the Spark session time zone and truncated to whole seconds.
# Casting a column that is already a timestamp is a no-op, so re-running is harmless.
df_status = df_status.withColumn('created_at', col('created_at').cast('timestamp'))
df_status = df_status.dropDuplicates()

df_order = df_order.withColumn('order_created_at', col('order_created_at').cast('timestamp'))
df_order = df_order.dropDuplicates()


In [None]:
# Number of order_ids that occur in two or more rows.
(df_order
    .groupBy("order_id")
    .count()
    .where("count >= 2")
    .count())

In [None]:
# Number of order_ids that occur in three or more rows.
(df_order
    .groupBy("order_id")
    .count()
    .where("count >= 3")
    .count())

In [10]:
def add_prefix_to_df_columns(df, prefix="", separator="_", escape=None):
    """Return ``df`` with ``prefix + separator`` prepended to its column names.

    Columns listed in ``escape`` keep their name, and so do columns that already
    start with ``prefix + separator``. This makes repeated calls with the same
    arguments a no-op. An empty ``prefix`` leaves every column unchanged.

    The previous check, ``prefix not in c``, skipped any column that merely
    *contained* the prefix somewhere in its name.

    Args:
        df: Spark DataFrame whose columns are renamed.
        prefix: Text to prepend, e.g. ``"status"``.
        separator: Text placed between the prefix and the original name.
        escape: Iterable of column names that must keep their current name
            (``None`` means no exceptions).

    Returns:
        A DataFrame with the renamed columns, or ``df`` itself when nothing changes.
    """
    if not prefix:
        return df
    keep = set(escape or ())
    marker = prefix + separator

    new_names = [
        c if (c in keep or c.startswith(marker)) else marker + c
        for c in df.columns
    ]
    if new_names == list(df.columns):
        return df
    # One positional rename of all columns instead of a chain of withColumnRenamed calls.
    return df.toDF(*new_names)

In [11]:
# Dry run of the prefixing helper: only the resulting schema is printed; df_status is not reassigned.
add_prefix_to_df_columns(df_status, prefix="status").printSchema()

root
 |-- status_order_id: string (nullable = true)
 |-- status_id: string (nullable = true)
 |-- status_value: string (nullable = true)
 |-- status_created_at: timestamp (nullable = true)



# Calculating metrics for future validation

In [None]:
# TODO(review): calculate_metrics is not defined in any cell visible in this notebook,
# so this cell raises NameError on Restart & Run All. Define or import it before this cell.
metrics_df_order      = calculate_metrics(df_order)
metrics_df_restaurant = calculate_metrics(df_restaurant)
metrics_df_consumer   = calculate_metrics(df_consumer)
metrics_df_status     = calculate_metrics(df_status)

# Job 1: Trusted dataset building 

In [None]:
# Planned outline for Job 1. Nothing here runs yet: every line below is commented out.
# NOTE(review): the df_trusted_order_status line calls build_df_trusted_items; this is
# presumably a copy-paste slip. Confirm the intended builder before enabling it.
# df_trusted_order        = build_df_trusted_order(df_order, df_restaurant, df_consumer, df_status)
# df_trusted_items        = build_df_trusted_items(df_order, df_restaurant, df_consumer, df_status)
# df_trusted_order_status = build_df_trusted_items(df_order, df_restaurant, df_consumer, df_status)


# compare_metrics()

# anonimize()

# serialize_df(df_trusted_order, partition_key="")
# serialize_df(df_trusted_items, partition_key="")
# serialize_df(df_trusted_order_status, partition_key="")


In [12]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

def validate_json(record):
    """Apply ad-hoc escape rewrites to a JSON string and check that it parses.

    Args:
        record: Raw JSON text (the ``items`` column value). ``None`` or any
            non-string value is treated as invalid.

    Returns:
        The rewritten string when it parses with ``json.loads``, otherwise the
        sentinel string ``'invalid'``.
    """
    import json

    if not isinstance(record, str):
        return 'invalid'

    # Escapes are spelled out explicitly: the previous '\ ' literals relied on Python
    # keeping an unknown escape sequence, which newer interpreters warn about.
    # NOTE(review): the first two rewrites turn `\"` before `, "` or `}` into `\ "`, and the
    # third unescapes every remaining `\"` to a bare quote. `\ ` is not a valid JSON escape,
    # and a bare quote inside a value ends the string early. Records hit by these rewrites
    # therefore come back 'invalid'; confirm the intended fix-up.
    updated_record = (
        record
        .replace('\\", "', '\\ ", "')
        .replace('\\"}', '\\ "}')
        .replace('\\"', '"')
    )

    try:
        json.loads(updated_record)  # trying to parse...
    except (ValueError, RecursionError):
        return 'invalid'
    return updated_record
    
# Spark UDF wrapper: applies validate_json row by row and yields a string column.
parse_and_fix_json = F.udf(validate_json, returnType=T.StringType())

In [13]:
df_order = df_order.withColumn("items_json_text", parse_and_fix_json(df_order["items"]))

# Rows the UDF could not parse carry the 'invalid' sentinel instead of JSON text.
count_of_invalid_json_items = (
    df_order
    .where(df_order["items_json_text"] == 'invalid')
    .count()
)

print("Total of invalid json items: {}".format(count_of_invalid_json_items))

Total of invalid json items: 0


In [None]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import ArrayType

# Infer the item schema from the repaired JSON text. Rows tagged 'invalid' by the UDF are
# skipped so malformed text cannot add a _corrupt_record field to the inferred schema.
# Each items value is a JSON array. Spark's JSON reader turns a top-level array into one
# record per element, so the inferred schema describes a single item.
item_schema = sqlc.read.json(
    df_order.rdd
    .map(lambda row: row.items_json_text)
    .filter(lambda text: text != 'invalid')
).schema
# remark about the schema building above: we could improve this inference by taking a
# sample of the data. Such a strategy needs further evaluation and is not adopted here.

# The column holds arrays of items, so parse it with an array-of-struct schema rather
# than the single-struct element schema.
items_json_schema = ArrayType(item_schema)

df_order = df_order.withColumn('items', from_json(col('items_json_text'), items_json_schema)).drop("items_json_text")


In [None]:
# Optional: uncomment to iterate on a ~0.1 % random sample of orders (fraction=0.001).
# NOTE(review): seed=None gives a different sample on every run; set a seed for reproducibility.
# df_order_sample = df_order.sample(False, fraction=0.001, seed=None)

In [None]:
from pyspark.sql.functions import col, max as max_
from pyspark.sql.window import Window

# Prefix the dimension columns so they do not clash with order columns after joining.
# (The status rename used to be issued twice; the second call was a no-op.)
df_restaurant = add_prefix_to_df_columns(df_restaurant, "merchant", escape=[])
df_consumer   = add_prefix_to_df_columns(df_consumer, "customer", escape=[])
df_status     = add_prefix_to_df_columns(df_status, "status", escape=['order_id'])

# Last status per order: keep every status row whose timestamp equals the latest one for
# that order. Ties on the latest timestamp keep all tied rows, as the previous join did.
# A window replaces the self-join of df_status with its own aggregate, whose
# order_id == order_id condition referred to the same underlying column on both sides.
latest_status_window = Window.partitionBy("order_id")
df_order_laststatus = (
    df_status
    .withColumn("last_event", max_("status_created_at").over(latest_status_window))
    .filter(col("status_created_at") == col("last_event"))
    .select("order_id", "status_value")
)

In [None]:
df_order_laststatus.printSchema()
# Orders that still have more than one "last" status row (ties on the latest timestamp).
(df_order_laststatus
    .groupBy("order_id")
    .count()
    .where("count >= 2")
    .count())

In [None]:
# df_order and df_consumer both carry a customer_name column (the "customer" prefix step
# leaves it unchanged). Joining on customer_id alone would produce two customer_name
# columns, which Spark's parquet writer rejects, so keep only the order's copy.
df_trusted_order = df_order.join(df_consumer.drop("customer_name"), ['customer_id'])
# .join(df_restaurant, "merchant_id").join(df_order_laststatus, "order_id")

In [None]:
# Number of order_ids that occur in two or more rows.
(df_order
    .groupBy("order_id")
    .count()
    .where("count >= 2")
    .count())


In [None]:
# Creating a column with the LOCAL (merchant wall-clock) order time.
# from_utc_timestamp treats order_created_at as UTC and renders it in each row's
# merchant_timezone. The previous to_utc_timestamp call did the opposite (local -> UTC).
# NOTE(review): assumes order_created_at holds UTC instants (the raw strings use an ISO "Z"
# suffix in the status data); confirm for the order data.
df_trusted_order = df_trusted_order.withColumn("localtime_order_created_at", pyspark.sql.functions.from_utc_timestamp(col("order_created_at"), col("merchant_timezone")))

In [None]:
# First four joined rows, untruncated.
df_trusted_order.show(n=4, truncate=False)

In [None]:
# Number of order_ids that occur in two or more rows.
(df_order
    .groupBy("order_id")
    .count()
    .where("count >= 2")
    .count())

In [None]:
# Inspect the schema of the joined trusted-order frame before it is written out.
df_trusted_order.printSchema()

In [None]:
from pyspark.sql.functions import year, month, col, to_date

# Calendar date of localtime_order_created_at; used as the output partition column.
df_trusted_order = df_trusted_order.withColumn(
    "localtime_order_created_at_date",
    to_date(col("localtime_order_created_at")),
)

# Replace the trusted orders dataset, one directory per local order date.
(df_trusted_order.write
    .mode("overwrite")
    .partitionBy("localtime_order_created_at_date")
    .parquet("s3a://trusted-data/orders"))




In [None]:
# Status rows of the sample order after sanitization.
df_status.where(df_status["order_id"] == "bb779eab-d791-482a-94f5-abd89ee52002").show(10, truncate=False)

In [None]:
# Row count of df_status in its current (deduplicated, if the sanitization cell ran) state;
# compare with the raw count printed by describe_dataframe earlier.
df_status.count()


In [None]:
# Re-read the raw JSON export, presumably to compare it with the parquet copy for the same order.
df_status_raw = sqlc.read.format('json').load('s3a://raw-data/status.json')

df_status_raw.printSchema()
df_status_raw.where(df_status_raw["order_id"] == "bb779eab-d791-482a-94f5-abd89ee52002").show(10, truncate=False)
