In [0]:
# Display the active SparkSession as a sanity check. `spark` is not defined in this
# notebook; it is presumably injected by the notebook runtime (Databricks) — confirm.
spark

In [0]:
# Mount the ADLS Gen2 container "cask-container" (storage account "caskng") at
# /mnt/cask-mount using OAuth client-credentials authentication
# (ClientCredsTokenProvider). Later cells read and write through this mount point.
# NOTE(review): the client id, secret and directory values below are placeholders.
# Never commit real values here; load them from a Databricks secret scope
# (dbutils.secrets.get) instead.
CASK_MOUNT_POINT = "/mnt/cask-mount"

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "['client_id']",
    "fs.azure.account.oauth2.client.secret": '[secret_value]',
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/[directory]/oauth2/token",
}

# Mounting an already-mounted path raises, so skip when the mount point is present.
mount_already_present = any(
    existing.mountPoint == CASK_MOUNT_POINT for existing in dbutils.fs.mounts()
)

if mount_already_present:
    print("Mount point already exists. Skipping mount.")
else:
    dbutils.fs.mount(
        source="abfss://cask-container@caskng.dfs.core.windows.net",
        mount_point=CASK_MOUNT_POINT,
        extra_configs=configs,
    )


Mount point already exists. Skipping mount.


In [0]:
# Load the raw orders extract from the ADLS mount. The first CSV line holds the column
# names, and Spark scans the data to infer column types.
orders_df = (
    spark.read
    .options(header="True", inferSchema="True")
    .csv("/mnt/cask-mount/raw-data/order_sink.csv")
)

orders_df.show(10)




+-------+----------+--------+--------------------+----------+--------------------+-----------+---------+-----------+---------+-----------+--------------------+--------+-------+-------------------+
|orderId| orderDate|branchId|          branchName|CustomerID|        customerName|totalAmount|   status|statusValue|productId|productCode|         productName|isMaster|  price|               note|
+-------+----------+--------+--------------------+----------+--------------------+-----------+---------+-----------+---------+-----------+--------------------+--------+-------+-------------------+
|OD00001|2024-10-12|       3|Chi nhánh Hồ Chí ...|   KH00988|    Huỳnh Thị Thu Hà|         67|  Pending|          0|     4371|      SP003|Điện thoại thông ...|    true|2831662|   Mua kèm giảm giá|
|OD00002|2024-05-07|       3|   Chi nhánh Đà Nẵng|   KH00707|        Vũ Minh Quân|         25|  Pending|          2|     6136|      SP001|Điện thoại thông ...|    true|2613885|Khuyến mãi đặc biệt|
|OD00003|2024-1

In [0]:
# Load the raw customer extract from the ADLS mount. The first CSV line holds the
# column names, and Spark scans the data to infer column types.
customerdb_df = (
    spark.read
    .options(header="True", inferSchema="True")
    .csv("/mnt/cask-mount/raw-data/customerdb_sink.csv")
)

customerdb_df.show(10)

+----------+--------------------+------+----------+-------------+-----------+--------------------+--------------------+-------+-------+----------+------------+-----------+
|CUSTOMERID|        CUSTOMERNAME|GENDER| BIRTHDATE|CONTACTNUMBER|     REGION|               EMAIL|        ORGANIZATION|TAXCODE|   DEBT|RETAILERID|MODIFIEDDATE|CREATEDDATE|
+----------+--------------------+------+----------+-------------+-----------+--------------------+--------------------+-------+-------+----------+------------+-----------+
|   KH00001|        Đo Hoang Anh| false|1970-01-01|  84559607840|  Hai Phong|đohoanganh@exampl...|Doanh nghiep Hoa ...| VN9293|6253.49|        20|  2024-09-06| 2023-01-13|
|   KH00002|        Đo Hoang Anh| false|1970-01-01|  84339104876|  Hai Phong|đohoanganh@exampl...|Doanh nghiep Hoa ...| VN4635| 526.94|        71|  2024-01-04| 2023-05-11|
|   KH00003|         Bui Thi Lan| false|1974-04-02|  84173415628|  Hai Phong|buithilan@example...|Cong ty Co phan T...| VN8622|1459.81|     

