In [1]:
import pyspark as pys
import pyspark.sql.functions as sf
import pyspark.sql as s
import datetime as dt
from pyspark import SparkContext
import pyspark.sql.types as st
import logging

# Setup your credentials
AWS_ACCESS_KEY=''
AWS_SECRET_KEY=''

MAX_MEMORY = "5g"

spark = pys.sql.SparkSession.builder \
    .master('local[*]') \
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3")\
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY) 
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY)

spark.conf.set("spark.sql.broadcastTimeout", 1200)

In [None]:
transient_path = 's3a://ifood-data-architect-test-source/order.json.gz'

In [None]:
BASE_PATH_SOURCE='s3a://ifood-data-architect-test-source/'
zone = 'raw'
dataset_name = 'order'
read_type = 'json'

PATH = BASE_PATH_SOURCE + dataset_name + '.' + read_type + '.gz'
PATH

In [None]:
transient_path = 's3a://ifood-data-architect-test-source/order.json.gz'

TRANSIENT_READ = spark.read \
    .load(transient_path, format='json', header=True, infer_schema=True)

In [None]:
TRANSIENT_READ.show(1)

In [None]:
TRANSIENT_READ.count()

In [None]:
TRANSIENT_READ.printSchema()

In [None]:
TRANSIENT_READ.write \
  .mode('overwrite') \
  .parquet('s3a://ifood-data-architect-test-source-murillo/order')

In [None]:
READ = spark \
    .read \
    .format('json')  \
    .option('path', transient_path)

READ.show(1)
#    .withColumn('partition_column', sf.lit(PARTITION_DATE))

In [None]:
def args_datasets_read_types(sys_args):
    """ param sys_args: Receive all atributos from Glue UI param.
        All datasets with "--n_" prefixes are being treating with datasets and read types.
        e.g:
            --n_consumer / csv.
            --n_order / json
            --n_....
        return dict with datasets and read types.
    """
    datasets_read_types = []
    for liu in enumerate(sys_args):
        if '--n_' in liu[1]:
            dataset = liu[1][4:]
            read_type = sys_args[liu[0]+1]
            datasets_read_types.append([dataset, read_type])
    return datasets_read_types


In [61]:
def datetime_result(date_type):
    """ Recover brazilian time.
    """ 
    if date_type == 'today':
        date_now = (dt.datetime.utcnow() - dt.timedelta(hours=3)).strftime('%Y-%m-%d')
        return str(date_now)
    if date_type == 'today_timestamp':
        date_now = (dt.datetime.utcnow() - dt.timedelta(hours=3)).strftime('%Y-%m-%d %H:%M:%S')
        return str(date_now)
    else:
        date_yesternoon = (dt.datetime.utcnow() - dt.timedelta(days=1)).strftime('%Y-%m-%d')
        return str(date_yesternoon)

In [None]:
datetime_result('today_timestamp')

In [None]:
def path_read_write(zone, dataset_name, read_type):
    """ 
        refurn absolute path.
    """
    BASE_PATH_SOURCE='s3a://ifood-data-architect-test-source'
    
    if zone == 'source':
        PATH = BASE_PATH_SOURCE + '/' + dataset_name + '.' + read_type + '.gz'
    else:
        PATH = BASE_PATH_SOURCE + '-murillo/' + zone + '/' + dataset_name
        
    return PATH

In [None]:
def read_dataset(zone, dataset_name, read_type):
    """
        e.g:
            Read json and csv from source repo, creating partition column setting now date.
        return spark dataframe
    """
    PARTITION_DATE = datetime_result('today')
       
    PATH = path_read_write(zone, dataset_name, read_type)
    
    READ = spark.read \
        .load(PATH, format=read_type, header=True, infer_schema=True) \
        .withColumn('partition_column', sf.lit(PARTITION_DATE))
    
    return READ

In [None]:
def write_dataset(zone, dataset_name, df):
    """ param zone (transient/raw/refined/trusted)
        param dataset_name
        param df (spark dataframe)
        e.g:
            Write some data in .parquet extension, overwriting existing data inside the partition.
        return spark dataframe
    """
    
    PATH = path_read_write(zone, dataset_name, '.parquet')
    
    df.write \
      .mode('overwrite') \
      .partitionBy('partition_column') \
      .parquet(PATH)

In [None]:
#teste1 = ['--n_order', 'json', '--n_restaurant', 'csv', '--n_status', 'json', '--n_status', 'csv', '--env', 'prd',]
teste1 = ['--n_status', 'json', '--n_restaurant', 'csv']

In [None]:
for dataset_name, type_read in args_datasets_read_types(teste1):
    """ Iterate from all datasets and read_types configurable.
    """
    print('Load Transient:' + datetime_result('today_timestamp') + ' - Dataset: ' + dataset_name + ' - type_read: ' + type_read)
    #logger.info('Load Transient:' + datetime_result('today_timestamp') + ' - type_read: ' + type_read + ' Dataset: ' + dataset)
    df = read_dataset('source', dataset_name, type_read)
    
    print('Write Raw:' + datetime_result('today_timestamp') + ' - Dataset: ' + dataset_name + ' - type_read: ' + type_read)
    #logger.info('Write Raw: ' + datetime_now() + ' - Namespace: ' + namespace + ' Dataset: ' + dataset)
    write_dataset('raw', dataset_name, df)

## Trusted

