# iFood Data Architect Test - Marcio de Lima

Process semi-structured data and build a datalake that provides efficient storage and performance. The datalake must be organized in the following 2 layers:
* raw layer: Datasets must have the same schema as the source, but support fast structured data reading
* trusted layer: datamarts as required by the analysis team

In [2]:
#Utility functions
def getTags() -> dict: 
  return sc._jvm.scala.collection.JavaConversions.mapAsJavaMap(
    dbutils.entry_point.getDbutils().notebook().getContext().tags()
  )

def getTag(tagName: str, defaultValue: str = None) -> str:
  values = getTags()[tagName]
  try:
    if len(values) > 0:
      return values
  except:
    return defaultValue

def getUsername() -> str:
  import uuid
  try:
    return getTag("user", str(uuid.uuid1()).replace("-", ""))
  except:
    return "userMDL"

# Get the user's userhome
def getUserhome() -> str:
  username = getUsername()
  return "dbfs:/user/{}".format(username)

def getWorkingDir(caminho:str) -> str:
  workingDir = "{}/{}".format(getUserhome(), caminho)
  return workingDir.replace("__", "_")


In [3]:
# Ifood S3 connection provided in the Challenge

#Chaves de Acesso e Bucket do S3
awsAccessKey = "ENTER HERE"
secretKey = "ENTER HERE".replace("/", "%2F")
awsBucketName = "ifood-data-architect-test-source"

#Montagem da Conexão na S3
mountPoint = f"/mnt/ifood-{getUsername()}"
try:
  dbutils.fs.unmount(mountPoint) 
except:
  ()

try:
  mountTarget = "s3a://{}:{}@{}".format(awsAccessKey, secretKey, awsBucketName)
  dbutils.fs.mount(mountTarget, mountPoint)
except:
  ()




In [4]:
#%fs ls '/mnt/ifood-marcio_de_lima@yahoo.com.br'

In [5]:
# List Files
consumer = "dbfs:{}/{}".format(mountPoint, "consumer.csv.gz")
restaurant = "dbfs:{}/{}".format(mountPoint, "restaurant.csv.gz")
order = "dbfs:{}/{}".format(mountPoint, "order.json.gz")
status = "dbfs:{}/{}".format(mountPoint, "status.json.gz")


In [6]:
# Decision on the use of the file inconsistency method in the s3 load for further analysis.

myBadRecordsConsumer = getWorkingDir("badRecordsPath/consumer/")
myBadRecordsRestaurant = getWorkingDir("badRecordsPath/restaurant/")
myBadRecordsOrder = getWorkingDir("badRecordsPath/order/")
myBadRecordsStatus = getWorkingDir("badRecordsPath/status/")


In [7]:
# Loading data to DataFrames.

consumerDF = (spark.read
  .option("delimiter", ",")
  .option("header", True)
  .option("timestampFormat", "mm/dd/yyyy hh:mm:ss a")
  .option("inferSchema", True)
  .option("badRecordsPath", myBadRecordsConsumer)
  .csv(consumer)
)

restaurantDF = (spark.read
  .option("delimiter", ",")
  .option("header", True)
  .option("timestampFormat", "mm/dd/yyyy hh:mm:ss a")
  .option("inferSchema", True)
  .option("badRecordsPath", myBadRecordsRestaurant)
  .csv(restaurant)
)



In [8]:
#Checking import inconsistency
try:
  path = "{}/*/*/*".format(myBadRecordsConsumer)
  display(spark.read.csv(path))
except:
  print('Nao encontrado inconsistencia no arquivo')

In [9]:
try:
  path = "{}/*/*/*".format(myBadRecordsRestaurant)
  display(spark.read.csv(path))
except:
  print('Nao encontrado inconsistencia no arquivo')

In [10]:
#Creating Schema structure for the provided JSON files.
#Decision made by applying Schema in Json to gain performance in reading and processing data.
#verDF = spark.read.json(order)
#verDF.printSchema()

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, \
DoubleType, BooleanType, TimestampType, DateType, ArrayType, MapType

