
#L6

#dbutils commands

In [0]:
# Show the built-in help for dbutils
dbutils.help()

##dbutils fs commands (file system)

In [0]:
# Show the built-in help for the dbutils.fs (file system) module
dbutils.fs.help()

In [0]:
# List the contents of the given path; as the last expression in the cell, the result is displayed
dbutils.fs.ls("/FileStore/tables/")

Out[11]: [FileInfo(path='dbfs:/FileStore/tables/temp/', name='temp/', size=0, modificationTime=0)]

In [0]:
# Create a file at the path and write the given text into it.
# The third argument (True) is presumably the overwrite flag — confirm with dbutils.fs.help("put").
dbutils.fs.put("/FileStore/tables/cpdemo.txt", "Welcome to Cloudpandith online training", True)

# To confirm the file was created, list "/FileStore/tables/" in a separate cell.

Wrote 39 bytes.
Out[20]: True

In [0]:
# NOTE(review): scratch cell the author flagged for removal — it only re-lists /FileStore/tables/.

dbutils.fs.ls("/FileStore/tables/")

Out[35]: [FileInfo(path='dbfs:/FileStore/tables/temp/', name='temp/', size=0, modificationTime=0)]

In [0]:
# View the content of the file (the result is displayed as a string)
dbutils.fs.head("/FileStore/tables/cpdemo.txt")

Out[21]: 'Welcome to Cloudpandith online training'

In [0]:
# Create the "input" and "output" folders under /FileStore/tables.
# Only the result of the last call is displayed as the cell output.
dbutils.fs.mkdirs("/FileStore/tables/input")
dbutils.fs.mkdirs("/FileStore/tables/output")

Out[23]: True

In [0]:
# Copy a file from the source path (first argument) to the destination path (second argument)
dbutils.fs.cp("/FileStore/tables/cpdemo.txt", "/FileStore/tables/input")

Out[27]: True

In [0]:
# Move the file from the source path (first argument) to the destination path (second argument)
dbutils.fs.mv("/FileStore/tables/input/cpdemo.txt", "/FileStore/tables/output")

Out[28]: True

In [0]:
# Delete a single file
dbutils.fs.rm("/FileStore/tables/cpdemo.txt")

# With recurse=True the folder is deleted together with every file inside it
dbutils.fs.rm("/FileStore/tables/output", recurse=True)

Out[34]: True

In [0]:
# Display all the mount points available
dbutils.fs.mounts()

Out[1]: [MountInfo(mountPoint='/mnt/dataquaility-data', source='wasbs://dataquaility-data@dbprojectblob0809.blob.core.windows.net', encryptionType=''),
 MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType='sse-s3'),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType='sse-s3'),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType='sse-s3'),
 MountInfo(mountPoint='/mnt/global', source='wasbs://global@blob0411new.blob.core.windows.net', encryptionType=''),
 MountInfo(mountPoint='/mnt/input', source='wasbs://input@adlsgen201102023.blob.core.windows.net', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='sse-s3')]

In [0]:
# Remove the mount point connection at /mnt/global
# NOTE(review): presumably this raises if /mnt/global is not currently mounted — confirm before relying on Run All.
dbutils.fs.unmount('/mnt/global')

/mnt/global has been unmounted.
Out[2]: True

##dbutils.secrets (credential security)

In [0]:
# Show the built-in help for the dbutils.secrets module
dbutils.secrets.help()

###### To establish a connection between Databricks and Key Vault

* Open the secret scope creation page of your workspace: `https://<your-workspace>.azuredatabricks.net/#secrets/createScope`

In [0]:
# List all secret scopes (the connections to Key Vault) available in this workspace
dbutils.secrets.listScopes()

In [0]:
# Return the names of the secrets in the "akvconnection" scope
dbutils.secrets.list("akvconnection")

In [0]:
# Retrieve the value stored in Key Vault for the secret "dbpwd" in the "akvconnection" scope
dbutils.secrets.get("akvconnection", "dbpwd")

In [0]:
# WARNING: this loop prints the secret one character per line, which exposes the secret
# value in the notebook output. Use it only with a throwaway demo secret and clear the
# cell output afterwards; never leave it in a shared or committed notebook.
for i in dbutils.secrets.get("akvconnection", "dbpwd"):
    print(i)

#L7

##dbutils.widgets (Notebook parameters)

In [0]:
# Show the built-in help for the dbutils.widgets module
dbutils.widgets.help()

In [0]:
# Create three text widgets (notebook parameters).
# First argument is the widget name, second argument is its default value.
dbutils.widgets.text("load_type", "adhoc-load")
dbutils.widgets.text("load_date", "2024/01/01")
dbutils.widgets.text("meta_loc", "/mnt/global/india/landing/cust")

In [0]:
# Read the current value of each notebook parameter into a variable.
# (A bare dbutils.widgets.get(...) in the middle of a cell is neither displayed nor stored,
# so the redundant lookup that used to precede these assignments was dropped.)
ltype = dbutils.widgets.get("load_type")
ldate = dbutils.widgets.get("load_date")
metaloc = dbutils.widgets.get("meta_loc")

In [0]:
# Show the three parameter values, one per line
print(ltype, ldate, metaloc, sep="\n")

incremental-load
2024/01/01
/mnt/global/india/landing/cust


In [0]:
# Remove the single notebook parameter named "meta_loc"
dbutils.widgets.remove("meta_loc")

In [0]:
# Remove all the notebook parameters
dbutils.widgets.removeAll()

In [0]:
# Dropdown widget: exactly one value can be selected at a time,
# and the default value must be one of the choices.
dbutils.widgets.dropdown(
    'loaddates1',
    '2023/02/01',
    ['2023/02/01', '2023/02/02', '2023/02/03', '2023/02/04', '2023/02/05'],
)
ldates1 = dbutils.widgets.get('loaddates1')
print(ldates1)

2023/02/04


In [0]:
# Multiselect widget: several values can be selected at once;
# get() returns them as a single comma-separated string.
dbutils.widgets.multiselect(
    'loaddates2',
    '2023/02/01',
    ['2023/02/01', '2023/02/10', '2023/02/02', '2023/02/03', '2023/02/04', '2023/02/05'],
)
ldates2 = dbutils.widgets.get('loaddates2')
print(ldates2)

2023/02/01,2023/02/02,2023/02/10


In [0]:
# Combobox widget: only a single value can be chosen at a time. Unlike the dropdown, the default
# value here ('2023/02/01') is not one of the listed choices — it does not have to be
# (presumably because a combobox also accepts typed-in values; confirm with dbutils.widgets.help("combobox")).
dbutils.widgets.combobox('loaddates', '2023/02/01',['2023/02/02','2023/02/03','2023/02/04','2023/02/05'])
ldates = dbutils.widgets.get('loaddates')
print(ldates)

2023/02/05


#dbutils.notebook (Execute another notebook from the current notebook, or terminate a notebook)

In [0]:
%run /databrickscomplete/Demo

Thank you for executing the notebook!


In [0]:
# Show the variables a and b that the %run of the Demo notebook brought into this notebook's scope
print(a, b, sep="\n")

100
200


In [0]:
%run /databrickscomplete/Demo $pa=100 $pb=400

Thank you for executing the notebook!


In [0]:
# Show a and b again after re-running the Demo notebook with explicit parameter values
print(a, b, sep="\n")

100
400


In [0]:
# Run the Demo notebook from this notebook.
#   1st argument: path of the notebook to run
#   2nd argument: timeout in seconds
#   3rd argument: parameters passed to the notebook's widgets — the map is string -> string,
#                 so the values are passed as strings rather than ints.
dbutils.notebook.run("/databrickscomplete/Demo", 10000, {"pa": "200", "pb": "400"})

In [0]:
# Stop the notebook with a message when the two counts differ.
# Comparing directly (instead of subtracting and testing against 0) also works
# when a and b are not numeric, e.g. strings read from widgets.
if a != b:
    dbutils.notebook.exit("Record count is not matching")

#L8


* <span style="background-color: #ff0000">The mount point is a workspace-level connection.</span>

#Mount blob storage using Access Key

In [0]:
# Create a mount point to the blob storage container using the storage account access key.
# The key is read from the secret scope instead of being pasted into the notebook.
# SECURITY: the access key that used to be hard-coded here has been exposed and must be rotated.
storage_account = "blobstorage01132023"
container = "global"
mount_point = "/mnt/global"

# "blob-access-key" is a placeholder secret name — store the storage account access key
# under this (or your own) name in the Key Vault behind the "akvconnection" scope.
access_key = dbutils.secrets.get("akvconnection", "blob-access-key")

# Mounting onto an already-mounted path fails, so drop any existing mount first
# (keeps the cell re-runnable).
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
  source = f"wasbs://{container}@{storage_account}.blob.core.windows.net",
  mount_point = mount_point,
  extra_configs = {f"fs.azure.account.key.{storage_account}.blob.core.windows.net": access_key})

Out[3]: True

In [0]:
# Build a DataFrame from the CSV data in the mounted container:
# the first row is used as the header and fields are comma-separated.
src_path = "/mnt/global/"

df = (
    spark.read
    .option("header", True)
    .option("sep", ",")
    .csv(src_path)
)
df.show()

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Kishan| 24|  Male|
|   Kumar| 15|  Male|
|   Reddy| 45|  Male|
|Thamatam| 23|  Male|
| Venkata| 67|Female|
+--------+---+------+



In [0]:
# Create a new DataFrame with an extra "load_date" column holding the current date
from pyspark.sql.functions import current_date
df1 = df.withColumn("load_date",current_date())
df1.show()

+--------+---+------+----------+
|    Name|Age|Gender| load_date|
+--------+---+------+----------+
|  Kishan| 24|  Male|2024-01-13|
|   Kumar| 15|  Male|2024-01-13|
|   Reddy| 45|  Male|2024-01-13|
|Thamatam| 23|  Male|2024-01-13|
| Venkata| 67|Female|2024-01-13|
+--------+---+------+----------+



In [0]:
# Write the DataFrame to the blob storage container as CSV (with a header row).
# The write mode is set explicitly: without it the writer refuses to write to a path that
# already exists, so the cell would fail on every re-run.
tgt_path = "/mnt/global/output/"

df1.write.mode("overwrite").csv(tgt_path, header = True)

#L9

#Mount ADLS Gen2 storage using a Service Principal (App Registration)


* In order to mount ADLS Gen2 we need to use a <span style="background-color: #ff0000">Service Principal identity</span> created through an <span style="background-color: #ff0000">App Registration</span>

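* Service Principal - Just as the organization generates a new user ID and password for you to log in to services when you join a new project, a service principal is a user ID and password (secret) that an application uses to log in to services.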

* App registration - We create and manage this user ID within App registrations in Azure Active Directory

* We need to establish a connection between the app registration user and ADLS Gen2 within Identity and Access Management (IAM) on the ADLS Gen2 storage account

In [0]:
# Create a mount point to ADLS Gen2 via an app registration (service principal, OAuth client credentials).
# The credentials are read from the secret scope instead of being pasted into the notebook.
# SECURITY: the client secret that used to be hard-coded here has been exposed and must be rotated.
# The three secret names below are placeholders — create them in the Key Vault behind the scope.
secret_scope = "akvconnection"
client_id = dbutils.secrets.get(secret_scope, "sp-client-id")          # App registration --> Application (client) ID
client_secret = dbutils.secrets.get(secret_scope, "sp-client-secret")  # App registration --> certificates & secrets --> New client secret --> Add (Value)
tenant_id = dbutils.secrets.get(secret_scope, "sp-tenant-id")          # App registration --> Directory (tenant) ID

configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": client_id,
       "fs.azure.account.oauth2.client.secret": client_secret,
       "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# /mnt/global may still be mounted (the blob-storage example uses the same mount point);
# mounting onto an already-mounted path fails, so drop the existing mount first.
mount_point = "/mnt/global"
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
source = "abfss://global@adlsgen201142023.dfs.core.windows.net/",
mount_point = mount_point,
extra_configs = configs)

Out[4]: True

In [0]:
# Load the CSV data under the mounted ADLS Gen2 input folder into a DataFrame (first row = header)
src_path = "/mnt/global/input/"

df = spark.read.option("header", True).csv(src_path)
df.show()

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Kishan| 24|  Male|
|   Kumar| 15|  Male|
|   Reddy| 45|  Male|
|Thamatam| 23|  Male|
| Venkata| 67|Female|
+--------+---+------+



In [0]:
# Write the DataFrame to the mounted location as CSV.
# The path needs the leading "/" so it points at the /mnt/global mount (like src_path does);
# without it the path is relative and is not guaranteed to resolve to the mount point.
tgt_path = "/mnt/global/output/"

# The two calls demonstrate the two write modes. Note that running both leaves the data
# in the target twice: "overwrite" replaces the target, then "append" adds the rows again.
df.write.mode("overwrite").csv(tgt_path, header = True) # overwrite the data
df.write.mode("append").csv(tgt_path, header = True) # append the data

In [0]:
# Same read as a generic format/option/load chain: CSV with a header row from the ADLS Gen2 mount
src_path = "/mnt/global/input/"

df1 = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(src_path)
)
df1.show()

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Kishan| 24|  Male|
|   Kumar| 15|  Male|
|   Reddy| 45|  Male|
|Thamatam| 23|  Male|
| Venkata| 67|Female|
+--------+---+------+



In [0]:
# Write the DataFrame to the mounted location as CSV using the generic format/option/save chain.
# The path needs the leading "/" so it points at the /mnt/global mount (like src_path does);
# without it the path is relative and is not guaranteed to resolve to the mount point.
tgt_path = "/mnt/global/output/"

# The two calls demonstrate the two write modes. Note that running both leaves the data
# in the target twice: "overwrite" replaces the target, then "append" adds the rows again.
df1.write.mode("overwrite").format("csv").option("header", True).save(tgt_path)
df1.write.mode("append").format("csv").option("header", True).save(tgt_path)

#L10


* <span style="background-color: #ff0000">The direct connection is a Spark-session-level connection.</span>

* <span style="background-color: #ff0000">A new Spark session is created for every notebook, and whenever you detach and reattach the Spark cluster.</span>

#Direct connection to Blob Storage (Access Key)

In [0]:
# Register the blob storage access key on the Spark session so wasbs:// paths can be read directly.
# The key is read from the secret scope instead of being pasted into the notebook.
# SECURITY: the access key that used to be hard-coded here has been exposed and must be rotated.
# "blob-access-key" is a placeholder secret name — create it in the Key Vault behind the scope.
spark.conf.set(
    "fs.azure.account.key.blob01152023.blob.core.windows.net",
    dbutils.secrets.get("akvconnection", "blob-access-key")) # Blob storage access key

In [0]:
# Load the CSV data (with header row) straight from the blob container via its wasbs:// URL
src_path = "wasbs://global@blob01152023.blob.core.windows.net/input/"

df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(src_path)
)
df.show()

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Kishan| 24|  Male|
|   Kumar| 15|  Male|
|   Reddy| 45|  Male|
|Thamatam| 23|  Male|
| Venkata| 67|Female|
+--------+---+------+



In [0]:
# Save the DataFrame as CSV (with header) to the blob container, replacing any existing output
tgt_path = "wasbs://global@blob01152023.blob.core.windows.net/output/"

(
    df.write
    .mode("overwrite")
    .format("csv")
    .option("header", True)
    .save(tgt_path)
)

#Direct connection to adls Gen2 Access Key

In [0]:
# Register the ADLS Gen2 access key on the Spark session so abfss:// paths can be read directly.
# The key is read from the secret scope instead of being pasted into the notebook.
# SECURITY: the access key that used to be hard-coded here has been exposed and must be rotated.
# "adls-access-key" is a placeholder secret name — create it in the Key Vault behind the scope.
spark.conf.set(
    "fs.azure.account.key.adlsgen201152023.dfs.core.windows.net",
    dbutils.secrets.get("akvconnection", "adls-access-key")) # Adlsgen2 access key

In [0]:
# Load the CSV data (with header row) straight from the ADLS Gen2 container via its abfss:// URL
src_path = "abfss://global@adlsgen201152023.dfs.core.windows.net/input/"

df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(src_path)
)
df.show()

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Kishan| 24|  Male|
|   Kumar| 15|  Male|
|   Reddy| 45|  Male|
|Thamatam| 23|  Male|
| Venkata| 67|Female|
+--------+---+------+



In [0]:
# Save the DataFrame as CSV (with header) to the ADLS Gen2 container, replacing any existing output
tgt_path = "abfss://global@adlsgen201152023.dfs.core.windows.net/output/"

(
    df.write
    .mode("overwrite")
    .format("csv")
    .option("header", True)
    .save(tgt_path)
)

# Direct Connection to adls Gen2 Service Principal

In [0]:
# Configure the Spark session to reach ADLS Gen2 with a service principal (OAuth client credentials).
# The credentials are read from the secret scope instead of being pasted into the notebook.
# SECURITY: the client secret that used to be hard-coded here has been exposed and must be rotated.
# The three secret names below are placeholders — create them in the Key Vault behind the scope.
account_host = "adlsgen201152023.dfs.core.windows.net"
secret_scope = "akvconnection"
client_id = dbutils.secrets.get(secret_scope, "sp-client-id")          # Application ID
client_secret = dbutils.secrets.get(secret_scope, "sp-client-secret")  # App registration --> certificates & secrets --> New client secret --> Add (Value)
tenant_id = dbutils.secrets.get(secret_scope, "sp-tenant-id")          # Directory (tenant) ID

oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Each setting is scoped to this storage account by appending the account host to the key.
for setting, value in oauth_settings.items():
    spark.conf.set(f"{setting}.{account_host}", value)

In [0]:
# Load the CSV data (with header row) from the ADLS Gen2 container via its abfss:// URL
src_path = "abfss://global@adlsgen201152023.dfs.core.windows.net/input/"

df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(src_path)
)
df.show()

+--------+---+------+
|    Name|Age|Gender|
+--------+---+------+
|  Kishan| 24|  Male|
|   Kumar| 15|  Male|
|   Reddy| 45|  Male|
|Thamatam| 23|  Male|
| Venkata| 67|Female|
+--------+---+------+



In [0]:
# Append the DataFrame as CSV (with header) to the ADLS Gen2 output folder
tgt_path = "abfss://global@adlsgen201152023.dfs.core.windows.net/output/"

(
    df.write
    .mode("append")
    .format("csv")
    .option("header", True)
    .save(tgt_path)
)