In [None]:
def hash_column(df, columns):
    """
    Used to hash columns
    :param df: :class:`pyspark.sql.DataFrame` instance
    :param columns: list, columns to hash
    :return df: :class:`pyspark.sql.DataFrame` instance
    """
    salt = 'QglW6kADoUt8-FXIHQQsj3tK-Vpz6QaZ2DoCCQKEARM='
    for column in columns:
        df = df.withColumn(column, sf.trim(sf.col(column)))
        df = df.withColumn(column, sf.concat(column, sf.lit(salt)))
        df = df.withColumn(column, sf.sha2(column, 256))
    return df

@sf.udf
def hash_tel(column):
    return int(column) * 5 / 2 

In [None]:
local_path_order = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/order'
RAW_ORDER = spark.read \
    .load(local_path_order, format='parquet') \
    .drop("partition_column","customer_name") \


local_path_status = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/status'
RAW_STATUS = spark.read \
    .load(local_path_status, format='parquet') \
    .drop("partition_column") \
    .withColumnRenamed("created_at","status_created_at") \
    .withColumnRenamed("value", "status_value") \


local_path_consumer = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/consumer'
RAW_CONSUMER = spark.read \
    .load(local_path_consumer, format='parquet') \
    .drop("partition_column","customer_name") \
    .withColumnRenamed("created_at","consumer_created_at") \
    .withColumn("customer_phone_number", hash_tel('customer_phone_number'))


local_path_restaurant = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/restaurant'
RAW_RESTAURANT = spark.read \
    .load(local_path_restaurant, format='parquet') \
    .drop("partition_column") \
    .withColumnRenamed("created_at","restaurant_created_at")

# Validação Order

In [None]:
RAW_ORDER.show(2,False)

In [None]:
RAW_ORDER.count()

In [None]:
RAW_ORDER.select('customer_id').distinct().count()

In [None]:
RAW_ORDER.select('order_id').distinct().count()

In [None]:
RAW_ORDER.select('order_id').groupBy('order_id').count().filter('count > 1').show(10,False)

In [None]:
RAW_ORDER.filter("order_id == '793770c3-16ba-4d81-a1aa-cd120d395162'").show(10,False)

In [None]:
RAW_STATUS.show(10,False)

In [None]:
RAW_CONSUMER.show(10,False)

In [None]:
RAW_CONSUMER.count()

In [None]:
RAW_RESTAURANT.show(10,False)

In [None]:
new_order = RAW_ORDER.select(sf.length('cpf').alias('cpf')).groupby('cpf').agg(sf.max('cpf'))

new_order.show(10,False)

## Dedup

In [None]:
# Join Order + Consumer + Restaurant
DF_PRIMARY = RAW_ORDER \
    .join(RAW_CONSUMER, 'customer_id') \
    .join(RAW_RESTAURANT, RAW_RESTAURANT.id == RAW_ORDER.merchant_id) \

## Order
LAST_VALUE_ORDER = s.Window.partitionBy(
    "customer_id") \
    .orderBy(sf.col("order_created_at").desc())

DF_TRUSTED_ORDER = DF_PRIMARY.dropDuplicates() \
    .withColumn("distinct", sf.row_number().over(LAST_VALUE_ORDER)) \
    .filter("distinct = 1") \
    .drop("distinct")

## Status
LAST_VALUE_STATUS = s.Window.partitionBy(
    "order_id") \
    .orderBy(sf.col("status_created_at").desc())

DF_TRUSTED_STATUS = RAW_STATUS.dropDuplicates() \
    .withColumn("distinct", sf.row_number().over(LAST_VALUE_STATUS)) \
    .filter("distinct = 1") \
    .drop("distinct") \
    .select("order_id", "status_created_at", "status_value")

##
DF_TRUSTED_FINAL = DF_TRUSTED_ORDER \
    .join(DF_TRUSTED_STATUS, "order_id") \
    .withColumn("restaurant_partition", sf.date_format("restaurant_created_at", "yyyy-MM-dd"))

DF_TRUSTED_ANONYMIZED = hash_column(DF_TRUSTED_FINAL, ['cpf'])

# DF_TRUSTED_ANONYMIZED.repartition(30).write.parquet("/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order", mode="overwrite", partitionBy="restaurant_partition")

In [None]:
DF_TRUSTED_ANONYMIZED.show(5,False)

In [None]:
local_path_order = "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order"
DF_TRUSTED = spark.read \
    .parquet(local_path_order) \

DF_TRUSTED.filter("order_id == 'ab0afdba-beeb-442b-918b-9e827532d4b1'").show(20,False)

In [None]:
# Recover dynamic schema 
JSON_SCHEMA = spark.read \
    .json(DF_TRUSTED.rdd.map(lambda row: row.items)).schema

JSON_SCHEMA = st.ArrayType(JSON_SCHEMA)

DF_FLATTEN = DF_TRUSTED.withColumn("items", sf.from_json("items", JSON_SCHEMA))

In [None]:
DF_FLATTEN = DF_TRUSTED.withColumn("items", sf.from_json("items", JSON_SCHEMA))

# DF_FLATTEN.show(1)

In [None]:
DF_EXPLODE_ITEMS = DF_FLATTEN.select(
    "order_id",
    sf.explode("items").alias("items"))

