In [0]:
def mount_adls(storage_name, container_name):
    """Mount a storage container at ``/mnt/<storage_name>/<container_name>``.

    OAuth client credentials are read from the ``databricks-scope`` secret
    scope. If the target mount point is already listed in
    ``dbutils.fs.mounts()`` it is unmounted first, then the container is
    mounted via its ``abfss://`` URL and the current mount list is displayed.

    Args:
        storage_name: Storage account name; used in the abfss URL and in the
            mount path.
        container_name: Container name; used in the abfss URL and in the
            mount path.
    """
    # Pull the OAuth client credentials from the secret scope.
    secret_scope = "databricks-scope"
    client_id = dbutils.secrets.get(scope=secret_scope, key="zecdatastorage-client-id")
    tenant_id = dbutils.secrets.get(scope=secret_scope, key="zecdatastorage-tenant-id")
    client_secret = dbutils.secrets.get(scope=secret_scope, key="zecdatastorage-secret-key")

    # Hadoop ABFS settings for the client-credentials token provider.
    token_endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    oauth_configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint": token_endpoint,
    }

    target_mount = f"/mnt/{storage_name}/{container_name}"

    # Drop an existing mount at the same path so the mount call below does not collide with it.
    existing_mount_points = [entry.mountPoint for entry in dbutils.fs.mounts()]
    if target_mount in existing_mount_points:
        dbutils.fs.unmount(target_mount)

    # Mount the container at the target path.
    dbutils.fs.mount(
        source=f"abfss://{container_name}@{storage_name}.dfs.core.windows.net/",
        mount_point=target_mount,
        extra_configs=oauth_configs,
    )

    # Show the resulting mount table.
    display(dbutils.fs.mounts())
    

In [0]:
# Mount the "raw" container of the zecdatastorage account.
mount_adls(storage_name='zecdatastorage', container_name='raw')

/mnt/zecdatastorage/raw has been unmounted.


mountPoint,source,encryptionType
/mnt/zecdatastorage/demo,abfss://demo@zecdatastorage.dfs.core.windows.net/,
/databricks-datasets,databricks-datasets,
/mnt/zecdatastorage/processed,abfss://processed@zecdatastorage.dfs.core.windows.net/,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/mnt/zecdatastorage/presentation,abfss://presentation@zecdatastorage.dfs.core.windows.net/,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/zecdatastorage/raw,abfss://raw@zecdatastorage.dfs.core.windows.net/,
/Volume,DbfsReserved,


In [0]:
# Mount the "processed" container of the zecdatastorage account.
mount_adls(storage_name='zecdatastorage', container_name='processed')

/mnt/zecdatastorage/processed has been unmounted.


mountPoint,source,encryptionType
/mnt/zecdatastorage/demo,abfss://demo@zecdatastorage.dfs.core.windows.net/,
/databricks-datasets,databricks-datasets,
/mnt/zecdatastorage/processed,abfss://processed@zecdatastorage.dfs.core.windows.net/,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/mnt/zecdatastorage/presentation,abfss://presentation@zecdatastorage.dfs.core.windows.net/,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/zecdatastorage/raw,abfss://raw@zecdatastorage.dfs.core.windows.net/,
/Volume,DbfsReserved,


In [0]:
# Mount the "presentation" container of the zecdatastorage account.
mount_adls(storage_name='zecdatastorage', container_name='presentation')

/mnt/zecdatastorage/presentation has been unmounted.


mountPoint,source,encryptionType
/mnt/zecdatastorage/demo,abfss://demo@zecdatastorage.dfs.core.windows.net/,
/databricks-datasets,databricks-datasets,
/mnt/zecdatastorage/processed,abfss://processed@zecdatastorage.dfs.core.windows.net/,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/mnt/zecdatastorage/presentation,abfss://presentation@zecdatastorage.dfs.core.windows.net/,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/zecdatastorage/raw,abfss://raw@zecdatastorage.dfs.core.windows.net/,
/Volume,DbfsReserved,


In [0]:
# Load the employees CSV from the raw mount: the first row supplies the column
# names and column types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("/mnt/zecdatastorage/raw/employees.csv")
)

In [0]:
# DataFrame.show() prints the table itself and returns None, so wrapping it in
# display() only passed None to display(); call show() directly instead.
df.show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|            - |       201|           20|


In [0]:
# Project the employee id, first name, email and salary columns.
df1 = df.select(["EMPLOYEE_ID", "FIRST_NAME", "EMAIL", "SALARY"])

In [0]:
# mode("overwrite") replaces whatever already exists at the target path, so the
# data is written to a dedicated "employees" folder instead of the root of the
# processed mount, where an overwrite would replace everything stored in that container.
df1.write.mode("overwrite").format("parquet").save("/mnt/zecdatastorage/processed/employees")