In [1]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import from_json, col
from functools import reduce

In [2]:
# Spark SQL options applied to the session, in the order they are set.
spark_settings = {
    "spark.sql.streaming.schemaInference": True,
    "spark.sql.warehouse.dir": '/opt/workspace/',
    "spark.sql.sources.partitionOverwriteMode": 'dynamic',
    "spark.sql.parquet.columnarReaderBatchSize": 1000,
}

# Start from a builder for a local-mode application named 'trusted_order',
# then apply each option above.
session_builder = SparkSession.builder.appName('trusted_order').master("local[*]")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Create (or reuse) the Hive-enabled session that the rest of the notebook uses.
spark = session_builder.enableHiveSupport().getOrCreate()

In [3]:
def _text_field(field_name):
    """Return a nullable string field called `field_name`."""
    return StructField(field_name, StringType(), True)


def _money_field(field_name):
    """Return a nullable struct field holding string `value` and `currency` members."""
    return StructField(
        field_name,
        StructType([
            _text_field('value'),
            _text_field('currency'),
        ]),
        True,
    )


# One garnish entry nested inside an item.
garnish_item_type = StructType([
    _text_field('name'),
    _money_field('addition'),
    _money_field('discount'),
    _text_field('quantity'),
    _text_field('sequence'),
    _money_field('unitPrice'),
    _text_field('categoryId'),
    _text_field('externalId'),
    _money_field('totalValue'),
    _text_field('categoryName'),
    _text_field('integrationId'),
])

# One entry of the `items` array.
item_type = StructType([
    _text_field('name'),
    _money_field('addition'),
    _money_field('discount'),
    _text_field('quantity'),
    _text_field('sequence'),
    _money_field('unitPrice'),
    _text_field('externalId'),
    _money_field('totalValue'),
    _text_field('customerNote'),
    StructField('garnishItems', ArrayType(garnish_item_type), True),
    _text_field('integrationId'),
    _money_field('totalAddition'),
    _money_field('totalDiscount'),
])

# Root schema: a struct with a single `items` field holding an array of items.
# Every leaf is read as a string.
items_schema = StructType([
    StructField('items', ArrayType(item_type)),
])

In [4]:
# Deduplicate the raw orders: keep one row per (order_id, items) pair with the
# earliest order_created_at, derive partition_date from it, and wrap the raw
# `items` text in a '{ "items": ... }' object so it matches items_schema, whose
# root is a struct with a single `items` array field.
# NOTE(review): presumably raw.order.items holds a JSON array as text - confirm.
deduplicated_orders = spark.sql("""

with deduplicated_raw_order as (
    select
        min(o.order_created_at) as order_created_at
        , order_id
        , items
    from raw.order o
    group by 2,3)
        
select to_date(order_created_at) as partition_date, order_id, CONCAT('{ "items":', items, '}') as items from deduplicated_raw_order

""")

# Parse the wrapped JSON text into a typed struct and drop the raw string column.
semi_structured_items = (
    deduplicated_orders
    .withColumn('items_structured', from_json(col('items'), items_schema))
    .drop('items')
)

# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; the registered view name is unchanged.
semi_structured_items.createOrReplaceTempView('semi_structured_items')

In [5]:
def _sum_garnish_items_quantity(quantities):
    """Sum the garnish quantities of one item and return the total as a string.

    Args:
        quantities: sequence of numeric strings (one per garnish item), or
            None when the item has no garnish array.

    Returns:
        None when `quantities` is None; otherwise the total as a string
        ("0" for an empty sequence, a float rendering such as "3.0" otherwise).
        Null entries are skipped instead of failing the whole job.

    Raises:
        ValueError: if a non-null entry cannot be parsed by float().
    """
    if quantities is None:
        return None
    total = 0
    for quantity in quantities:
        if quantity is None:
            continue
        total = float(total) + float(quantity)
    return str(total)