DF_EXPLODE_ITEMS_DETAILS = DF_EXPLODE_ITEMS \
    .select("order_id",
            sf.col("items.name").alias("name"),
            sf.col("items.addition").getItem('currency').alias("addition_currency"),
            sf.col("items.addition").getItem('value').alias("addition"),
            sf.col("items.discount").getItem('currency').alias("discount_currency"),
            sf.col("items.discount").getItem('value').alias("discount"),
            sf.col("items.quantity").alias("quantity"),
            sf.col("items.unitPrice").getItem('currency').alias("unit_price_currency"),
            sf.col("items.unitPrice").getItem('value').alias("unit_price"),
            sf.col("items.externalId").alias("external_id"),
            sf.col("items.totalValue").getItem('currency').alias("total_value_currency"),
            sf.col("items.totalValue").getItem('value').alias("total_value"),
            sf.col("items.customerNote").alias("customer_note"),
            sf.col("items.integrationId").alias("integration_id"),
            sf.col("items.totalAddition").getItem('currency').alias("tota_addition_currency"),
            sf.col("items.totalAddition").getItem('value').alias("tota_addition"),
            sf.col("items.totalDiscount").getItem('currency').alias("total_discount_currency"),
            sf.col("items.totalDiscount").getItem('value').alias("total_discount"),
            sf.explode("items.garnishItems.externalId").alias("garnish_external_id"),
            sf.col("items.garnishItems").alias("garnish_items")
            ) \
    .dropDuplicates()

DF_EXPLODE_ITEMS_DETAILS.filter("order_id == 'ab0afdba-beeb-442b-918b-9e827532d4b1'").show(10,False)

In [None]:
## Structure Garnish details
DF_EXPLODE_GARNISH = DF_EXPLODE_ITEMS_DETAILS.select(
    "order_id",
    "garnish_external_id",
    sf.explode("garnish_items").alias("garnish_items"))


DF_EXPLODE_GARNISH_DETAIL = DF_EXPLODE_GARNISH \
    .select("order_id",           
            sf.col("garnish_items.name").alias("garnish_name"),
            sf.col("garnish_items.addition").getItem('value').alias("garnish_addition"),
            sf.col("garnish_items.addition").getItem('currency').alias("garnish_addition_currency"),
            sf.col("garnish_items.discount").getItem('value').alias("garnish_discount"),
            sf.col("garnish_items.discount").getItem('currency').alias("garnish_discount_currency"),
            sf.col("garnish_items.quantity").alias("garnish_quantity"),
            sf.col("garnish_items.sequence").alias("garnish_sequence"),
            sf.col("garnish_items.unitPrice").getItem('value').alias("garnish_unit_price"),
            sf.col("garnish_items.unitPrice").getItem('currency').alias("garnish_unit_price_currency"),
            sf.col("garnish_items.categoryId").alias("garnish_category_id"),
            sf.col("garnish_items.categoryName").alias("garnish_category_name"),
            sf.col("garnish_items.externalId").alias("garnish_external_id"),
            sf.col("garnish_items.totalValue").getItem('value').alias("garnish_total_value"),
            sf.col("garnish_items.totalValue").getItem('currency').alias("garnish_total_value_currency"),
            sf.col("garnish_items.integrationId").alias("garnish_integration_id"),
            ) \
    .dropDuplicates()
            

## Union itens with garnish dataframe, 
## when 1 or more item can have 1 or more garnish.
DF_UNION_ITENS_GARNISH = DF_EXPLODE_ITEMS_DETAILS \
    .join(DF_EXPLODE_GARNISH_DETAIL, ["order_id","garnish_external_id"]) \
    .drop("garnish_items")

DF_UNION_ITENS_GARNISH.filter("order_id == 'ab0afdba-beeb-442b-918b-9e827532d4b1'").show(1000,False)

In [None]:
DF_UNION_ITENS_GARNISH.repartition(20) \
    .write.parquet(
        "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order_items", mode="overwrite")

In [None]:
# DF_FLATTEN.select("items.name").show()

DF_FLATTEN = DF_FLATTEN \
    .withColumn("name", sf.col("items.name")) \
    .withColumn("addition", sf.col("items.addition").getItem('value')) \
    .withColumn("addition_currency", sf.col("items.addition").getItem('currency')) \
    .withColumn("discount", sf.col("items.discount").getItem('value')) \
    .withColumn("discount_currency", sf.col("items.discount").getItem('currency')) \
    .withColumn("quantity", sf.col("items.quantity")) \
    .withColumn("unit_price", sf.col("items.unitPrice").getItem('value')) \
    .withColumn("unit_currency", sf.col("items.unitPrice").getItem('currency')) \
    .withColumn("external_id", sf.col("items.externalId")) \
    .withColumn("total_value", sf.col("items.totalValue").getItem('value')) \
    .withColumn("total_value_currency", sf.col("items.totalValue").getItem('currency')) \
    .withColumn("customer_note", sf.col("items.customerNote")) \
    .withColumn("integration_id", sf.col("items.integrationId")) \
    .withColumn("total_addition", sf.col("items.totalAddition").getItem('value')) \
    .withColumn("total_addition_currency", sf.col("items.totalAddition").getItem('currency')) \
    .withColumn("total_discount", sf.col("items.totalDiscount").getItem('value')) \
    .withColumn("total_discount_currency", sf.col("items.totalDiscount").getItem('currency')) \
    .withColumn("garnish_items_name", sf.col("items.garnishItems.name")) \
    .withColumn("garnish_items_addition", sf.col("items.garnishItems.addition").getItem('value')) \
    .withColumn("garnish_items_addition_currency", sf.col("items.garnishItems.addition").getItem('currency')) \
    .withColumn("garnish_items_discount", sf.col("items.garnishItems.discount").getItem('value')) \
    .withColumn("garnish_items_discount_currency", sf.col("items.garnishItems.discount").getItem('currency')) \
    .withColumn("garnish_items_quantity", sf.col("items.garnishItems.quantity")) \
    .withColumn("garnish_items_sequence", sf.col("items.garnishItems.sequence")) \
    .withColumn("garnish_items_unit_price", sf.col("items.garnishItems.unitPrice").getItem('value')) \
    .withColumn("garnish_items_unit_price_currency", sf.col("items.garnishItems.unitPrice").getItem('currency')) \
    .withColumn("garnish_items_category_id", sf.col("items.garnishItems.categoryId")) \
    .withColumn("garnish_items_category_name", sf.col("items.garnishItems.categoryName")) \
    .withColumn("garnish_items_external_id", sf.col("items.garnishItems.externalId")) \
    .withColumn("garnish_items_total_value", sf.col("items.garnishItems.totalValue").getItem('value')) \
    .withColumn("garnish_items_total_value_currency", sf.col("items.garnishItems.totalValue").getItem('currency')) \
    .withColumn("garnish_items_integration_id", sf.col("items.garnishItems.integrationId")) \


