In [1]:
from pyspark.sql.functions import sum, col, concat, when
from pyspark.sql.functions import to_timestamp, date_format, to_date
from pyspark.sql.functions import desc, last_day, lpad

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 3, Finished, Available)

## **Get cleaned_df**

In [3]:
df = spark.sql("SELECT * FROM LTT_BronzeLakehouse.cleaned_df")

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 5, Finished, Available)

In [4]:
df.count()

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 6, Finished, Available)

5322

In [6]:
dim_customer = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_customer")
dim_product = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_product")
dim_category = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_category")
dim_department = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_department")

dim_store = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_store")
dim_destination = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_destination")

dim_order_status = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_order_status")
dim_delivery_risk = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_delivery_risk")
dim_delivery_status = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_delivery_status")
dim_shipping = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_shipping_mode")
dim_transaction_type = spark.sql("SELECT * FROM LTT_GoldLakehouse.dim_transaction_type")

fact_sales = spark.sql("SELECT * FROM LTT_GoldLakehouse.fact_sales")

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 8, Finished, Available)

In [7]:
display(df.head(5))

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, 490a64f5-1d1a-4b1d-8be1-b5e332bbfd38)

In [8]:
fact_sales.count()

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 10, Finished, Available)

100

## **Merge with mapping tables**

In [9]:
df.select('order_id')
# mapping tables
df.select('order_status', 'late_delivery_risk', 'shipping_mode','transaction_type', 'delivery_status')
# location tables
df.select('concat_destination_address', 'concat_customer_region')
# scd tables
df.select('customer_id', 'product_id')
# other columns
df.select('order_item_quantity', 'order_item_discount', 'order_date', 'shipping_date', 'sales', 'benefit_per_order')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 11, Finished, Available)

DataFrame[order_item_quantity: int, order_item_discount: double, order_date: timestamp, shipping_date: timestamp, sales: double, benefit_per_order: double]

In [10]:
order_columns = [ \
        'order_id',
        'order_status', 'late_delivery_risk', 'shipping_mode', 'transaction_type', 'delivery_status',
        'concat_destination_address', 'concat_customer_region',
        'customer_id', 'product_id',
        'order_item_quantity', 'order_item_discount', 'order_date', 'shipping_date', 'sales', 'benefit_per_order'
]

# product - category - deparment
orders = df.select(order_columns)
display(orders.head(5))

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, 66c3d074-ee3f-4d0a-9afa-cf343f45fcc3)

In [11]:
# các hình thức shipping này là như thế nào?
# dim_shipping.show()

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 13, Finished, Available)

In [12]:
orders = orders.join(
    dim_order_status,
    orders['order_status'] == dim_order_status['order_status']
).drop(orders['order_status'], dim_order_status['order_status'])

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 14, Finished, Available)

In [13]:
orders = orders.join(
    dim_shipping,
    orders['shipping_mode'] == dim_shipping['shipping_mode']
).drop(orders['shipping_mode'], dim_shipping['shipping_mode'])

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 15, Finished, Available)

In [14]:
orders = orders.join(
    dim_transaction_type,
    orders['transaction_type'] == dim_transaction_type['transaction_type']
).drop(orders['transaction_type'], dim_transaction_type['transaction_type'])

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 16, Finished, Available)

In [15]:
dim_delivery_status.show()

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 17, Finished, Available)

+-------------------+-----------------+
|delivery_status_key|  delivery_status|
+-------------------+-----------------+
|                  3|Shipping canceled|
|                  4| Shipping on time|
|                  1| Advance shipping|
|                  2|    Late delivery|
+-------------------+-----------------+



In [16]:
orders = orders.join(
    dim_delivery_status,
    orders['delivery_status'] == dim_delivery_status['delivery_status']
).drop(orders['delivery_status'], dim_delivery_status['delivery_status'])

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 18, Finished, Available)

In [17]:
orders = orders.withColumn("delivery_risk_name",
                            when(orders["late_delivery_risk"] == 1, "is late")
                            .when(orders["late_delivery_risk"] == 0, "not late")
                            .otherwise("unknown"))
orders = orders.drop('late_delivery_risk')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 19, Finished, Available)

In [18]:
orders = orders.join(
    dim_delivery_risk,
    orders['delivery_risk_name'] == dim_delivery_risk['delivery_risk_name']
).drop(orders['delivery_risk_name'], dim_delivery_risk['delivery_risk_name'])

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 20, Finished, Available)

In [19]:
display(orders.head(5))

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 21, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2b74841b-6f4a-4cf3-869f-7b0c59e0fda3)

## **Merge with location tables**

