### Hello Databricks!


In [0]:
print("Starting first Databricks notebook here. Connected to GitHub Personal Repo...")

In [0]:
def print_hello(name):
    print(f"Hello {name}..")
print_hello(name="Onkar")

In [0]:
some_data = [{"name": "Onkar", "age": 30, "salary": 100000, "city" : "Helsingborg","role":"Data Engineer","department":"Range Ops"},
             {"name": "Ashok", "age": 32, "salary": 200000, "city" : "Malmö","role":"Software Engineer","department":"Range Exp"},
             {"name": "Shreyas", "age": 25, "salary": 150000, "city" : "Stockholm","role":"Senior Software Engineer","department":"FCS"},
             {"name": "Chetan", "age": 35, "salary": 350000, "city" : "Älmhult","role":"Engineering Manager","department":"Marketing"},
             {"name": "Joseph", "age": 25, "salary": 250000, "city" : "Gothenburg","role":"Store Manager","department":"Retail"},
             {"name": "Rohini", "age": 25, "salary": 100000, "city" : "Gothenburg","role":"Associate","department":"retail"},
             {"name": "Manan", "age": 26, "salary": 170000, "city" : "Gothenburg","role":"Senior Associate","department":"REtail"}]
schema = "name string, age int, salary int, city string, role string, department string"
df = spark.createDataFrame(some_data, schema=schema)
display(df)
df.printSchema()

In [0]:
df.filter(df.city=='Gothenburg').show()

In [0]:
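# Import max under an alias so it does not shadow Python's built-in max()
from pyspark.sql.functions import lower, max as max_agg
# Case-insensitive match, since department is stored as "Retail", "retail" and "REtail"
df_goth = df.filter(lower(df.department) == 'retail')
display(df_goth)
# Keep only the row(s) with the highest salary in the retail department
max_salary = df_goth.agg(max_agg("salary")).collect()[0][0]
df_goth.filter(df_goth.salary == max_salary).show()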

#### We want to access secrets stored in Azure Key Vault from the Databricks environment. First, create an Azure Key Vault in the same resource group in the Azure portal. Under Settings --> Access configuration, set the Key Vault's permission model to Vault access policy. Then grant access to the Databricks application under the Access policies tab. Next, create a secret scope in the Databricks environment at the URL [databricks_workspace_url]#secrets/createScope, providing the Key Vault URL and resource ID (found in the Key Vault under Settings --> Properties).
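The scope-creation page is not linked from the workspace menus, so it can help to build the URL once and open it in a browser. A minimal sketch, assuming a hypothetical helper name `create_scope_url` and a made-up workspace URL; only the `#secrets/createScope` fragment comes from the text above.

```python
def create_scope_url(workspace_url: str) -> str:
    """Return the secret-scope creation page for a Databricks workspace."""
    # The fragment is case-sensitive: "createScope", not "createscope".
    return workspace_url.rstrip("/") + "#secrets/createScope"


# Hypothetical workspace URL, used only for illustration.
print(create_scope_url("https://adb-1234567890123456.7.azuredatabricks.net/"))
```

Once the scope exists, it should show up in the output of `dbutils.secrets.listScopes()`.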

In [0]:
dbutils.secrets.listScopes()

#### Read more about Databricks utilities here --> https://docs.databricks.com/aws/en/dev-tools/databricks-utils. The dbutils secrets utility lets you read secrets and credentials from Databricks notebooks. The secrets themselves are stored in Azure Key Vault, and the Key Vault is exposed to Databricks through a secret scope.

In [0]:
dbutils.secrets.list('db-secret-scope')

In [0]:
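# Fetch the app registration credentials from the Key Vault-backed secret scope
app_client_id = dbutils.secrets.get(scope='db-secret-scope', key='app-client-id')
app_tenant_id = dbutils.secrets.get(scope='db-secret-scope', key='app-tenant-id')
app_client_secret = dbutils.secrets.get(scope='db-secret-scope', key='app-secret')
# Databricks redacts secret values in notebook output, so these print [REDACTED]
print(app_client_id)
print(app_tenant_id)
print(app_client_secret)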

### Widgets are Databricks utilities that let you accept user input through a text box, dropdown or multiselect. The selected values can then be read in the notebook, which makes the notebook parameterized.

In [0]:
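# Dropdown widget: holds a single value, "dev" by default
dbutils.widgets.dropdown("env", "dev", ["dev", "test", "prod"])
env = dbutils.widgets.get("env")
print(env)
# Multiselect widget: the selected values come back as one comma-separated string
dbutils.widgets.multiselect("env1", "dev", ["dev", "test", "prod"])
env = dbutils.widgets.get("env1")
print(env)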
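A multiselect widget returns its selection as a single comma-separated string, so it usually needs to be split before use. A minimal sketch in plain Python; `parse_multiselect` is a hypothetical helper name, and the literal string stands in for what `dbutils.widgets.get("env1")` would return.

```python
def parse_multiselect(raw: str) -> list[str]:
    """Split the comma-separated string returned for a multiselect widget."""
    # Drop empty pieces so an empty selection yields an empty list.
    return [value.strip() for value in raw.split(",") if value.strip()]


# In the notebook this string would come from dbutils.widgets.get("env1").
selected = parse_multiselect("dev,prod")
print(selected)
```

The resulting list can be passed directly to something like `df.filter(df.env.isin(selected))`.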

#### To access Azure Data Lake Storage (which is built on Azure Blob Storage), we need to set the storage account configuration in the Spark session. We use an app registration in Azure as the means of granting access to Azure resources. The app's credentials are stored in Azure Key Vault and exposed through the Databricks secret scope, and the app in turn has access to the Azure storage account.

In [0]:

spark.conf.set("fs.azure.account.auth.type.onkardbstorageacct.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.onkardbstorageacct.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.onkardbstorageacct.dfs.core.windows.net", app_client_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.onkardbstorageacct.dfs.core.windows.net", app_client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.onkardbstorageacct.dfs.core.windows.net", f"https://login.microsoftonline.com/{app_tenant_id}/oauth2/token")
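The five settings above differ only in the storage account host name, so they can be generated from one place. A minimal sketch, assuming a hypothetical helper `adls_oauth_conf` and placeholder credential strings; the keys and values mirror the cell above.

```python
def adls_oauth_conf(account: str, client_id: str, client_secret: str, tenant_id: str) -> dict[str, str]:
    """Build the Spark settings for OAuth (service principal) access to one storage account."""
    host = f"{account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Placeholder values; in the notebook these come from dbutils.secrets.get(...).
conf = adls_oauth_conf("onkardbstorageacct", "<client-id>", "<client-secret>", "<tenant-id>")
for key in conf:
    # In a Databricks notebook: spark.conf.set(key, conf[key])
    print(key)
```

Keeping the account name as a parameter makes it easier to point the same notebook at a different storage account.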

In [0]:
dbutils.fs.ls("/")

#### The following code lists all the files in the container named 'source' in the Azure storage account named 'onkardbstorageacct'

In [0]:
dbutils.fs.ls("abfss://source@onkardbstorageacct.dfs.core.windows.net/")
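The `abfss://` paths used in this notebook all follow the pattern `abfss://<container>@<account>.dfs.core.windows.net/<path>`. A small sketch of a builder for them; `abfss_uri` is a hypothetical helper, not part of dbutils.

```python
def abfss_uri(container: str, account: str, path: str = "") -> str:
    """Build an abfss:// URI for a path inside a storage container."""
    # Strip a leading slash so the result never contains "//" after the host.
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


print(abfss_uri("source", "onkardbstorageacct"))
print(abfss_uri("source", "onkardbstorageacct", "Sales.csv"))
```

In the notebook the result can be passed to `dbutils.fs.ls(...)` or `spark.read...load(...)`.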


#### Printing the content of the Sales.csv file on the screen using dbutils.fs.head

In [0]:
dbutils.fs.head("abfss://source@onkardbstorageacct.dfs.core.windows.net/Sales.csv")

In [0]:
sales_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("abfss://source@onkardbstorageacct.dfs.core.windows.net/Sales.csv")

In [0]:
sales_df.show(10, truncate=False)

In [0]:
sales_df.printSchema()

In [0]:
display(sales_df)

In [0]:
item_types = [row.Item_Type for row in sales_df.select('Item_Type').distinct().collect()]
dbutils.widgets.dropdown("item_type", item_types[0], item_types)
item_type = dbutils.widgets.get("item_type")



In [0]:
filtered_df = sales_df.filter(sales_df.Item_Type == item_type)

In [0]:
filtered_df.write.mode("append").format("delta").save("abfss://destination@onkardbstorageacct.dfs.core.windows.net/sales/")

In [0]:
display(sales_df)

### Save the dataframe as a Delta table. Delta Lake is a storage layer built on top of the Parquet data format, and it is the default table format on Databricks. A Delta table keeps its data in Parquet files and adds a transaction log (the _delta_log folder) that records every change as a new table version, which gives Databricks tables ACID guarantees. The following table is saved as "sales" under the "default" schema in the Databricks catalog.

In [0]:
sales_df.write.mode("overwrite").format("delta").saveAsTable("sales")
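To make the role of the transaction log concrete, here is a deliberately simplified sketch in plain Python. It assumes each commit is a list of JSON lines containing only `add` and `remove` actions; real Delta commits also carry entries such as `commitInfo` and `metaData`, and the Parquet file names below are made up.

```python
import json


def log_file_name(version: int) -> str:
    """Delta commit files are named with a zero-padded 20-digit version."""
    return f"{version:020d}.json"


def active_files(commits: list[list[str]], as_of: int) -> set[str]:
    """Replay add/remove actions up to a version to get the table's live files."""
    files: set[str] = set()
    for commit in commits[: as_of + 1]:
        for line in commit:
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


# Version 0 writes one file; version 1 (an overwrite) replaces it with another.
commits = [
    ['{"add": {"path": "part-0000.parquet"}}'],
    ['{"remove": {"path": "part-0000.parquet"}}', '{"add": {"path": "part-0001.parquet"}}'],
]
print(log_file_name(1))
print(sorted(active_files(commits, as_of=0)))
print(sorted(active_files(commits, as_of=1)))
```

Because old Parquet files are only marked as removed in the log rather than rewritten in place, earlier versions of the table remain readable until the files are cleaned up.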

In [0]:
%sql
SELECT * FROM SALES;

In [0]:
%sql
DESCRIBE sales;

##### Create your own database schema to store the "sales" table

In [0]:
%sql
create database if not exists sales_db;
use sales_db;

In [0]:
%sql 
create table if not exists sales_db.sales as select * from default.sales;

In [0]:
%sql
select * from sales_db.sales;

#### Dropping a managed table deletes the table metadata from the Databricks catalog and the underlying data files from Azure storage as well. Observe the files stored in the storage account's blob containers before and after the drop.

In [0]:
%sql


-- DROP table if exists sales_db.sales;
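Before dropping a table it can be useful to check whether it is managed or external. A minimal sketch, assuming that `DESCRIBE TABLE EXTENDED` returns a row whose `col_name` is `Type` with the value in `data_type`; `table_type` is a hypothetical helper, and `spark` is the session object already available in the notebook.

```python
from typing import Optional


def table_type(spark, table_name: str) -> Optional[str]:
    """Return the table's type (for example MANAGED or EXTERNAL), or None if not reported."""
    rows = spark.sql(f"DESCRIBE TABLE EXTENDED {table_name}").collect()
    for row in rows:
        # The detailed section of the output lists properties as name/value pairs.
        if row["col_name"] == "Type":
            return row["data_type"]
    return None


# In a Databricks notebook:
# print(table_type(spark, "sales_db.sales"))
```

Dropping an external table removes only the catalog entry, so a result of EXTERNAL means the files in the storage account stay in place.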

#### Saving the dataframe as an external table in Azure blob storage

In [0]:
sales_df.write.mode("append").format("delta").option("path", "abfss://destination@onkardbstorageacct.dfs.core.windows.net/sales_t/").saveAsTable("sales_db.sales_t")

In [0]:
%sql 
SELECT * FROM sales_db.sales_t

In [0]:
%sql
EXPLAIN SELECT * FROM sales_db.sales_t

### To use external locations such as Azure blob containers as the source for Delta tables, the Databricks workspace must have access to Azure blob storage. To set that up, follow these steps:
1. Every time a workspace is created in Azure Databricks, a "unity-catalog-access-connector" resource is created along with it. Unity Catalog manages all connections to external data from the Databricks workspace. Copy the resource ID of your unity-catalog-access-connector from the Azure portal.
2. Give the unity-catalog-access-connector access to your Azure blob containers. It needs permission to read, write and manage metadata. I have granted a higher privilege than that in this case.
3. Create a credential in the Databricks workspace. Go to Catalog --> Credentials --> Create Credentials, enter the details of your unity-catalog-access-connector on this page and create the credential. Make sure the newly created credential can be used by the Databricks workspace by checking the Workspaces tab under Catalog --> Credentials.
4. Create an external location in the Databricks workspace. Go to Catalog --> External Locations --> Create external locations, and use the credential created in step 3 to create the external location.
5. Now you can create an external table from a Databricks notebook.
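Step 4 can also be done in SQL instead of the Catalog UI. The sketch below only builds the `CREATE EXTERNAL LOCATION` statement; the location name and credential name are hypothetical placeholders, and the URL assumes the `destination` container used elsewhere in this notebook.

```python
def create_external_location_sql(name: str, url: str, credential: str) -> str:
    """Build a CREATE EXTERNAL LOCATION statement for Unity Catalog."""
    return (
        f"CREATE EXTERNAL LOCATION IF NOT EXISTS `{name}` "
        f"URL '{url}' "
        f"WITH (STORAGE CREDENTIAL `{credential}`)"
    )


stmt = create_external_location_sql(
    name="sales_ext_location",              # hypothetical location name
    url="abfss://destination@onkardbstorageacct.dfs.core.windows.net/SalesDB",
    credential="uc_access_connector_cred",  # hypothetical name of the credential from step 3
)
print(stmt)
# In a Databricks notebook: spark.sql(stmt)
```

Running the statement requires the privilege to create external locations on the metastore, so the UI route may be the simpler option in a personal workspace.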

#### Create external table backed by Azure blob storage location

In [0]:
%sql
CREATE TABLE sales_db.sales_ext_t (Item_Identifier STRING, Item_Weight DOUBLE, Item_Fat_Content STRING, Item_Visibility DOUBLE, Item_Type STRING, Item_MRP DOUBLE, Outlet_Identifier STRING, Outlet_Establishment_Year INT, Outlet_Size STRING, Outlet_Location_Type STRING, Outlet_Type STRING, Item_Outlet_Sales DOUBLE ) USING DELTA
LOCATION 'abfss://destination@onkardbstorageacct.dfs.core.windows.net/SalesDB/salesexttable' 

In [0]:
%sql
INSERT INTO sales_db.sales_ext_t VALUES('Apetina Paneer',225, 'Low fat', 100,'Healthy Food', 42.50, 'ICA Maxi Hbg',2019, 'Big','City','Supermarket',20964)

In [0]:
%sql
SELECT * FROM sales_db.sales_ext_t;

In [0]:
%sql
UPDATE sales_db.sales_ext_t SET Outlet_Establishment_Year = 2018 WHERE Item_Identifier = 'Apetina Paneer'

In [0]:
%sql
SELECT * FROM sales_db.sales_ext_t

##### DESCRIBE HISTORY lists every transaction (create, insert, update and so on) committed to the table, one row per table version.

In [0]:
%sql
DESCRIBE HISTORY sales_db.sales_ext_t
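The version numbers shown by DESCRIBE HISTORY can be used to read the table as it was at an earlier point, using Delta's `VERSION AS OF` clause. A minimal sketch with a hypothetical helper `version_as_of_query`; which version holds which change depends on the history output, so the version number below is only an example.

```python
def version_as_of_query(table_name: str, version: int) -> str:
    """Build a query that reads a Delta table at a specific version."""
    if version < 0:
        raise ValueError("version must be a non-negative integer")
    return f"SELECT * FROM {table_name} VERSION AS OF {version}"


query = version_as_of_query("sales_db.sales_ext_t", 1)
print(query)
# In a Databricks notebook: display(spark.sql(query))
```

Comparing the result with the current table is a quick way to confirm what the UPDATE above changed.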