# DF_FLATTEN.show(10,False)

In [None]:
DF_ORDER_ITEMS.repartition(30).write.parquet("/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order", mode="overwrite", partitionBy="restaurant_partition")

In [None]:
RAW_STATUS.show(5,False)

In [None]:
RAW_ORDER.show(1,False)

In [None]:
### Or

In [None]:
local_path_order = "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order"
DF_ORDER_STATUSES = spark.read \
    .parquet(local_path_order) \

In [None]:
local_path_status = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/status'
RAW_STATUS = spark.read \
    .load(local_path_status, format='parquet') \
    .drop("partition_column") \
    .withColumnRenamed("created_at","status_created_at") \
    .withColumnRenamed("value", "status_value") \

In [None]:
DF_ORDER_STATUSES.createOrReplaceTempView("status")
RAW_ORDER.createOrReplaceTempView("order")

In [None]:
df_status_order = spark.sql("""
select  ord.order_id,
        st1.status_value status_1,
        st1.status_created_at status_created_at_1,
        st2.status_value status_2,
        st2.status_created_at status_created_at_2,
        st3.status_value status_3,
        st3.status_created_at status_created_at_3,
        st4.status_value status_4,
        st4.status_created_at status_created_at_4
from order ord
inner join (select status_created_at, status_value, order_id
             from status
            where status_value = 'REGISTERED') st1 on st1.order_id = ord.order_id
            
left join (select status_created_at, status_value, order_id
             from status
            where status_value = 'PLACED') st2 on st2.order_id = st1.order_id
            
left join (select status_created_at, status_value, order_id
             from status
            where status_value = 'CONCLUDED') st3 on st3.order_id = st2.order_id
      
left join (select status_created_at, status_value, order_id
             from status
            where status_value = 'CANCELLED') st4 on st4.order_id = st3.order_id
""").dropDuplicates()

In [36]:
def hash_column(df, columns):
    """
    Used to hash columns
    :param df: :class:`pyspark.sql.DataFrame` instance
    :param columns: list, columns to hash
    :return df: :class:`pyspark.sql.DataFrame` instance
    """
    salt = 'QglW6kADoUt8-FXIHQQsj3tK-Vpz6QaZ2DoCCQKEARM='
    for column in columns:
        df = df.withColumn(column, sf.trim(sf.col(column)))
        df = df.withColumn(column, sf.concat(column, sf.lit(salt)))
        df = df.withColumn(column, sf.sha2(column, 256))
    return df

@sf.udf
def hash_tel(column):
    return int(column) * 5 / 2 

In [132]:
def path_read_write(zone, dataset_name):
    """
        refurn absolute bucket path.
    """
    BASE_PATH_SOURCE='s3://ifood-data-architect-test-source'

    PATH = BASE_PATH_SOURCE + '/' + zone + '/' + dataset_name
    
    return PATH

In [133]:
path_read_write("raw", "order")

's3://ifood-data-architect-test-source/raw/order'

In [37]:
local_path_order = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/order'
RAW_ORDER = spark.read \
    .load(local_path_order, format='parquet') \
    .drop("partition_column","customer_name") \


local_path_status = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/status'
RAW_STATUS = spark.read \
    .load(local_path_status, format='parquet') \
    .drop("partition_column") \
    .withColumnRenamed("created_at","status_created_at") \
    .withColumnRenamed("value", "status_value") \


local_path_consumer = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/consumer'
RAW_CONSUMER = spark.read \
    .load(local_path_consumer, format='parquet') \
    .drop("partition_column","customer_name") \
    .withColumnRenamed("created_at","consumer_created_at") \
    .withColumn("customer_phone_number", hash_tel('customer_phone_number'))


local_path_restaurant = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/restaurant'
RAW_RESTAURANT = spark.read \
    .load(local_path_restaurant, format='parquet') \
    .drop("partition_column") \
    .withColumnRenamed("created_at","restaurant_created_at")

In [38]:
def dedup_dataframe(pk_col, max_col, dataframe):
    
    LAST_VALUE_ORDER = s.Window.partitionBy(
        pk_col) \
        .orderBy(sf.col(max_col).desc())

    DF_DEDUP = dataframe.dropDuplicates() \
        .withColumn("distinct", sf.row_number().over(LAST_VALUE_ORDER)) \
        .filter("distinct = 1") \
        .drop("distinct")
    
    return DF_DEDUP