In [0]:
# Deduplicate the inputs before joining.
# - Orders: parse orderDate into a DateType column (named "orderdate"), then drop rows
#   that are exact duplicates. The productId/price columns suggest one row per order
#   line, so orderId alone is presumably not a key. Verify this before keying on it.
# - Customers: a full-row dropDuplicates() does not make CUSTOMERID unique. Any customer
#   that appears more than once with differing attributes would fan out the left join
#   on CUSTOMERID and duplicate orders. Keep exactly one row per CUSTOMERID, preferring
#   the most recent MODIFIEDDATE and then CREATEDDATE. DESC ordering puts NULLs last.
#   Rows tied on both dates are resolved arbitrarily.
from pyspark.sql import Window
from pyspark.sql.functions import to_date, col, row_number

order_date = orders_df.withColumn("orderdate", to_date(col("orderdate"), "yyyy-MM-dd"))
order_deduplicate = order_date.dropDuplicates()

latest_customer_first = Window.partitionBy("CUSTOMERID").orderBy(
    col("MODIFIEDDATE").desc(), col("CREATEDDATE").desc()
)
customerdb_deduplicate = (
    customerdb_df
    .withColumn("_rn", row_number().over(latest_customer_first))
    .filter(col("_rn") == 1)
    .drop("_rn")
)


In [0]:
# Join the data: attach customer name and region to every order, then project the
# reporting columns. The left join keeps orders whose customer is missing from
# customerdb. The key is spelled CustomerID in orders and CUSTOMERID in customers.
# Joining on the name works because Spark resolves names case-insensitively by
# default (spark.sql.caseSensitive=false).
df_result = (
    order_deduplicate.alias('o')
    .join(customerdb_deduplicate.alias('c'), on='CUSTOMERID', how='left')
    .select(
        'customerid',  # the single USING-join key column
        'c.customername',
        'c.region',
        'o.orderid',
        'o.OrderDate',
        'o.totalamount',
    )
)

df_result.show(10)

+----------+----------------+-----------+-------+----------+-----------+
|customerid|    customername|     region|orderid| OrderDate|totalamount|
+----------+----------------+-----------+-------+----------+-----------+
|   KH00569|   Nguyen Van An|Ho Chi Minh|OD02264|2024-01-14|         57|
|   KH00485|  Hoang Van Tien|     Ha Noi|OD01352|2024-09-12|         19|
|   KH00705|   Nguyen Van An|Ho Chi Minh|OD00431|2024-03-19|         72|
|   KH00264|     Bui Thi Lan|    Can Tho|OD00234|2024-09-28|         62|
|   KH00008|Huynh Thi Thu Ha|    Can Tho|OD00295|2024-04-15|         49|
|   KH00354|    Cao Minh Tam|  Hai Phong|OD00855|2024-12-11|         70|
|   KH00602|   Le Minh Hoang|  Hai Phong|OD00920|2024-09-28|         48|
|   KH00109|    Đo Hoang Anh|    Can Tho|OD01574|2024-10-26|         55|
|   KH00655|  Ly Thi Kim Chi|    Can Tho|OD00815|2024-10-03|         40|
|   KH00105|    Phan Thi Mai|     Ha Noi|OD00874|2024-11-03|         39|
+----------+----------------+-----------+-------+--

In [0]:
# Write the integrated orders to Delta Lake at the path below, replacing its current
# contents (mode "overwrite").
# No "header" option is passed: it is a CSV option and has no meaning for Delta.
# NOTE(review): if the output schema changes, a Delta overwrite may be rejected
# unless overwriteSchema is set. Confirm this is the intended behavior.
INTEGRATED_ORDERS_PATH = "/mnt/cask-mount/integrated-data/IntegratedOrders"

(
    df_result.write
    .format("delta")
    .mode("overwrite")
    .save(INTEGRATED_ORDERS_PATH)
)