In [20]:
orders = \
orders.join(
    dim_destination,
    orders['concat_destination_address'] == dim_destination['concat_destination_address'],
).drop('concat_destination_address', 'desti_city', 'desti_state', 'desti_country', 'desti_region', 'desti_market')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 22, Finished, Available)

In [21]:
orders = \
orders.join(
    dim_store,
    orders['concat_customer_region'] == dim_store['concat_customer_region'],
).drop('concat_customer_region', 'store_country', 'store_state', 'store_city', 'store_name', 'longitude', 'latitude')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 23, Finished, Available)

In [22]:
display(orders.head(5))

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 24, Finished, Available)

SynapseWidget(Synapse.DataFrame, 6fb93259-4582-4c7e-a6ec-8b7b50581dee)

## **Merge with SCD Tables**

In [23]:
orders = \
orders.join(
    dim_customer,
    orders['customer_id'] == dim_customer['customer_id']
).drop('customer_id', 'customer_fname', 'customer_lname', 'customer_segment', 'valid_from', 'valid_to', 'is_valid')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 25, Finished, Available)

In [24]:
orders = \
orders.join(
    dim_product,
    orders['product_id'] == dim_product['product_id']
).drop('product_id', 'product_category_id', 'product_name', 'product_price', 'product_status', 'valid_from', 'valid_to', 'is_valid')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 26, Finished, Available)

## **Merge with order date and order time**

In [25]:
# Merge order_date_key and shipping_time_key into orders
orders = orders.withColumn('order_time_key', date_format('order_date', 'hhmmss'))
orders = orders.withColumn('order_date', date_format('order_date', 'yyyyMMdd'))
orders = orders.withColumnRenamed('order_date', 'order_date_key')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 27, Finished, Available)

In [26]:
# Merge shipping_date_key and shipping_time_key into orders
orders = orders.withColumn('shipping_time_key', date_format('shipping_date', 'hhmmss'))
orders = orders.withColumn('shipping_date', date_format('shipping_date', 'yyyyMMdd'))
orders = orders.withColumnRenamed('shipping_date', 'shipping_date_key')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 28, Finished, Available)

In [27]:
columns = [
    'order_id',
    'product_key',
    'customer_key',
    'store_key',
    'desti_key',
    'delivery_risk_key',
    'delivery_status_key',
    'transaction_type_key',
    'order_status_key',
    'shipping_mode_key',
    'order_item_discount',
    'order_item_quantity',
    'order_date_key',
    'order_time_key',
    'shipping_date_key',
    'shipping_time_key',
    'sales',
    'benefit_per_order'
]
orders = orders.select(columns)

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 29, Finished, Available)

In [28]:
print(fact_sales.count())
print(orders.count())

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 30, Finished, Available)

100
5322


In [29]:
fact_sales = fact_sales.dropDuplicates()
orders = orders.dropDuplicates()
print(fact_sales.count())
print(orders.count())
orders_append = orders.exceptAll(fact_sales)
print(orders_append.count())

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 31, Finished, Available)

100
5322
5222


In [30]:
orders.createOrReplaceTempView('orders')
fact_sales.createOrReplaceTempView('fact_sales')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 32, Finished, Available)

In [31]:
# select new orders and modified orders except from fact_sales

# new_orders = \
# spark.sql("""
#     select *
#     from orders 
#     where order_id not in (
#         select order_id
#         from fact_sales
#     )
# """)

# modified_orders = \
# spark.sql("""
#     select o.*
#     from orders o
#     join fact_sales f
#     on o.order_id == f.order_id
#     where (
#         o.product_key != f.product_key
#         or o.customer_key != f.customer_key
#         or o.store_key != f.store_key
#         or o.desti_key != f.desti_key
#         or o.delivery_risk_key != f.delivery_risk_key
#         or o.delivery_status_key != f.delivery_status_key
#         or o.transaction_type_key != f.transaction_type_key
#         or o.order_status_key != f.order_status_key
#         or o.shipping_mode_key != f.shipping_mode_key
#         or o.order_item_discount != f.order_item_discount
#         or o.order_item_quantity != f.order_item_quantity
#         or o.order_date_key != f.order_date_key
#         or o.order_time_key != f.order_time_key
#         or o.shipping_date_key != f.shipping_date_key
#         or o.sales != f.sales
#         or o.benefit_per_order != f.benefit_per_order
#     )
# """)

# print(new_orders.count())
# print(modified_orders.count())

# # combine 2 tables above
# combined_orders = new_orders.union(modified_orders)

# print(combined_orders.count())

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 33, Finished, Available)

In [32]:
orders_append.write.format('delta').mode('append').saveAsTable('LTT_GoldLakehouse.fact_sales')

StatementMeta(, 6ae812bf-6990-431e-8e9e-bf6a709cdc7c, 34, Finished, Available)