In [54]:
def write_trusted(dataframe, repartition, partition, dataset):
    
    PATH = path_read_write("trusted",dataset)

    dataframe.repartition(repartition).write \
    .parquet(PATH,
             mode="overwrite",
             partitionBy=partition)

In [None]:
def trusted_order():

    # Join Order + Consumer + Restaurant
    DF_PRIMARY = RAW_ORDER \
        .join(RAW_CONSUMER, 'customer_id') \
        .join(RAW_RESTAURANT, RAW_RESTAURANT.id == RAW_ORDER.merchant_id).limit(1000)

    ## Order
#     LAST_VALUE_ORDER = s.Window.partitionBy(
#         "customer_id") \
#         .orderBy(sf.col("order_created_at").desc())

#     DF_TRUSTED_ORDER = DF_PRIMARY.dropDuplicates() \
#         .withColumn("distinct", sf.row_number().over(LAST_VALUE_ORDER)) \
#         .filter("distinct = 1") \
#         .drop("distinct")

    ## Status
#     LAST_VALUE_STATUS = s.Window.partitionBy(
#         "order_id") \
#         .orderBy(sf.col("status_created_at").desc())

#     DF_TRUSTED_STATUS = RAW_STATUS.dropDuplicates() \
#         .withColumn("distinct", sf.row_number().over(LAST_VALUE_STATUS)) \
#         .filter("distinct = 1") \
#         .drop("distinct") \
#         .select("order_id", "status_created_at", "status_value")
    
    DF_TRUSTED_ORDER = dedup_dataframe("customer_id", "order_created_at", DF_PRIMARY)
    
    DF_TRUSTED_STATUS = dedup_dataframe("order_id", "status_created_at", RAW_STATUS) \
        .select("order_id", "status_created_at", "status_value")
    
    ## Join
    DF_TRUSTED_FINAL = DF_TRUSTED_ORDER \
        .join(DF_TRUSTED_STATUS, "order_id") \
        .withColumn("partition", sf.lit(datetime_result("today")))

    DF_TRUSTED_ANONYMIZED = hash_column(DF_TRUSTED_FINAL, ['cpf'])
    
    # Write DF in trusted zone.
    write_trusted(DF_TRUSTED_ANONYMIZED, 30, "restaurant_partition", "order")

#     DF_TRUSTED_ANONYMIZED.repartition(30).write \
#         .parquet("/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order",
#                  mode="overwrite",
#                  partitionBy="restaurant_partition")

In [None]:
trusted_order()

In [None]:
def trusted_order_items():
    
    local_path_order = "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order"
    DF_TRUSTED = spark.read \
        .parquet(local_path_order)

    # Recover dynamic schema 
    JSON_SCHEMA = spark.read \
        .json(DF_TRUSTED.rdd.map(lambda row: row.items)).schema

    JSON_SCHEMA = st.ArrayType(JSON_SCHEMA)
    DF_FLATTEN = DF_TRUSTED.withColumn("items", sf.from_json("items", JSON_SCHEMA))
    
    # Explode and struturing Items for construct Order Items
    DF_EXPLODE_ITEMS = DF_FLATTEN.select(
        "order_id",
        sf.explode("items").alias("items"))

    DF_EXPLODE_ITEMS_DETAILS = DF_EXPLODE_ITEMS \
        .select("order_id",
                sf.col("items.name").alias("name"),
                sf.col("items.addition").getItem('currency').alias("addition_currency"),
                sf.col("items.addition").getItem('value').alias("addition"),
                sf.col("items.discount").getItem('currency').alias("discount_currency"),
                sf.col("items.discount").getItem('value').alias("discount"),
                sf.col("items.quantity").alias("quantity"),
                sf.col("items.unitPrice").getItem('currency').alias("unit_price_currency"),
                sf.col("items.unitPrice").getItem('value').alias("unit_price"),
                sf.col("items.externalId").alias("external_id"),
                sf.col("items.totalValue").getItem('currency').alias("total_value_currency"),
                sf.col("items.totalValue").getItem('value').alias("total_value"),
                sf.col("items.customerNote").alias("customer_note"),
                sf.col("items.integrationId").alias("integration_id"),
                sf.col("items.totalAddition").getItem('currency').alias("tota_addition_currency"),
                sf.col("items.totalAddition").getItem('value').alias("tota_addition"),
                sf.col("items.totalDiscount").getItem('currency').alias("total_discount_currency"),
                sf.col("items.totalDiscount").getItem('value').alias("total_discount"),
                sf.explode("items.garnishItems.externalId").alias("garnish_external_id"),
                sf.col("items.garnishItems").alias("garnish_items")
                ) \
        .dropDuplicates()
    
    # Explode and struturing Garnish for additing Order Items
    DF_EXPLODE_GARNISH = DF_EXPLODE_ITEMS_DETAILS.select(
        "order_id",
        "garnish_external_id",
        sf.explode("garnish_items").alias("garnish_items"))

    DF_EXPLODE_GARNISH_DETAIL = DF_EXPLODE_GARNISH \
        .select("order_id",           
                sf.col("garnish_items.name").alias("garnish_name"),
                sf.col("garnish_items.addition").getItem('value').alias("garnish_addition"),
                sf.col("garnish_items.addition").getItem('currency').alias("garnish_addition_currency"),
                sf.col("garnish_items.discount").getItem('value').alias("garnish_discount"),
                sf.col("garnish_items.discount").getItem('currency').alias("garnish_discount_currency"),
                sf.col("garnish_items.quantity").alias("garnish_quantity"),
                sf.col("garnish_items.sequence").alias("garnish_sequence"),
                sf.col("garnish_items.unitPrice").getItem('value').alias("garnish_unit_price"),
                sf.col("garnish_items.unitPrice").getItem('currency').alias("garnish_unit_price_currency"),
                sf.col("garnish_items.categoryId").alias("garnish_category_id"),
                sf.col("garnish_items.categoryName").alias("garnish_category_name"),
                sf.col("garnish_items.externalId").alias("garnish_external_id"),
                sf.col("garnish_items.totalValue").getItem('value').alias("garnish_total_value"),
                sf.col("garnish_items.totalValue").getItem('currency').alias("garnish_total_value_currency"),
                sf.col("garnish_items.integrationId").alias("garnish_integration_id"),
                ) \
        .dropDuplicates()

    ## Union itens with garnish dataframe, 
    ## when 1 or more item can have 1 or more garnish.
    DF_UNION_ITENS_GARNISH = DF_EXPLODE_ITEMS_DETAILS \
        .join(DF_EXPLODE_GARNISH_DETAIL, ["order_id","garnish_external_id"]) \
        .drop("garnish_items")
    
    # Write DF in trusted zone.
    write_trusted(DF_UNION_ITENS_GARNISH, 10, None, "order_items")