schemaJsonStatus = StructType([
  StructField("created_at", TimestampType(), True), 
  StructField("order_id", StringType(), True), 
  StructField("status_id", StringType(), True),
  StructField("value", StringType(), True) 

])

schemaJsonOrder = StructType([
  StructField("cpf", StringType(), True), 
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True), 
  StructField("delivery_address_city", StringType(), True), 
  StructField("delivery_address_country", StringType(), True), 
  StructField("delivery_address_district", StringType(), True),
  StructField("delivery_address_external_id", StringType(), True), 
  StructField("delivery_address_latitude", StringType(), True) ,
  StructField("delivery_address_longitude", StringType(), True) ,
  StructField("delivery_address_state", StringType(), True) ,
  StructField("delivery_address_zip_code", StringType(), True) , 
  StructField("merchant_id", StringType(), True) ,
  StructField("merchant_latitude", StringType(), True), 
  StructField("merchant_longitude", StringType(), True), 
  StructField("merchant_timezone", StringType(), True),
  StructField("order_created_at", TimestampType(), True),
  StructField("order_id", StringType(), True),
  StructField("order_scheduled", BooleanType(), True),
  StructField("order_scheduled_date", TimestampType(), True),
  StructField("order_total_amount", DoubleType(), True),
  StructField("origin_platform", StringType(), True), 
  StructField("items", StringType(), True)
])



In [11]:
# Loading JSON data
orderDF = (spark.read
  .schema(schemaJsonOrder)
  .option("badRecordsPath", myBadRecordsOrder)
  .json(order)
)

statusDF = (spark.read
  .schema(schemaJsonStatus)
  .option("badRecordsPath", myBadRecordsStatus)
  .json(status)
)


In [12]:
#Checking import inconsistency
try:
  path = "{}/*/*/*".format(myBadRecordsStatus)
  display(spark.read.json(path))
except:
  print('Nao encontrado inconsistencia no arquivo')

In [13]:
#Checking import inconsistency
try:
  path = "{}/*/*/*".format(myBadRecordsOrder)
  display(spark.read.json(path))
except:
  print('Nao encontrado inconsistencia no arquivo')

In [14]:
# Treatment of duplicate data in DataFrames if they exist.
statusDF = statusDF.dropDuplicates(["status_id"])
orderDF = orderDF.dropDuplicates(["order_id"])

In [15]:
# Close connection S3
try:
  dbutils.fs.unmount(mountPoint) 
except:
  ()

## Decision to use DeltaLake


Reasons and criteria below:

* <b>ACID transactions</b> - Multiple writers can simultaneously modify a data set and see consistent views.
* <b>DELETES/UPDATES/UPSERTS</b> - Writers can modify a data set without interfering with jobs reading the data set.
* <b>Automatic file management</b> - Data access speeds up by organizing data into large files that can be read efficiently.
* <b>Statistics and data skipping</b> - Reads are 10-100x faster when statistics are tracked about the data in each file, allowing Delta to avoid reading irrelevant information.

In [17]:
# Work Directory's
deltaPathConsumer = getWorkingDir("consumer-data-delta/")
deltaPathStatus = getWorkingDir("status-data-delta/")
deltaPathOrder = getWorkingDir("order-data-delta/")
deltaPathRestaurant = getWorkingDir("restaurant-data-delta/")

In [18]:
# Security Treatment - Cleaning of Table files, if any
# These instructions are only for the challenge, in a PRD environment, the RAW tables would be incremented via batch or Stream in real time. 
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)
try:
  spark.sql(""" VACUUM delta.`{}` RETAIN 0 HOURS """.format(deltaPathConsumer))
except Exception as err: 
  print(str(err).replace("\\n", "\n").replace("'", ""))

try:
  spark.sql(""" VACUUM delta.`{}` RETAIN 0 HOURS """.format(deltaPathStatus))
except Exception as err: 
  print(str(err).replace("\\n", "\n").replace("'", ""))

try:
  spark.sql(""" VACUUM delta.`{}` RETAIN 0 HOURS """.format(deltaPathOrder))
