In [0]:
# Fetch the service-principal credentials from the "ecom-secret-scope" secret
# scope; the keys are read in the same order as the names on the left.
client_id, tenant_id, secret_value = (
    dbutils.secrets.get(scope="ecom-secret-scope", key=secret_key)
    for secret_key in ("clientid", "tenantid", "secretvalue")
)

In [0]:
# Mount the storage container at /mnt/ecomdata, but only when that mount
# point is not already listed by dbutils.fs.mounts().
if '/mnt/ecomdata' not in {entry.mountPoint for entry in dbutils.fs.mounts()}:
    # OAuth client-credentials settings handed to the mount call.
    configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": secret_value,
        "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }

    dbutils.fs.mount(
        mount_point="/mnt/ecomdata",
        source="abfss://ecomstoragecontainer@ecomstorageidenitifier.dfs.core.windows.net",
        extra_configs=configs,
    )

In [0]:
# List the contents of the mount root (Python equivalent of the %fs ls magic).
display(dbutils.fs.ls('/mnt/ecomdata/'))

path,name,size,modificationTime
dbfs:/mnt/ecomdata/processed_data/,processed_data/,0,1717779001000
dbfs:/mnt/ecomdata/raw_data/,raw_data/,0,1717778989000


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Get the existing SparkSession, or create one named "EcomDataPipeline".
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

In [0]:
# Echo the session object so the notebook renders its summary.
spark

In [0]:
# Read the users dataset (Parquet) from the processed_data area of the mount.
# Parquet files embed their own schema, so the CSV-only "header" and
# "inferSchema" options are not passed.
usersDF = (
    spark.read.format("parquet")
    .load("/mnt/ecomdata/processed_data/to_processed/users_data")
)

In [0]:
# Print the first five rows of the users DataFrame as a quick sanity check.
usersDF.show(n=5)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+-

In [0]:
# Persist the users data in Delta format, overwriting whatever is at the path.
usersDF.write.save(
    "/mnt/delta/tables/bronze/users",
    format="delta",
    mode="overwrite",
)

In [0]:
# Read the buyers dataset (Parquet) from the processed_data area of the mount.
# Parquet files embed their own schema, so the CSV-only "header" and
# "inferSchema" options are not passed.
buyersDF = (
    spark.read.format("parquet")
    .load("/mnt/ecomdata/processed_data/to_processed/buyers_data")
)

+-----------+------+---------+-------------+------------+----------+---------------+-------------+-----------------+--------------------+----------------------+------------------+-------------------------+---------------------+-------------------+-------------------+------------------+----------------------+----------------------+---------------------+------------------+------------------+-----------------+---------------------+---------------------+--------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|    country|buyers|topbuyers|topbuyerratio|femalebuyers|malebuyers|topfemalebuyers|topmalebuyers|femalebuyersratio|topfemalebuyersratio|boughtperwishlistratio|boughtperlikeratio|topboughtperwishlistratio|topboughtperlikeratio|totalproductsbought|totalproductswished|totalproductsliked|toptotalproductsbought|toptotalproductswished|toptotalproductsliked|meanproductsbought|meanproductswished|meanproductsliked|topmeanproductsbo

In [0]:
# Persist the buyers data in Delta format, overwriting whatever is at the path.
buyersDF.write.save(
    "/mnt/delta/tables/bronze/buyers",
    format="delta",
    mode="overwrite",
)

In [0]:
# Read the sellers dataset (Parquet) from the processed_data area of the mount.
# Parquet files embed their own schema, so the CSV-only "header" and
# "inferSchema" options are not passed.
sellersDF = (
    spark.read.format("parquet")
    .load("/mnt/ecomdata/processed_data/to_processed/sellers_data")
)

# Persist the sellers data in Delta format, overwriting whatever is at the path.
(
    sellersDF.write.format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/bronze/sellers")
)

In [0]:
# Read the countries dataset (Parquet) from the processed_data area of the
# mount. Parquet files embed their own schema, so the CSV-only "header" and
# "inferSchema" options are not passed.
countriesDF = (
    spark.read.format("parquet")
    .load("/mnt/ecomdata/processed_data/to_processed/countries_data")
)

# Persist the countries data in Delta format, overwriting whatever is at the path.
(
    countriesDF.write.format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/bronze/countries")
)