#     DF_UNION_ITENS_GARNISH.repartition(20).write \
#         .parquet("/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order_items",
#                 mode="overwrite")

In [None]:
trusted_order_items()

In [40]:
def trusted_order_status():
    
    local_path_order = "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order"
    DF_ORDER_STATUSES = spark.read \
        .parquet(local_path_order) \

    #     local_path_status = '/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/raw/status'
    #     RAW_STATUS = spark.read \
    #         .load(local_path_status, format='parquet') \
    #         .drop("partition_column") \
    #         .withColumnRenamed("created_at","status_created_at") \
    #         .withColumnRenamed("value", "status_value")

    DF_ORDER_STATUSES.createOrReplaceTempView("order")
    RAW_STATUS.createOrReplaceTempView("status")

    DF_STATUSES_ORDER = spark.sql("""
    select  ord.order_id,
            st1.status_value status_1,
            st1.status_created_at status_created_at_1,
            st2.status_value status_2,
            st2.status_created_at status_created_at_2,
            st3.status_value status_3,
            st3.status_created_at status_created_at_3,
            st4.status_value status_4,
            st4.status_created_at status_created_at_4
    from order ord
    left join (select status_created_at, status_value, order_id
                 from status
                where status_value = 'REGISTERED') st1 on st1.order_id = ord.order_id

    left join (select status_created_at, status_value, order_id
                 from status
                where status_value = 'PLACED') st2 on st2.order_id = ord.order_id

    left join (select status_created_at, status_value, order_id
                 from status
                where status_value = 'CONCLUDED') st3 on st3.order_id = ord.order_id

    left join (select status_created_at, status_value, order_id
                 from status
                where status_value = 'CANCELLED') st4 on st4.order_id = ord.order_id
    """).dropDuplicates()

    write_trusted(DF_STATUSES_ORDER, 1, None, "order_statuses")
    
    
#     DF_STATUSES_ORDER.repartition(1).write \
#         .parquet("/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order_statuses",
#                  mode="overwrite")


In [41]:
trusted_order_status()

In [31]:
spark.sql(""" select * from order where order_id = '0002fe02-d7dc-4232-b7ac-3394019ce240'""").show()

+--------+-----------+---+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+-----+-----------+-----------------+------------------+-----------------+----------------+---------------+--------------------+------------------+---------------+--------+-------------------+------+-------------------+---------------------+---+---------------------+-------+-----------+--------------+------------+-------------+-------------------+-----------------+-------------+--------------+----------------+-----------------+------------+--------------------+
|order_id|customer_id|cpf|delivery_address_city|delivery_address_country|delivery_address_district|delivery_address_external_id|delivery_address_latitude|delivery_address_longitude|delivery_address_state|delivery_address_zip_code|items|merchant_id|merchant_latitude|merchant_longitude|merchant_timezone|or

In [32]:
DF_STATUSES_ORDER.filter("order_id = 'cda2ded6-2f13-4310-8d41-30592eb450f0'").show(20,False)

+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+
|order_id                            |status_1  |status_created_at_1     |status_2|status_created_at_2     |status_3 |status_created_at_3     |status_4|status_created_at_4|
+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+
|cda2ded6-2f13-4310-8d41-30592eb450f0|REGISTERED|2019-01-06T23:48:43.000Z|PLACED  |2019-01-06T23:48:44.000Z|CONCLUDED|2019-01-07T00:27:45.000Z|null    |null               |
+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+



In [33]:
RAW_STATUS.filter("order_id == 'cda2ded6-2f13-4310-8d41-30592eb450f0'").show(10)

+--------------------+--------------------+--------------------+------------+
|   status_created_at|            order_id|           status_id|status_value|
+--------------------+--------------------+--------------------+------------+
|2019-01-06T23:48:...|cda2ded6-2f13-431...|a388eccf-5e0d-4f7...|      PLACED|
|2019-01-06T23:48:...|cda2ded6-2f13-431...|a388eccf-5e0d-4f7...|      PLACED|
|2019-01-07T00:27:...|cda2ded6-2f13-431...|0da68f25-f397-4bd...|   CONCLUDED|
|2019-01-07T00:27:...|cda2ded6-2f13-431...|0da68f25-f397-4bd...|   CONCLUDED|
|2019-01-06T23:48:...|cda2ded6-2f13-431...|9d9dfbe9-ca94-48c...|  REGISTERED|
|2019-01-06T23:48:...|cda2ded6-2f13-431...|9d9dfbe9-ca94-48c...|  REGISTERED|
+--------------------+--------------------+--------------------+------------+