except Exception as err: 
  print(str(err).replace("\\n", "\n").replace("'", ""))

try:
  spark.sql(""" VACUUM delta.`{}` RETAIN 0 HOURS """.format(deltaPathRestaurant))
except Exception as err: 
  print(str(err).replace("\\n", "\n").replace("'", ""))


In [19]:
# Write Tables
# RAW Tables as requested in the Challenge.
# The OVERWRITE mode was placed, as the origin of the S3 files was not informed, if they are generated day by day only with updates for example and not as a FULL load, we can exchange OVERWRITE for the append or work with the concept of MERGE in DeltaLake . Thus, the most simple and practical method was chosen for the challenge. 

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", True)

# Write
(statusDF.write
  .mode("overwrite")
  .format("delta")
  .partitionBy("value")
  .save(deltaPathStatus) )


(restaurantDF.write
  .mode("overwrite")
  .format("delta")
  .partitionBy("merchant_state")
  .save(deltaPathRestaurant) )


(consumerDF.write
  .mode("overwrite")
  .format("delta")
  .save(deltaPathConsumer) )

(orderDF.write
  .mode("overwrite")
  .format("delta")
  .save(deltaPathOrder) )


In [20]:
# Checking Order Table processing and writes
sqlCmd = "SELECT count(*) FROM delta.`{}` ".format(deltaPathOrder)
print(spark.sql(sqlCmd).first()[0])

In [21]:
# Checking Order Status processing and writes
sqlCmd1 = "SELECT count(*) FROM delta.`{}` ".format(deltaPathStatus)
print(spark.sql(sqlCmd1).first()[0])

In [22]:
# Optimizing RAW Tables
# OPTIMIZE => Solving the Small Files Problem, Compression in 1GB.
# Benefit: Gain in performance and decreased latency time.
# ZORDER => Performance gains in queries

spark.sql("""OPTIMIZE delta.`{}` ZORDER by (order_id)""".format(deltaPathStatus))

spark.sql("""OPTIMIZE delta.`{}` ZORDER by (id)""".format(deltaPathRestaurant))

spark.sql("""OPTIMIZE delta.`{}` ZORDER by (customer_id)""".format(deltaPathConsumer))

spark.sql("""OPTIMIZE delta.`{}` ZORDER by (order_id)""".format(deltaPathOrder))


## Constructing Trust Layer 


  * Order dataset -  one line per order with all data from order, consumer, restaurant and the LAST status from order statuses dataset. To help analysis, it would be a nice to have: data partitioned on the restaurant LOCAL date.
  * Order Items dataset - easy to read dataset with one-to-many relationship with Order dataset. Must contain all data from _order_ items column.
  * Order statuses - Dataset containing one line per order with the timestamp for each registered event: CONCLUDED, REGISTERED, CANCELLED, PLACED.
  * For the trusted layer, anonymize any sensitive data.

In [24]:
# Work Directory
deltaPathOrderTrust = getWorkingDir("order-trust-delta/")
deltaPathOrderItem = getWorkingDir("order-item-trust-delta/")
deltaPathOrderStatus = getWorkingDir("order-status-trust-delta/")

In [25]:
# Creating the Status Table - Trust Layer
ustatusDF = spark.sql("SELECT order_id, created_at, value FROM delta.`{}` group by order_id, created_at, value".format(deltaPathStatus))

(ustatusDF
  .write
  .mode("overwrite")
  .format("delta")
  .partitionBy("value")
  .save(deltaPathOrderStatus) 
)

In [26]:
from pyspark.sql.functions import to_json, from_json
from pyspark.sql.functions import explode

