In [0]:
# Mount the "bronze" ADLS Gen2 container on DBFS at /mnt/bronze, authenticating
# with the cluster's passthrough token provider.
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}

# Mounting onto a mount point that is already in use raises, which makes the cell
# fail on re-run; only mount when /mnt/bronze is not mounted yet.
# (A set membership test is used instead of builtins such as any(), which a
# wildcard import from pyspark.sql.functions may shadow in this notebook.)
if "/mnt/bronze" not in {m.mountPoint for m in dbutils.fs.mounts()}:
    # Optionally, you can add <directory-name> to the source URI of your mount point.
    dbutils.fs.mount(
      source = "abfss://bronze@adlsgen2nwmc.dfs.core.windows.net/",
      mount_point = "/mnt/bronze",
      extra_configs = configs)

True

In [0]:
# Sanity check: list the root of the bronze mount to confirm it is reachable.
dbutils.fs.ls("/mnt/bronze/")

[FileInfo(path='dbfs:/mnt/bronze/Dimension/', name='Dimension/', size=0, modificationTime=1736646936000),
 FileInfo(path='dbfs:/mnt/bronze/Fact/', name='Fact/', size=0, modificationTime=1736646940000)]

In [0]:
# (Re)mount the "argent" ADLS Gen2 container on DBFS at /mnt/argent.

# Unmount the existing mount point only if it exists: dbutils.fs.unmount raises
# when the directory is not mounted, which would abort the cell on a fresh workspace.
if "/mnt/argent" in {m.mountPoint for m in dbutils.fs.mounts()}:
    dbutils.fs.unmount("/mnt/argent")

configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}

# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://argent@adlsgen2nwmc.dfs.core.windows.net/",
  mount_point = "/mnt/argent",
  extra_configs = configs)

/mnt/argent has been unmounted.


True

In [0]:
# (Re)mount the "oor" ADLS Gen2 container on DBFS at /mnt/oor.

# Unmount first only when the mount point exists: dbutils.fs.unmount raises
# when the directory is not mounted, which would abort the cell on a fresh workspace.
if "/mnt/oor" in {m.mountPoint for m in dbutils.fs.mounts()}:
    dbutils.fs.unmount("/mnt/oor")

configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}

# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://oor@adlsgen2nwmc.dfs.core.windows.net/",
  mount_point = "/mnt/oor",
  extra_configs = configs)

/mnt/oor has been unmounted.


True

In [0]:
# A cell only echoes its final expression, so a bare first ls() would be silently
# discarded. Display both listings explicitly to verify each mount.
display(dbutils.fs.ls("/mnt/argent"))
display(dbutils.fs.ls("/mnt/oor"))

[]

In [0]:
# Location of the Date dimension parquet file inside the bronze mount.
input_path = "/mnt/bronze/Dimension/Date/Date.parquet"

In [0]:
# Load the parquet file at input_path into a Spark DataFrame and render it.
df = spark.read.format("parquet").load(input_path)
display(df)

Date,Day_Number,Day,Month,Short_Month,Calendar_Month_Number,Calendar_Month_Label,Calendar_Year,Calendar_Year_Label,Fiscal_Month_Number,Fiscal_Month_Label,Fiscal_Year,Fiscal_Year_Label,ISO_Week_Number
2013-01-01,1,1,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,1
2013-01-02,2,2,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,1
2013-01-03,3,3,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,1
2013-01-04,4,4,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,1
2013-01-05,5,5,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,1
2013-01-06,6,6,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,1
2013-01-07,7,7,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,2
2013-01-08,8,8,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,2
2013-01-09,9,9,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,2
2013-01-10,10,10,January,Jan,1,CY2013-Jan,2013,CY2013,3,FY2013-Jan,2013,FY2013,2


In [0]:
# Convert the Spark DataFrame to pandas (collects all rows to the driver) and
# print the per-column dtypes and non-null counts.
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1461 entries, 0 to 1460
Data columns (total 14 columns):
 #   Column                 Non-Null Count  Dtype 
---  ------                 --------------  ----- 
 0   Date                   1461 non-null   object
 1   Day_Number             1461 non-null   int32 
 2   Day                    1461 non-null   object
 3   Month                  1461 non-null   object
 4   Short_Month            1461 non-null   object
 5   Calendar_Month_Number  1461 non-null   int32 
 6   Calendar_Month_Label   1461 non-null   object
 7   Calendar_Year          1461 non-null   int32 
 8   Calendar_Year_Label    1461 non-null   object
 9   Fiscal_Month_Number    1461 non-null   int32 
 10  Fiscal_Month_Label     1461 non-null   object
 11  Fiscal_Year            1461 non-null   int32 
 12  Fiscal_Year_Label      1461 non-null   object
 13  ISO_Week_Number        1461 non-null   int32 
dtypes: int32(6), object(8)
memory usage: 125.7+ KB


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Names of the entries under the bronze Dimension folder. Each listing name is
# cut at its first "/" so that e.g. 'Dimension/' would become 'Dimension'.
table = [entry.name.split("/")[0] for entry in dbutils.fs.ls("/mnt/bronze/Dimension")]

table

['City',
 'Customer',
 'Date',
 'Employee',
 'PaymentMethod',
 'StockItem',
 'Supplier',
 'TransactionType']

In [0]:
# Copy every Dimension table from the bronze mount (parquet) to the argent mount
# (Delta), casting date-like columns to DateType on the way.
for i in table:
    inputPath = f"/mnt/bronze/Dimension/{i}/{i}.parquet"
    df = spark.read.parquet(inputPath)

    # The loop variable is deliberately NOT named `col`: this notebook does
    # `from pyspark.sql.functions import *`, and rebinding `col` to a string would
    # break every later call to the `col()` function.
    for column_name in df.columns:
        # A column is treated as a date when its name contains "Date"/"date" or is
        # one of the validity-range columns.
        # NOTE(review): the substring test also matches names such as "Update..." — confirm
        # no such column exists in the source tables.
        if "Date" in column_name or "date" in column_name or column_name in ["Valid_From", "Valid_To"]:
            df = df.withColumn(column_name, df[column_name].cast(DateType()))

    outputPath = f"/mnt/argent/Dimension/{i}/"
    # Overwrite mode replaces any data previously written for this table.
    df.write.format("delta").mode("overwrite").save(outputPath)

In [0]:
# `df` here is whatever the preceding loop assigned last, i.e. only the final
# table processed is shown, not all of the converted dimensions.
display(df)

Transaction_Type_Key,WWI_Transaction_Type_ID,Transaction_Type,Valid_From,Valid_To,Lineage_Key
0,0,Unknown,2013-01-01,9999-12-31,0
1,1,Customer Invoice,2013-01-01,9999-12-31,7
2,2,Customer Credit Note,2013-01-01,9999-12-31,7
3,3,Customer Payment Received,2013-01-01,9999-12-31,7
4,4,Customer Refund,2013-01-01,9999-12-31,7
5,5,Supplier Invoice,2013-01-01,9999-12-31,7
6,6,Supplier Credit Note,2013-01-01,9999-12-31,7
7,7,Supplier Payment Issued,2013-01-01,9999-12-31,7
8,8,Supplier Refund,2013-01-01,9999-12-31,7
9,9,Stock Transfer,2013-01-01,9999-12-31,7