In [None]:
trusted_order_status()

In [None]:
local_path_order = "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order"
df_val = spark.read \
    .parquet(local_path_order).show(1,False)

In [None]:
local_path_order = "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order_items"
df_val = spark.read \
    .parquet(local_path_order).show(10,False)

In [44]:
local_path_order = "/home/murillo-silva/Documents/Projetos-Pessoais/Testes/ifood/database/trusted/order_statuses"
df_vali = spark.read \
    .parquet(local_path_order).show(10,False)

+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+
|order_id                            |status_1  |status_created_at_1     |status_2|status_created_at_2     |status_3 |status_created_at_3     |status_4|status_created_at_4|
+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+
|63fdd8db-8e81-4594-a139-73fb4214557c|REGISTERED|2019-01-09T21:27:51.000Z|PLACED  |2019-01-09T21:27:52.000Z|CONCLUDED|2019-01-09T23:30:05.000Z|null    |null               |
|a3352216-91b8-43dd-be37-7f682a61b370|REGISTERED|2019-01-09T19:12:00.000Z|PLACED  |2019-01-09T19:12:01.000Z|CONCLUDED|2019-01-09T21:15:02.000Z|null    |null               |
|dad0c4a0-5e9b-4bde-b972-67d6dd0f3da3|REGISTERED|2019-01-16T23:02:16.000Z|PLACED  |2019-01-16T23:02:17.000Z|CONCLUDED|2019-01-17T01:05:

In [56]:
val_aws = 's3a://ifood-data-architect-test-source-murillo/trusted/order/'
df_vali = spark.read \
    .parquet(val_aws).show(1,False)

+------------------------------------+------------------------------------+----------------------------------------------------------------+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [57]:
val_aws = 's3a://ifood-data-architect-test-source-murillo/trusted/order_items/'
df_vali = spark.read \
    .parquet(val_aws).show(1,False)

+------------------------------------+--------------------------------+----------------------------------------------+-----------------+--------+-----------------+--------+--------+-------------------+----------+--------------------------------+--------------------+-----------+-------------+--------------+----------------------+-------------+-----------------------+--------------+----------------+----------------+-------------------------+----------------+-------------------------+----------------+----------------+------------------+---------------------------+-------------------+---------------------+-------------------+----------------------------+----------------------+
|order_id                            |garnish_external_id             |name                                          |addition_currency|addition|discount_currency|discount|quantity|unit_price_currency|unit_price|external_id                     |total_value_currency|total_value|customer_note|integration_id|tota_additi

In [59]:
val_aws = 's3a://ifood-data-architect-test-source-murillo/trusted/order_statuses/'
df_vali = spark.read \
    .parquet(val_aws).show(1,False)

+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+
|order_id                            |status_1  |status_created_at_1     |status_2|status_created_at_2     |status_3 |status_created_at_3     |status_4|status_created_at_4|
+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+
|0002fe02-d7dc-4232-b7ac-3394019ce240|REGISTERED|2019-01-24T23:04:27.000Z|PLACED  |2019-01-24T23:04:28.000Z|CONCLUDED|2019-01-25T01:05:07.000Z|null    |null               |
+------------------------------------+----------+------------------------+--------+------------------------+---------+------------------------+--------+-------------------+
only showing top 1 row



In [None]:
def datetime_result(date_type):
    """ Recover brazilian time.
    """ 
    if date_type == 'today':
        date_now = (dt.datetime.utcnow() - dt.timedelta(hours=3)).strftime('%Y-%m-%d')
        return str(date_now)
    if date_type == 'today_timestamp':
        date_now = (dt.datetime.utcnow() - dt.timedelta(hours=3)).strftime('%Y-%m-%d %H:%M:%S')
        return str(date_now)
    else:
        date_yesternoon = (dt.datetime.utcnow() - dt.timedelta(days=1)).strftime('%Y-%m-%d')
        return str(date_yesternoon)

In [None]:
def checking_quality(actual, previous):
    """
    :param actual: number of records comming from last trusted zone.
    :param previous: number of records comming from the wildcard previues trusted zone.
    :return: TRUE, if the dataset is OK to be inserted / FALSE, if any problem
    related to quality was found.
    """
    # Checking the percentage of drop in the counting
    maximum_percentage_of_drop_in_count = 80

    logger.info(f"Quality Message: Actual {actual} - Previous {previous}")
    
    if actual < previous:
        if ((((actual / previous) - 1) * 100) * -1) >= maximum_percentage_of_drop_in_count:  # noqa
            return False

    return True

In [134]:
def write_trusted(dataframe, repartition, partition, dataset):
    
    PATH = path_read_write("trusted", dataset)
    try:
        # Todo, find last dataset wildcard using boto3 for figureout that.
        DF_LATEST = spark.read \
            .parquet(PATH + '/partition=' + datetime_result("yesterday"))
        latest_v2_count = DF_LATEST.count()
    
    except Exception:
        latest_v2_count = 1
    
    logger.info("Getting the count from the current versions of the trusted")
    latest_count = dataframe.count()
    quality_check = checking_quality(latest_count, latest_v2_count)
    
    if quality_check:
        
        dataframe.repartition(repartition).write \
        .parquet(PATH + '/partition=' + datetime_result("today"),
             mode="overwrite",
             partitionBy=partition)
    else:
        logger.info(f"Quality Urgent: Dataset: {dataset} are 80% lower. Check it out!!")