#Schema Items
schemaItem = ArrayType(StructType([
  StructField("name", StringType(), True), 
  StructField("quantity", DoubleType(), True),
  StructField("externalId", StringType(), True),
  StructField("sequence", IntegerType(), True),
  StructField("addition", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
  StructField("discount", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
  StructField("unitPrice", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
  StructField("totalValue", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
  StructField("customerNote", StringType(), True),
  StructField("integrationId", StringType(), True),
  StructField("categoryName", StringType(), True),
  StructField("totalAddition", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
  StructField("totalDiscount", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
  StructField("garnishItems", ArrayType(StructType([ 
        StructField("name", StringType(), True) , 
        StructField("quantity", DoubleType(), True),
        StructField("addition", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
        StructField("discount", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
        StructField("sequence", IntegerType(), True),
        StructField("unitPrice", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
        StructField("categoryId", StringType(), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True),
        StructField("totalValue", StructType([ StructField("value", StringType(), True) , StructField("currency", StringType(), True) ]), True),
              ])), True)
  

]))

## Creation of the Trust Items Table - Query, Transformations and Treatments
itemDFTrust = spark.sql("SELECT * FROM delta.`{}`".format(deltaPathOrder)) \
            .select(col("order_id"), from_json("items", schemaItem).alias("items")).cache() \
            .select(col("order_id"), explode("items").alias("e")).selectExpr("order_id", \
                                 "e.name", \
                                 "e.quantity", \
                                 "e.externalId", \
                                 "e.sequence", \
               "(case when e.unitPrice.value = '0' then 0.00 else cast(e.unitPrice.value as long) / 100 end) as unitPrice_value", \
                                 "e.unitPrice.currency as unitPrice_currency", \
               "(case when e.totalValue.value = '0' then 0.00 else cast(e.totalValue.value as long) / 100 end) as totalValue_value", \
                                "e.totalValue.currency as totalValue_currency", \
               "(case when e.addition.value = '0' then 0.00 else cast(e.addition.value as long) / 100 end) as addition_value", \
                                 "e.addition.currency as addition_currency", \
               "(case when e.discount.value = '0' then 0.00 else cast(e.discount.value as long) / 100 end) as discount_value", \
                                 "e.discount.currency as discount_currency", \
                                 "e.customerNote", \
                                 "e.integrationId", \
                                 "e.categoryName", \
               "(case when e.totalAddition.value = '0' then 0.00 else cast(e.totalAddition.value as long) / 100 end) as totalAddition_value", \
                                 "e.totalAddition.currency as totalAddition_currency", \
               "(case when e.totalDiscount.value = '0' then 0.00 else cast(e.totalDiscount.value as long) / 100 end) as totalDiscount_value", \
                                 "e.totalDiscount.currency as totalDiscount_currency", \
                                 "e.garnishItems"
                                ).fillna("")

# Write Table Item in DeltaLake
(itemDFTrust
  .write
  .mode("overwrite")
  .format("delta")
  .save(deltaPathOrderItem) 
)

In [27]:
# Creating the Order data with the requested Inner Joins and Data Anomimation

# Last Order House Status
ultimoStatusDF = spark.sql("SELECT order_id, max(created_at) as created_at_status, max(value) as lastStatus FROM delta.`{}` group by order_id".format(deltaPathOrderStatus)).drop("created_at_status")

# Generating Order with Status
orderDeltaDF = spark.sql("SELECT * FROM delta.`{}` ".format(deltaPathOrder)).join(ultimoStatusDF, "order_id") \
                .drop("cpf").drop("delivery_address_latitude").drop("delivery_address_longitude") \
                .drop("created_at").drop("status_id").drop("value").drop("items")

# Generating Order with Consumer
consumerDeltaDF = spark.sql("SELECT customer_id, created_at as customer_created_at, language as customer_language, active as customer_active FROM delta.`{}` ".format(deltaPathConsumer))
orderConsumerTrustDF = orderDeltaDF.join(consumerDeltaDF, "customer_id")

# Generating Order with Restaurant
restaurantDeltaDF = spark.sql("SELECT id, to_date(created_at, 'yyyy-MM-dd') as merchant_created_at, enabled as merchant_enabled, \
price_range as merchant_price_range, average_ticket as merchant_average_ticket, takeout_time as merchant_takeout_time, \
delivery_time as merchant_delivery_time, minimum_order_value as merchant_minimum_order_value, merchant_zip_code, \
merchant_city, merchant_state, merchant_country FROM delta.`{}`".format(deltaPathRestaurant)).withColumnRenamed("id","merchant_id") 

orderTrustDF = orderConsumerTrustDF.join(restaurantDeltaDF, "merchant_id").fillna("")

In [28]:
# Generating Order Data on DeltaLake
(orderTrustDF
  .write
  .mode("overwrite")
  .format("delta")
  .save(deltaPathOrderTrust) 
)


In [29]:
# Showing the table saved in DeltaLake - Order
display(spark.sql("SELECT * FROM delta.`{}` where order_id = '002bbaff-af82-4a50-9083-07f7967ca4ea'".format(deltaPathOrderTrust)))

merchant_id,customer_id,order_id,customer_name,delivery_address_city,delivery_address_country,delivery_address_district,delivery_address_external_id,delivery_address_state,delivery_address_zip_code,merchant_latitude,merchant_longitude,merchant_timezone,order_created_at,order_scheduled,order_scheduled_date,order_total_amount,origin_platform,lastStatus,customer_created_at,customer_language,customer_active,merchant_created_at,merchant_enabled,merchant_price_range,merchant_average_ticket,merchant_takeout_time,merchant_delivery_time,merchant_minimum_order_value,merchant_zip_code,merchant_city,merchant_state,merchant_country
83cc6d5d-d91b-4316-88ac-acbd9cf7ab8c,58563e65-ce21-4c6a-9d81-91582e9416d5,002bbaff-af82-4a50-9083-07f7967ca4ea,FILIPE,RIO DE JANEIRO,BR,COPACABANA,2671315,RJ,22070,-43.19,-22.98,America/Sao_Paulo,2019-01-31T23:35:11.000+0000,False,,61.1,IOS,REGISTERED,2018-01-07T20:47:54.420+0000,pt-br,True,2017-01-20,True,3,80.0,0,60,0.0,22071,RIO DE JANEIRO,RJ,BR


In [30]:
# Showing the table saved in DeltaLake - OrderItem
display(spark.sql("SELECT * FROM delta.`{}` where order_id = '002bbaff-af82-4a50-9083-07f7967ca4ea'".format(deltaPathOrderItem)))

order_id,name,quantity,externalId,sequence,unitPrice_value,unitPrice_currency,totalValue_value,totalValue_currency,addition_value,addition_currency,discount_value,discount_currency,customerNote,integrationId,categoryName,totalAddition_value,totalAddition_currency,totalDiscount_value,totalDiscount_currency,garnishItems
002bbaff-af82-4a50-9083-07f7967ca4ea,COCA COLA 2 LT,1.0,545925f105fa42ada820065b258efde7,4,8.2,BRL,8.2,BRL,0.0,BRL,0.0,BRL,,,,0.0,BRL,0.0,BRL,List()
002bbaff-af82-4a50-9083-07f7967ca4ea,GIGANTE,1.0,98063bda8b574f2f9c5ba6de18c94ec9,1,0.0,BRL,0.0,BRL,0.0,BRL,0.0,BRL,,,,0.0,BRL,0.0,BRL,"List(List(MASSA TRADICIONAL , 1.0, List(0, BRL), List(0, BRL), 2, List(0, BRL), 0025, Escolha a sua Preferência, null, List(0, BRL)), List(CALABRESA ESPECIAL, 1.0, List(0, BRL), List(0, BRL), 3, List(5290, BRL), SBR, Escolha um sabor, null, List(5290, BRL)))"


In [31]:
# Showing the table saved in DeltaLake - Status
display(spark.sql("SELECT * FROM delta.`{}` where order_id = '002bbaff-af82-4a50-9083-07f7967ca4ea'".format(deltaPathOrderStatus)))

order_id,created_at,value
002bbaff-af82-4a50-9083-07f7967ca4ea,2019-01-31T23:35:12.000+0000,PLACED
002bbaff-af82-4a50-9083-07f7967ca4ea,2019-01-31T23:35:11.000+0000,REGISTERED
002bbaff-af82-4a50-9083-07f7967ca4ea,2019-01-02T01:40:08.000+0000,CONCLUDED


## Challenge Finished

### Thanks

### Marcio de Lima