def _sum_garnish_items_total_value(total_values):
    """Sum the `value` member of each garnish totalValue struct, as a string.

    Args:
        total_values: sequence of structs exposing a 'value' key (one per
            garnish item), or None when the item has no garnish array.

    Returns:
        None when `total_values` is None; otherwise the integer total rendered
        as a string ("0" for an empty sequence). Null structs and null values
        are skipped instead of failing the whole job.

    Raises:
        ValueError: if a non-null value cannot be parsed by int().
    """
    if total_values is None:
        return None
    total = 0
    for total_value in total_values:
        if total_value is None or total_value['value'] is None:
            continue
        # int() is kept from the original implementation.
        # NOTE(review): presumably values are whole numbers (e.g. minor currency
        # units); a decimal string such as "12.50" would raise - confirm.
        total += int(total_value['value'])
    return str(total)


# Registered without an explicit return type, exactly as before, so the SQL-side
# names and result type are unchanged.
spark.udf.register('sum_garnish_items_quantity', _sum_garnish_items_quantity)
spark.udf.register('sum_garnish_items_total_value', _sum_garnish_items_total_value)

<function __main__.<lambda>(x)>

In [6]:
# Build one row per ordered item from the `semi_structured_items` view:
#   * explode the parsed `items` array,
#   * flatten each value/currency struct into a pair of scalar columns,
#   * keep the garnish attributes as arrays (one element per garnish item),
#   * compute garnish totals through the registered Python UDFs.
# The name is bound to the DataFrame itself. Previously it was bound to the
# return value of saveAsTable(), which is None, so it could not be used afterwards.
trusted_order_items_query = spark.sql("""

with exploded_items as (
    select 
        explode(items_structured.items) as items
        , order_id
        , partition_date
    from semi_structured_items
)


select 
    order_id
    , partition_date
    , items.name as item_name
    , items.quantity as item_ordered_quantity
    , items.addition.value as addition_value
    , items.addition.currency as addition_currency
    , items.discount.value as discount_value
    , items.discount.currency as discount_currency
    , items.unitPrice.value as unit_price_value
    , items.unitPrice.currency as unit_price_currency
    , items.totalValue.value as total_value_value
    , items.totalValue.currency as total_value_currency
    , items.totalAddition.value as total_addition_value
    , items.totalAddition.currency as total_addition_currency
    , items.totalDiscount.value as total_discount_value
    , items.totalDiscount.currency as total_discount_currency
    , items.sequence
    , items.externalId
    , items.customerNote
    , items.integrationId
    , sum_garnish_items_quantity(items.garnishItems.quantity) as total_quantity_of_garnish_items
    , sum_garnish_items_total_value(items.garnishItems.totalValue) as total_value_of_garnish_items
    , (float(items.unitPrice.value) + float(items.addition.value)) * float(items.quantity) as items_total_value_without_garnish
    , (float(items.unitPrice.value) + float(items.addition.value) - float(items.discount.value)) * float(items.quantity) + coalesce(sum_garnish_items_total_value(items.garnishItems.totalValue),0) as item_total_value_with_discount_and_garnish
    , items.garnishItems.name as garnish_items
    , items.garnishItems.addition as garnish_additions
    , items.garnishItems.discount as garnish_discounts
    , items.garnishItems.quantity as garnish_quantity
    , items.garnishItems.sequence as garnish_sequence
    , items.garnishItems.unitPrice as garnish_unit_price
    , items.garnishItems.categoryId as garnish_category_id
    , items.garnishItems.totalValue as garnish_total_value
    , items.garnishItems.categoryName as garnish_category_name
    , items.garnishItems.integrationId as garnish_integration_id
    
from exploded_items
""")

# Persist the result as the parquet table trusted.order_items, partitioned by
# partition_date, in overwrite mode.
(
    trusted_order_items_query.write
    .partitionBy('partition_date')
    .format('parquet')
    .mode('overwrite')
    .saveAsTable('trusted.order_items')
)

In [7]:
# Shut down the SparkSession created at the top of the notebook now that the
# trusted.order_items table has been written.
spark.stop()

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 order_id                                   | 0047a93d-289e-456c-b574-05f1330cfc31                                                                                                                                                                                                                                                                                                                                                                            
 item_name                                  | QUADRADINHA (4 PEDAÇOS) 2 SABORES                           