In [67]:
import boto3

BUCKET='ifood-data-architect-test-source-murillo'

In [64]:
s3 = boto3.client('s3')
s3.list_objects_v2(Bucket=BUCKET)

{'ResponseMetadata': {'RequestId': 'A8AA3AB6C8A384BF',
  'HostId': 'eAppGnGbVQeDV/rlbGvhOyB86zLNBzpNMP+6koFMT1oPpZg9DwccKO4uTMvu/XBp14EXW/frvKU=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'eAppGnGbVQeDV/rlbGvhOyB86zLNBzpNMP+6koFMT1oPpZg9DwccKO4uTMvu/XBp14EXW/frvKU=',
   'x-amz-request-id': 'A8AA3AB6C8A384BF',
   'date': 'Mon, 20 Jul 2020 13:37:18 GMT',
   'x-amz-bucket-region': 'us-east-1',
   'content-type': 'application/xml',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'IsTruncated': False,
 'Contents': [{'Key': 'raw/consumer/partition_column=2020-07-20/part-00000-d3e75e66-72b6-451f-948b-38542c601572.c000.snappy.parquet',
   'LastModified': datetime.datetime(2020, 7, 20, 13, 3, 28, tzinfo=tzutc()),
   'ETag': '"d3a309eb02792515fef07a026c1e0490"',
   'Size': 47544578,
   'StorageClass': 'STANDARD'},
  {'Key': 'raw/consumer/partition_column=2020-07-20_$folder$',
   'LastModified': datetime.datetime(2020, 7, 20, 13, 3, 28, tzinfo=

In [65]:
def get_s3_keys(bucket):
    """Get a list of keys in an S3 bucket."""
    keys = []
    resp = s3.list_objects_v2(Bucket=bucket)
    for obj in resp['Contents']:
        keys.append(obj['Key'])
    return keys

In [114]:
len("order")

5

In [127]:
keys = []
resp = s3.list_objects_v2(Bucket=BUCKET)
prefix = "trusted"
# suffix = "part-"
for obj in resp['Contents']:
    #keys.append(obj['Key'])
    key = obj['Key']
    if key.startswith(prefix):
        
        ln = len("status")
        st = key.find("order",ln)
        #idx = key.find("/",9)
        print(key[ln+ln+4:])
#         print(st)


staurant_partition=2020-07-20/part-00000-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00001-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00002-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00003-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00004-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00005-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00006-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00007-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00008-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=2020-07-20/part-00009-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet
staurant_partition=

In [77]:
st = 'trusted/order/restaurant_partition=2020-07-20/part-00000-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet'

st.find("/",start=1)

TypeError: find() takes no keyword arguments

In [85]:
def get_matching_s3_keys(bucket, prefix='', suffix=''):
    """
    Generate the keys in an S3 bucket.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch keys that start with this prefix (optional).
    :param suffix: Only fetch keys that end with this suffix (optional).
    """
    s3 = boto3.client('s3')
    kwargs = {'Bucket': bucket}

    # If the prefix is a single string (not a tuple of strings), we can
    # do the filtering directly in the S3 API.
    if isinstance(prefix, str):
        kwargs['Prefix'] = prefix

    while True:

        # The S3 API response is a large blob of metadata.
        # 'Contents' contains information about the listed objects.
        resp = s3.list_objects_v2(**kwargs)
        for obj in resp['Contents']:
            key = obj['Key']
            if key.startswith(prefix) and key.endswith(suffix):
                print(key)

        # The S3 API is paginated, returning up to 1000 keys at a time.
        # Pass the continuation token into the next response, until we
        # reach the final page (when this field is missing).
#         try:
#             kwargs['ContinuationToken'] = resp['NextContinuationToken']
#         except KeyError:
#             break

In [88]:
get_matching_s3_keys(BUCKET, prefix="order", suffix="/")

KeyError: 'Contents'

In [68]:
get_s3_keys(BUCKET)

['raw/consumer/partition_column=2020-07-20/part-00000-d3e75e66-72b6-451f-948b-38542c601572.c000.snappy.parquet',
 'raw/consumer/partition_column=2020-07-20_$folder$',
 'raw/consumer_$folder$',
 'raw/order/partition_column=2020-07-20/part-00000-5df0702b-bc9e-4622-ac30-7b8458e6127a.c000.snappy.parquet',
 'raw/order/partition_column=2020-07-20_$folder$',
 'raw/order_$folder$',
 'raw/restaurant/partition_column=2020-07-20/part-00000-06771bd7-8c0a-435b-a808-938e0239f2d3.c000.snappy.parquet',
 'raw/restaurant/partition_column=2020-07-20_$folder$',
 'raw/restaurant_$folder$',
 'raw/status/partition_column=2020-07-20/part-00000-7ff354c6-aa30-4e10-8061-2eed71471918.c000.snappy.parquet',
 'raw/status/partition_column=2020-07-20_$folder$',
 'raw/status_$folder$',
 'raw_$folder$',
 'trusted/',
 'trusted/order/restaurant_partition=2020-07-20/part-00000-a05f8403-73e2-4816-aaf4-adc5e67265ef.c000.snappy.parquet',
 'trusted/order/restaurant_partition=2020-07-20/part-00001-a05f8403-73e2-4816-aaf4-adc5e6