In [0]:
%run ./01-config

In [0]:
class Upserter:
    """Run a fixed SQL statement against every streaming micro-batch.

    The bound method :meth:`upsert` has the ``(DataFrame, batch_id)`` signature
    expected by ``DataStreamWriter.foreachBatch``.
    """

    def __init__(self, merge_query, temp_view_name):
        """Remember the SQL text and the view name that SQL reads from.

        Args:
            merge_query: SQL statement executed once per micro-batch.
            temp_view_name: Name under which each micro-batch is registered as
                a temporary view before ``merge_query`` is executed.
        """
        self.merge_query = merge_query
        self.temp_view_name = temp_view_name

    def upsert(self, df_micro_batch, batch_id):
        """Expose the micro-batch as a temp view and execute the stored SQL.

        Args:
            df_micro_batch: DataFrame holding the rows of the current batch.
            batch_id: Batch identifier passed by ``foreachBatch``; not used.
        """
        df_micro_batch.createOrReplaceTempView(self.temp_view_name)
        # Use the public ``sparkSession`` property of the batch DataFrame
        # rather than the private py4j handle ``_jdf``: the query has to run
        # on the same session the view was just registered in.
        df_micro_batch.sparkSession.sql(self.merge_query)

# COMMAND ----------

class Silver():
    def __init__(self, env):
        """Load configuration and switch the session to the Silver schema.

        Args:
            env: Used verbatim as the catalog name in every table reference.
        """
        settings = Config()
        self.Conf = settings

        # Naming used when building fully qualified table names.
        self.catalog = env
        self.db_sv_name = "ecommerce_db_sv"
        self.db_bz_name = "ecommerce_db_bz"

        # Every stream keeps its checkpoint in a sub-folder of this path.
        self.checkpoint_base = settings.base_dir_checkpoint + "/checkpoints"
        # NOTE(review): stored here, but no method visible in this class reads
        # it back or passes it to a readStream - confirm whether it should be.
        self.maxFilesPerTrigger = settings.maxFilesPerTrigger

        silver_schema = f"{self.catalog}.{self.db_sv_name}"
        spark.sql(f"USE {silver_schema}")

    def cleanup(self):
        """Recursively delete the checkpoint folder of every Silver stream.

        The folder names mirror the ``checkpointLocation`` options used by the
        ``upsert_*`` methods of this class, including ``date_lookup_sv``.
        """
        checkpoint_dirs = (
            "customers_sv",
            "orders_sv",
            "order_items_sv",
            "order_payment_sv",
            "order_reviews_sv",
            "products_sv",
            "sellers_sv",
            # Written by upsert_date_lookup_sv; leaving it behind would make a
            # rerun resume from the old offsets instead of starting clean.
            "date_lookup_sv",
            "sale_sv",
            "order_detail_sv",
        )

        print(f"Đang xóa {self.checkpoint_base}...", end='')
        for dir_name in checkpoint_dirs:
            # Second argument True = recursive removal.
            dbutils.fs.rm(f"{self.checkpoint_base}/{dir_name}", True)
        print("Hoàn thành!")

    def upsert_customer_sv(self, once=True, processing_time="15 seconds"):
        """Start the streaming upsert from ``customer_bz`` into ``customer_sv``.

        The Bronze table is read as a stream, reduced to the four customer
        columns, de-duplicated on ``customer_id`` and merged into the Silver
        table batch by batch through ``Upserter``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        query = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.customer_sv target
            USING customer_delta source
            ON target.customer_id=source.customer_id
            WHEN MATCHED THEN UPDATE SET
            target.customer_unique_id = source.customer_unique_id,
            target.customer_city = source.customer_city,
            target.customer_state = source.customer_state
            WHEN NOT MATCHED THEN INSERT (
            customer_id, customer_unique_id, customer_city, customer_state
            ) VALUES (
            source.customer_id, source.customer_unique_id, source.customer_city, source.customer_state
            )
            """

        # The view name must match the USING clause of the MERGE above.
        data_upserter = Upserter(query, "customer_delta")
        df_delta = (
            spark.readStream
            .table(f"{self.catalog}.{self.db_bz_name}.customer_bz")
            .selectExpr("customer_id", "customer_unique_id", "customer_city", "customer_state")
            .dropDuplicates(["customer_id"])
        )

        stream_writer = (
            df_delta.writeStream
            .foreachBatch(data_upserter.upsert)
            .outputMode("update")
            .option("checkpointLocation", f"{self.checkpoint_base}/customers_sv")
            .queryName("customers_upsert_stream")
        )

        # Set on the calling thread right before start().
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p1")

        # Plain truthiness, matching the other upsert_* methods of this class.
        if once:
            return stream_writer.trigger(availableNow=True).start()
        return stream_writer.trigger(processingTime=processing_time).start()
        
    def upsert_orders_sv(self, once=True, processing_time="15 seconds"):
        """Start the streaming upsert from ``orders_bz`` into ``orders_sv``.

        The Bronze table is read as a stream, reduced to the order columns,
        de-duplicated on ``order_id`` and merged into the Silver table batch
        by batch through ``Upserter``. On a match the MERGE updates the status
        and the approval/delivery date columns; ``customer_id`` and
        ``order_purchase_timestamp`` are only written on insert.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        query = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.orders_sv AS target
            USING orders_delta AS source
            ON target.order_id = source.order_id
            WHEN MATCHED THEN UPDATE SET
            target.order_status = source.order_status,
            target.order_approved_at = source.order_approved_at,
            target.order_delivered_carrier_date = source.order_delivered_carrier_date,
            target.order_delivered_customer_date = source.order_delivered_customer_date,
            target.order_estimated_delivery_date = source.order_estimated_delivery_date
            WHEN NOT MATCHED THEN INSERT (
            order_id, customer_id, order_status,
            order_purchase_timestamp, order_approved_at,
            order_delivered_carrier_date, order_delivered_customer_date, order_estimated_delivery_date
            ) VALUES (
            source.order_id, source.customer_id, source.order_status,
            source.order_purchase_timestamp, source.order_approved_at,
            source.order_delivered_carrier_date, source.order_delivered_customer_date, source.order_estimated_delivery_date
            )
            """

        # The view name must match the USING clause of the MERGE above.
        data_upserter = Upserter(query, "orders_delta")
        df_delta = (
            spark.readStream
            .table(f"{self.catalog}.{self.db_bz_name}.orders_bz")
            .selectExpr(
                "order_id",
                "customer_id",
                "order_status",
                "order_purchase_timestamp",
                "order_approved_at",
                "order_delivered_carrier_date",
                "order_delivered_customer_date",
                "order_estimated_delivery_date",
            )
            .dropDuplicates(["order_id"])
        )

        stream_writer = (
            df_delta.writeStream
            .foreachBatch(data_upserter.upsert)
            .outputMode("update")
            .option("checkpointLocation", f"{self.checkpoint_base}/orders_sv")
            .queryName("orders_upsert_stream")
        )

        # Set on the calling thread right before start().
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p2")

        # Plain truthiness, matching the other upsert_* methods of this class.
        if once:
            return stream_writer.trigger(availableNow=True).start()
        return stream_writer.trigger(processingTime=processing_time).start()
        
    def upsert_order_items_sv(self, once=True, processing_time="15 seconds"):
        """Start the streaming upsert from ``order_items_bz`` into ``order_items_sv``.

        The Bronze table is read as a stream, reduced to the item columns,
        de-duplicated on ``(order_id, order_item_id, product_id)`` - the same
        three columns the MERGE matches on - and merged into the Silver table
        batch by batch through ``Upserter``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        query = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.order_items_sv AS target
            USING order_items_delta AS source
            ON target.order_id = source.order_id AND target.order_item_id = source.order_item_id AND target.product_id = source.product_id
            WHEN MATCHED THEN UPDATE SET
            target.seller_id = source.seller_id,
            target.shipping_limit_date = source.shipping_limit_date,
            target.price = source.price,
            target.freight_value = source.freight_value
            WHEN NOT MATCHED THEN INSERT (
            order_id, order_item_id, product_id, seller_id, shipping_limit_date, price, freight_value
            ) VALUES (
            source.order_id, source.order_item_id, source.product_id, source.seller_id, source.shipping_limit_date, source.price, source.freight_value
            );
            """

        # The view name must match the USING clause of the MERGE above.
        data_upserter = Upserter(query, "order_items_delta")
        df_delta = (
            spark.readStream
            .table(f"{self.catalog}.{self.db_bz_name}.order_items_bz")
            .selectExpr(
                "order_id",
                "order_item_id",
                "product_id",
                "seller_id",
                "shipping_limit_date",
                "price",
                "freight_value",
            )
            .dropDuplicates(["order_id", "order_item_id", "product_id"])
        )

        stream_writer = (
            df_delta.writeStream
            .foreachBatch(data_upserter.upsert)
            .outputMode("update")
            .option("checkpointLocation", f"{self.checkpoint_base}/order_items_sv")
            .queryName("order_items_upsert_stream")
        )

        # Set on the calling thread right before start().
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p3")

        # Plain truthiness, matching the other upsert_* methods of this class.
        if once:
            return stream_writer.trigger(availableNow=True).start()
        return stream_writer.trigger(processingTime=processing_time).start()


    def delete_null_category_items(self):
        """Delete Silver order items whose product has a NULL category name.

        Runs one DELETE on ``order_items_sv`` for every ``product_id`` that
        ``products_sv`` lists with ``product_category_name IS NULL``.
        """
        # NOTE(review): upsert_products_sv filters out NULL categories and
        # inner-joins the translation table before writing products_sv, so
        # this may only match rows whose English translation is NULL - confirm
        # the intended condition.
        spark.sql(f"""
            DELETE FROM {self.catalog}.{self.db_sv_name}.order_items_sv
            WHERE product_id IN (
                SELECT product_id
                FROM {self.catalog}.{self.db_sv_name}.products_sv
                WHERE product_category_name IS NULL
            )
        """)
    
    def upsert_order_payment_sv(self, once=True, processing_time="15 seconds"):
        """Start the streaming upsert from ``order_payment_bz`` into ``order_payment_sv``.

        The Bronze table is read as a stream, reduced to the payment columns,
        de-duplicated on ``(order_id, payment_sequential)`` - the same pair the
        MERGE matches on - and merged into the Silver table batch by batch
        through ``Upserter``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        query = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.order_payment_sv AS target
            USING order_payment_delta AS source
            ON target.order_id = source.order_id
            AND target.payment_sequential = source.payment_sequential
            WHEN MATCHED THEN UPDATE SET
            target.payment_type = source.payment_type,
            target.payment_installments = source.payment_installments,
            target.payment_value = source.payment_value
            WHEN NOT MATCHED THEN INSERT (
            order_id, payment_sequential, payment_type,
            payment_installments, payment_value
        ) VALUES (
            source.order_id, source.payment_sequential, source.payment_type,
            source.payment_installments, source.payment_value
        )
            """

        # The view name must match the USING clause of the MERGE above.
        data_upserter = Upserter(query, "order_payment_delta")
        df_delta = (
            spark.readStream
            .table(f"{self.catalog}.{self.db_bz_name}.order_payment_bz")
            .selectExpr(
                "order_id",
                "payment_sequential",
                "payment_type",
                "payment_installments",
                "payment_value",
            )
            .dropDuplicates(["order_id", "payment_sequential"])
        )

        stream_writer = (
            df_delta.writeStream
            .foreachBatch(data_upserter.upsert)
            .outputMode("update")
            .option("checkpointLocation", f"{self.checkpoint_base}/order_payment_sv")
            .queryName("order_payment_upsert_stream")
        )

        # Set on the calling thread right before start().
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p4")

        # Plain truthiness, matching the other upsert_* methods of this class.
        if once:
            return stream_writer.trigger(availableNow=True).start()
        return stream_writer.trigger(processingTime=processing_time).start()
        
    def upsert_order_reviews_sv(self, once=True, processing_time="15 seconds"):
        """Start the streaming upsert from ``order_reviews_bz`` into ``order_reviews_sv``.

        The Bronze table is read as a stream, reduced to the review columns,
        stripped of rows whose ``review_score`` is NULL, de-duplicated on
        ``(review_id, order_id)`` - the same pair the MERGE matches on - and
        merged into the Silver table batch by batch through ``Upserter``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        query = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.order_reviews_sv AS target
            USING order_reviews_delta AS source
            ON target.review_id = source.review_id AND target.order_id = source.order_id
            WHEN MATCHED THEN UPDATE SET
            target.review_score = source.review_score,
            target.review_comment_title = source.review_comment_title,
            target.review_comment_message = source.review_comment_message,
            target.review_creation_date = source.review_creation_date,
            target.review_answer_timestamp = source.review_answer_timestamp
            WHEN NOT MATCHED THEN INSERT (
            review_id, order_id, review_score, review_comment_title,
            review_comment_message, review_creation_date, review_answer_timestamp
            ) VALUES (
            source.review_id, source.order_id, source.review_score, source.review_comment_title,
            source.review_comment_message, source.review_creation_date, source.review_answer_timestamp
            )
            """

        # The view name must match the USING clause of the MERGE above.
        data_upserter = Upserter(query, "order_reviews_delta")
        df_delta = (
            spark.readStream
            .table(f"{self.catalog}.{self.db_bz_name}.order_reviews_bz")
            .selectExpr(
                "review_id",
                "order_id",
                "review_score",
                "review_comment_title",
                "review_comment_message",
                "review_creation_date",
                "review_answer_timestamp",
            )
            .filter("review_score IS NOT NULL")
            .dropDuplicates(["review_id", "order_id"])
        )

        stream_writer = (
            df_delta.writeStream
            .foreachBatch(data_upserter.upsert)
            .outputMode("update")
            .option("checkpointLocation", f"{self.checkpoint_base}/order_reviews_sv")
            .queryName("order_reviews_upsert_stream")
        )

        # Set on the calling thread right before start().
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p5")

        # Plain truthiness, matching the other upsert_* methods of this class.
        if once:
            return stream_writer.trigger(availableNow=True).start()
        return stream_writer.trigger(processingTime=processing_time).start()
        
    def upsert_sellers_sv(self, once=True, processing_time="15 seconds"):
        """Start the streaming upsert from ``sellers_bz`` into ``sellers_sv``.

        The Bronze table is read as a stream, reduced to the three seller
        columns, de-duplicated on ``seller_id`` and merged into the Silver
        table batch by batch through ``Upserter``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        query = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.sellers_sv AS target
            USING sellers_delta AS source
            ON target.seller_id = source.seller_id
            WHEN MATCHED THEN UPDATE SET
            target.seller_city = source.seller_city,
            target.seller_state = source.seller_state
            WHEN NOT MATCHED THEN INSERT (
            seller_id, seller_city, seller_state
            ) VALUES (
            source.seller_id, source.seller_city, source.seller_state
            )
            """

        # The view name must match the USING clause of the MERGE above.
        data_upserter = Upserter(query, "sellers_delta")
        df_delta = (
            spark.readStream
            .table(f"{self.catalog}.{self.db_bz_name}.sellers_bz")
            .selectExpr("seller_id", "seller_city", "seller_state")
            .dropDuplicates(["seller_id"])
        )

        stream_writer = (
            df_delta.writeStream
            .foreachBatch(data_upserter.upsert)
            .outputMode("update")
            .option("checkpointLocation", f"{self.checkpoint_base}/sellers_sv")
            .queryName("sellers_upsert_stream")
        )

        # Set on the calling thread right before start().
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p6")

        # Plain truthiness, matching the other upsert_* methods of this class.
        if once:
            return stream_writer.trigger(availableNow=True).start()
        return stream_writer.trigger(processingTime=processing_time).start()
        
    def upsert_date_lookup_sv(self, once=True, processing_time="15 seconds"):
        """Start the stream that fills ``date_lookup_sv`` from ``date_lookup_bz``.

        Bronze columns are renamed to the Silver names and ``QuarterName`` is
        derived from ``month``. The MERGE has no WHEN MATCHED branch, so rows
        whose ``DateKey`` already exists in the target are left as they are.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        merge_sql = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.date_lookup_sv AS target
            USING date_lookup_delta AS source
            ON target.DateKey = source.DateKey
            WHEN NOT MATCHED THEN INSERT (
                DateKey, Date, DayOfWeek, DayName, DayOfMonth, DayOfYear, MonthName, MonthOfYear, Quarter, QuarterName, Year, IsWeekday
            ) VALUES (
                source.DateKey, source.Date, source.DayOfWeek, source.DayName,
                source.DayOfMonth, source.DayOfYear, source.MonthName, source.MonthOfYear, source.Quarter,
                source.QuarterName, source.Year, source.IsWeekday
            )
        """

        # Bronze column -> Silver column, in the order listed by the INSERT.
        column_exprs = [
            "date_key as DateKey",
            "full_date as Date",
            "day_of_week as DayOfWeek",
            "day_name as DayName",
            "day_num_in_month as DayOfMonth",
            "day_num_overall as DayOfYear",
            "month_name as MonthName",
            "month as MonthOfYear",
            "quarter as Quarter",
            """CASE 
                        WHEN month >= 1 AND month <= 3 THEN 'First'
                        WHEN month >= 4 AND month <= 6 THEN 'Second'
                        WHEN month >= 7 AND month <= 9 THEN 'Third'
                        WHEN month >= 10 AND month <= 12 THEN 'Fourth'
                    END AS QuarterName""",
            "year as Year",
            "weekday_flag as IsWeekday",
        ]

        bronze_table = f"{self.catalog}.{self.db_bz_name}.date_lookup_bz"
        dates = spark.readStream.table(bronze_table)
        dates = dates.selectExpr(*column_exprs).dropDuplicates(["DateKey"])

        # The view name must match the USING clause of the MERGE above.
        batch_handler = Upserter(merge_sql, "date_lookup_delta")
        writer = dates.writeStream.foreachBatch(batch_handler.upsert)
        writer = writer.outputMode("update")
        writer = writer.option("checkpointLocation", f"{self.checkpoint_base}/date_lookup_sv")
        writer = writer.queryName("date_lookup_upsert_stream")

        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p7")

        trigger_args = {"availableNow": True} if once else {"processingTime": processing_time}
        return writer.trigger(**trigger_args).start()

    def insert_special_date_lookup_sv(self):
        """Insert the ``DateKey = -1`` placeholder row unless it is already there.

        Counts the rows of ``date_lookup_sv`` with ``DateKey = -1``; when there
        are none, a single row is inserted positionally. A message reporting
        which branch was taken is printed either way.
        """
        from delta.tables import DeltaTable

        table_name = f"{self.catalog}.{self.db_sv_name}.date_lookup_sv"
        placeholder_rows = (
            DeltaTable.forName(spark, table_name)
            .toDF()
            .filter("DateKey = -1")
            .count()
        )

        if placeholder_rows != 0:
            print("DateKey = -1 record dã tồn tại.")
            return

        # Positional VALUES: relies on the column order of the target table.
        spark.sql(f"""
                INSERT INTO {table_name}
                VALUES (-1, NULL, 0, '', 0, 0, '', 0, 0, '', 0, '0')
            """)
        print("Đã điền DateKey = -1 record.")

    def upsert_products_sv(self, once=True, processing_time="15 seconds"):
        """Start the streaming upsert from ``products_bz`` into ``products_sv``.

        Streamed products with a non-NULL ``product_category_name`` are
        inner-joined to the static ``category_translation_bz`` table, and the
        English category name is what the MERGE writes into the Silver
        ``product_category_name`` column. Rows are de-duplicated on
        ``product_id`` before each batch is merged through ``Upserter``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        from pyspark.sql import functions as F

        merge_sql = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.products_sv AS target
            USING products_delta AS source
            ON target.product_id = source.product_id
            WHEN MATCHED THEN UPDATE SET
                target.product_category_name = source.product_category_name_english,
                target.product_name_lenght = source.product_name_lenght,
                target.product_description_lenght = source.product_description_lenght,
                target.product_photos_qty = source.product_photos_qty,
                target.product_weight_g = source.product_weight_g,
                target.product_length_cm = source.product_length_cm,
                target.product_height_cm = source.product_height_cm,
                target.product_width_cm = source.product_width_cm
            WHEN NOT MATCHED THEN INSERT (
                product_id,
                product_category_name,
                product_name_lenght,
                product_description_lenght,
                product_photos_qty,
                product_weight_g,
                product_length_cm,
                product_height_cm,
                product_width_cm
            ) VALUES (
                source.product_id,
                source.product_category_name_english,
                source.product_name_lenght,
                source.product_description_lenght,
                source.product_photos_qty,
                source.product_weight_g,
                source.product_length_cm,
                source.product_height_cm,
                source.product_width_cm
            )
        """

        bronze_schema = f"{self.catalog}.{self.db_bz_name}"

        # Streaming side: products that carry a category name.
        products = spark.readStream.table(f"{bronze_schema}.products_bz")
        products = products.filter(F.col("product_category_name").isNotNull())
        # Static side: category translation lookup.
        translations = spark.table(f"{bronze_schema}.category_translation_bz")

        same_category = F.col("p.product_category_name") == F.col("c.product_category_name")
        output_columns = [
            "p.product_id",
            "c.product_category_name_english",
            "p.product_name_lenght",
            "p.product_description_lenght",
            "p.product_photos_qty",
            "p.product_weight_g",
            "p.product_length_cm",
            "p.product_height_cm",
            "p.product_width_cm",
        ]

        joined = products.alias("p").join(translations.alias("c"), same_category, how="inner")
        translated = joined.select(*[F.col(name) for name in output_columns])
        translated = translated.dropDuplicates(["product_id"])

        # The view name must match the USING clause of the MERGE above.
        batch_handler = Upserter(merge_sql, "products_delta")
        writer = translated.writeStream.foreachBatch(batch_handler.upsert)
        writer = writer.outputMode("update")
        writer = writer.option("checkpointLocation", f"{self.checkpoint_base}/products_sv")
        writer = writer.queryName("products_upsert_stream")

        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p8")

        trigger_args = {"availableNow": True} if once else {"processingTime": processing_time}
        return writer.trigger(**trigger_args).start()

    def upsert_sale_sv(self, once=True, processing_time="15 seconds"):
        """Start the stream that builds ``sale_sv`` from five Bronze streams.

        Order items are joined to orders, sellers, products and customers
        (all read as streams from the Bronze schema) and every micro-batch is
        merged into ``sale_sv`` on ``(order_id, order_item_id)`` with
        ``UPDATE SET *`` / ``INSERT *``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        from pyspark.sql import functions as F

        merge_sql = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.sale_sv target
            USING temp_sale_delta source
            ON target.order_id = source.order_id 
            AND target.order_item_id = source.order_item_id
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """

        def bronze_stream(table):
            # Streaming read of one table in the Bronze schema.
            return spark.readStream.table(f"{self.catalog}.{self.db_bz_name}.{table}")

        items = bronze_stream("order_items_bz")
        orders = bronze_stream("orders_bz")
        customers = bronze_stream("customer_bz")
        products = bronze_stream("products_bz")
        sellers = bronze_stream("sellers_bz")

        # NOTE(review): none of these stream-stream joins declares a
        # watermark, and nothing here de-duplicates on the MERGE key -
        # confirm state growth and key uniqueness are acceptable.
        sales = items.alias("oi")
        sales = sales.join(orders.alias("o"), F.col("oi.order_id") == F.col("o.order_id"))
        sales = sales.join(sellers.alias("s"), F.col("oi.seller_id") == F.col("s.seller_id"))
        sales = sales.join(products.alias("p"), F.col("oi.product_id") == F.col("p.product_id"))
        sales = sales.join(customers.alias("c"), F.col("o.customer_id") == F.col("c.customer_id"))

        output_columns = (
            "oi.order_id",
            "c.customer_id",
            "p.product_id",
            "s.seller_id",
            "oi.order_item_id",
            "oi.price",
            "oi.freight_value",
            "o.order_purchase_timestamp",
            "o.order_delivered_customer_date",
            "o.order_estimated_delivery_date",
        )
        sales = sales.select(*output_columns)

        # The view name must match the USING clause of the MERGE above.
        batch_handler = Upserter(merge_sql, "temp_sale_delta")
        writer = sales.writeStream.foreachBatch(batch_handler.upsert)
        writer = writer.outputMode("append")
        writer = writer.option("checkpointLocation", f"{self.checkpoint_base}/sale_sv")
        writer = writer.queryName("sale_sv_upsert_stream")

        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p9")

        trigger_args = {"availableNow": True} if once else {"processingTime": processing_time}
        return writer.trigger(**trigger_args).start()
    
    def upsert_order_detail_sv(self, once=True, processing_time="15 seconds"):
        """Start the stream that builds ``order_detail_sv`` from items and orders.

        Streams ``order_items_bz`` and ``orders_bz`` from the Bronze schema,
        joins them on ``order_id`` and merges every micro-batch into
        ``order_detail_sv`` with ``UPDATE SET *`` / ``INSERT *``.

        Args:
            once: Truthy starts the query with the ``availableNow`` trigger;
                falsy starts it with a processing-time trigger.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            Whatever ``DataStreamWriter.start()`` returns for the new query.
        """
        from pyspark.sql import functions as F
        # NOTE(review): the MERGE matches on order_id only, while the source
        # is built from order items and selects item-level price and
        # freight_value without order_item_id. If an order can have several
        # items, several source rows would target one row - confirm the
        # intended grain of order_detail_sv (aggregate per order, or add
        # order_item_id to the key).
        query = f"""
            MERGE INTO {self.catalog}.{self.db_sv_name}.order_detail_sv target
            USING temp_order_detail_delta source
            ON target.order_id = source.order_id
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """

        # The view name must match the USING clause of the MERGE above.
        data_upserter = Upserter(query, "temp_order_detail_delta")

        # Both sides of the join are streaming reads of Bronze tables.
        df_order_items = spark.readStream.table(f"{self.catalog}.{self.db_bz_name}.order_items_bz")
        df_orders = spark.readStream.table(f"{self.catalog}.{self.db_bz_name}.orders_bz")

        df_delta = (
            df_order_items.alias("oi")
            .join(df_orders.alias("o"), F.col("oi.order_id") == F.col("o.order_id"))
            .select(
                "oi.order_id",  
                "o.customer_id",
                "o.order_purchase_timestamp",
                "o.order_delivered_customer_date",
                "o.order_estimated_delivery_date",
                "oi.price",
                "oi.freight_value",
                "o.order_approved_at"
            )
        )

        stream_writer = (
            df_delta.writeStream
                .foreachBatch(data_upserter.upsert)
                .outputMode("append") 
                .option("checkpointLocation", f"{self.checkpoint_base}/order_detail_sv")
                .queryName("order_detail_sv_upsert_stream")
        )

        # Set on the calling thread right before start().
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_p10")

        if once:
            return stream_writer.trigger(availableNow=True).start()
        else:
            return stream_writer.trigger(processingTime=processing_time).start()


    def load(self, once=True, processing_time="15 seconds"):
        print(f"\nTiến hành đưa dữ liệu vào Silver Layer...")
        self.upsert_customer_sv(once, processing_time) 
        self.upsert_orders_sv(once, processing_time)
        self.upsert_order_payment_sv(once, processing_time) 
        self.upsert_order_reviews_sv(once, processing_time) 
        self.upsert_products_sv(once, processing_time)
        self.upsert_order_items_sv(once, processing_time) 
        self.insert_special_date_lookup_sv()
        self.upsert_date_lookup_sv(once, processing_time)
        self.upsert_sellers_sv(once, processing_time)
        self.delete_null_category_items()
        self.upsert_sale_sv(once, processing_time)
        self.upsert_order_detail_sv(once, processing_time)
        if once:
            for stream in spark.streams.active:
                stream.awaitTermination()

        print(f"Hoàn thành đưa dữ liệu vào Silver Layer")

In [0]:
# Build the Silver loader with "dev" as the catalog name and run a single
# load with once=True.
SV=Silver("dev")
SV.load(once=True) 