In [0]:
%run ./01-config

In [0]:
class Bronze:
    """Bronze (raw ingestion) layer: load the source CSV files into ``*_bz`` tables.

    All tables live under ``{catalog}.{db_name}``.  ``catalog`` is the ``env``
    passed to the constructor, and ``db_name`` comes from ``Config``.

    Two ingestion styles are used:

    * ``insert_*`` methods read a CSV whose columns are all strings, cast each
      column explicitly and append the rows with ``insertInto``.  They do not
      create the target tables, and they refuse to run until :meth:`create_db`
      has been called on this instance.
    * ``load_*`` methods read a CSV from ``self.landing_zone`` with an explicit
      DDL schema and overwrite a Delta table, schema included, via
      ``saveAsTable``.
    """

    # Default base path of the CSVs consumed by the insert_* steps in setup().
    DATA_ZONE = "abfss://project-unmanage@gr8sta.dfs.core.windows.net/data_zone"

    def __init__(self, env: str, data_zone: str = DATA_ZONE):
        """Read configuration and prepare (but do not create) the target database.

        Args:
            env: Catalog name the tables are written to (e.g. ``"dev"``).
            data_zone: Base path of the CSV files used by :meth:`setup` for the
                ``insert_*`` steps.  Defaults to :attr:`DATA_ZONE`.
        """
        conf = Config()
        self.landing_zone = conf.base_data_path
        self.catalog = env
        self.db_name = conf.db_name
        self.data_zone = data_zone
        # Flipped to True by create_db(); the insert_* methods check it.
        self.initialized = False

    def create_db(self):
        """Create ``{catalog}.{db_name}`` if missing, make it current, and mark this instance initialized."""
        print(f"Creating the database {self.catalog}.{self.db_name}...", end='')
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {self.catalog}.{self.db_name}")
        spark.sql(f"USE {self.catalog}.{self.db_name}")
        self.initialized = True
        print("Done")

    # -------------------------------- HELPERS --------------------------------
    def _table(self, name: str) -> str:
        """Return the fully qualified table name ``catalog.db_name.name``."""
        return f"{self.catalog}.{self.db_name}.{name}"

    def _ensure_initialized(self):
        """Raise ReferenceError unless create_db() has been called on this instance."""
        if not self.initialized:
            raise ReferenceError(
                "Không thể xác định được Application database. Không thể chèn dữ liệu."
            )

    def _insert_csv(self, csv_file_path: str, label: str, table: str, columns):
        """Append a header-bearing CSV into an existing table, casting every column.

        Args:
            csv_file_path: Path of the CSV file to read.
            label: Table name shown in the progress message.
            table: Target table name inside ``catalog.db_name``.
            columns: Sequence of ``(column_name, cast_types)`` pairs, listed in
                the target table's column order.  ``cast_types`` is either one
                Spark type name or a tuple of type names applied one after
                another.

        Raises:
            ReferenceError: If create_db() has not been called on this instance.
        """
        self._ensure_initialized()
        print(f"Tiến hành chèn dữ liệu từ {csv_file_path} vào bảng {label}...", end='')
        df = spark.read.option("header", "true").csv(csv_file_path)
        casted = []
        for name, cast_types in columns:
            if isinstance(cast_types, str):
                cast_types = (cast_types,)
            column = df[name]
            for cast_type in cast_types:
                column = column.cast(cast_type)
            casted.append(column.alias(name))
        # insertInto matches columns by position, not by name, so `columns`
        # must follow the target table's column order.
        df.select(*casted).write.mode("append").insertInto(self._table(table))
        print("Hoàn thành!")

    def _load_csv(self, file_name: str, schema: str, table: str, label: str, **reader_options):
        """Read ``landing_zone/file_name`` with ``schema`` and overwrite Delta table ``table``.

        Args:
            file_name: CSV file name relative to ``self.landing_zone``.
            schema: DDL schema string used by the CSV reader.
            table: Target table name inside ``catalog.db_name``.
            label: Name shown in the completion message.
            **reader_options: Extra options passed to the CSV reader.
        """
        df = (spark.read
                .format("csv")
                .schema(schema)
                .option("header", "true")
                .options(**reader_options)
                .load(self.landing_zone + "/" + file_name))

        (df.write
            .format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(self._table(table)))

        print(f"Batch load to {label} completed.")

    # ------------------------------ INSERT DATA ------------------------------
    def insert_order_table(self, csv_file_path):
        """Append the orders CSV at ``csv_file_path`` into ``orders_bz``.

        Raises:
            ReferenceError: If create_db() has not been called on this instance.
        """
        self._insert_csv(csv_file_path, "order", "orders_bz", [
            ("order_id", "string"),
            ("customer_id", "string"),
            ("order_status", "string"),
            ("order_purchase_timestamp", "timestamp"),
            ("order_approved_at", "timestamp"),
            ("order_delivered_carrier_date", "timestamp"),
            ("order_delivered_customer_date", "timestamp"),
            ("order_estimated_delivery_date", "timestamp"),
        ])

    def insert_order_item_table(self, csv_file_path):
        """Append the order-items CSV at ``csv_file_path`` into ``order_items_bz``.

        Raises:
            ReferenceError: If create_db() has not been called on this instance.
        """
        self._insert_csv(csv_file_path, "order_item", "order_items_bz", [
            ("order_id", "string"),
            ("order_item_id", "int"),
            ("product_id", "string"),
            ("seller_id", "string"),
            ("shipping_limit_date", "timestamp"),
            ("price", "double"),
            ("freight_value", "double"),
        ])

    def load_customers(self):
        """Overwrite ``customer_bz`` with ``landing_zone/customers.csv``."""
        schema = """
            customer_id string,
            customer_unique_id string,
            customer_city string,
            customer_state string
        """
        self._load_csv("customers.csv", schema, "customer_bz", "customer")

    def insert_product_category_name_translation(self, csv_file_path):
        """Append the category-translation CSV at ``csv_file_path`` into ``category_translation_bz``.

        Raises:
            ReferenceError: If create_db() has not been called on this instance.
        """
        self._insert_csv(
            csv_file_path,
            "product_category_name_translation",
            "category_translation_bz",
            [
                ("product_category_name", "string"),
                ("product_category_name_english", "string"),
            ],
        )

    def load_sellers(self):
        """Overwrite ``sellers_bz`` with ``landing_zone/sellers.csv``."""
        schema = """
            seller_id string,
            seller_zip_code_prefix int,
            seller_city string,
            seller_state string
        """
        self._load_csv("sellers.csv", schema, "sellers_bz", "seller")

    def load_order_payments(self):
        """Overwrite ``order_payment_bz`` with ``landing_zone/order_payments.csv``."""
        schema = """
            order_id string,
            payment_sequential tinyint,
            payment_type string,
            payment_installments tinyint,
            payment_value float
        """
        self._load_csv("order_payments.csv", schema, "order_payment_bz", "order_payments")

    def load_order_reviews(self):
        """Overwrite ``order_reviews_bz`` with ``landing_zone/order_reviews.csv``.

        Dates are parsed as ``yyyy-MM-dd`` and timestamps as
        ``yyyy-MM-dd HH:mm:ss``.
        """
        schema = """
            review_id string,
            order_id string,
            review_score tinyint,
            review_comment_title string,
            review_comment_message string,
            review_creation_date timestamp,
            review_answer_timestamp timestamp
        """
        self._load_csv(
            "order_reviews.csv", schema, "order_reviews_bz", "reviews",
            dateFormat="yyyy-MM-dd",
            timestampFormat="yyyy-MM-dd HH:mm:ss",
        )

    def insert_product_data(self, csv_file_path):
        """Append the products CSV at ``csv_file_path`` into ``products_bz``.

        Raises:
            ReferenceError: If create_db() has not been called on this instance.
        """
        # The three count columns go string -> double -> int, as before.
        # Presumably the CSV writes them with a decimal part (e.g. "40.0");
        # TODO confirm.
        self._insert_csv(csv_file_path, "product", "products_bz", [
            ("product_id", "string"),
            ("product_category_name", "string"),
            ("product_name_length", ("double", "int")),
            ("product_description_length", ("double", "int")),
            ("product_photos_qty", ("double", "int")),
            ("product_weight_g", "double"),
            ("product_length_cm", "double"),
            ("product_height_cm", "double"),
            ("product_width_cm", "double"),
        ])

    def setup(self):
        """Create the database, then run every insert_* and load_* step in order.

        NOTE(review): The insert_* steps append while the load_* steps
        overwrite, so running setup() twice duplicates rows in orders_bz,
        order_items_bz, category_translation_bz and products_bz.
        """
        self.create_db()
        self.insert_order_table(f"{self.data_zone}/orders.csv")
        self.insert_order_item_table(f"{self.data_zone}/order_items.csv")
        self.insert_product_category_name_translation(
            f"{self.data_zone}/product_category_name_translation.csv"
        )
        self.insert_product_data(f"{self.data_zone}/products.csv")
        self.load_customers()
        self.load_sellers()
        self.load_order_payments()
        self.load_order_reviews()

# Build the Bronze layer against the "dev" catalog and run the full ingestion.
# The instance is bound to the global name `setup` (the same name as the
# method it calls), so that binding is preserved here.
setup = Bronze("dev")
setup.setup()