test databricks-connect

requirements:

- service principal
- secrets in databricks
- venv
- databricks profiles
- GRANT SELECT ON ANY FILE TO `service principal id`
- external location
- GRANT CREATE EXTERNAL TABLE ON EXTERNAL LOCATION `external location name` TO `service principal id`

In [1]:
%pip install --upgrade "databricks-connect==15.4.*" 

Note: you may need to restart the kernel to use updated packages.


The system cannot find the path specified.


In [1]:
#imports
from pyspark.sql.types import *
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient

In [28]:
# Databricks configuration profile shared by the SDK client and the Spark session
profile_name = 'WORKSPACEM2M'

# SDK workspace client
w = WorkspaceClient(profile=profile_name)

# Spark session obtained through Databricks Connect
spark = DatabricksSession.builder.profile(profile_name).getOrCreate()

In [29]:
# dbutils handle exposed by the workspace client
dbutils = w.dbutils

# source storage: account, container and folder that hold the raw files
storageAccountName = "adedlspws"
containerName = "azadescontainer"
directoryName = "raw"

# credentials are read from Databricks secret scopes instead of being hard-coded
accountKey = dbutils.secrets.get(scope="azurestorage", key="accountkey")
principalid = dbutils.secrets.get(scope="workspacem2m", key="principalid")

# register the storage account key on the Spark session
account_key_conf = f"fs.azure.account.key.{storageAccountName}.dfs.core.windows.net"
spark.conf.set(account_key_conf, accountKey)

TEMP VIEW AND CTAS

In [4]:
# schema for the orders files: (column name, Spark type) pairs, all nullable
order_columns = [
    ("invoiceno", StringType()),
    ("stockcode", StringType()),
    ("description", StringType()),
    ("quantity", IntegerType()),
    ("invoicedate", StringType()),
    ("unitprice", DoubleType()),
    ("customerid", StringType()),
    ("country", StringType()),
    ("orderid", IntegerType()),
]
orderschema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in order_columns]
)

# folder in the storage container that holds the order files
orders_path = f"abfss://{containerName}@{storageAccountName}.dfs.core.windows.net/{directoryName}/"

# load every file in the folder as comma-separated text using the explicit schema
ordersdf = spark.read.csv(orders_path, schema=orderschema, sep=",")

# register the dataframe as temp view "orders" so it can be queried from SQL
ordersdf.createOrReplaceTempView("orders")

In [5]:
# sanity check: print the first 5 rows of the "orders" temp view
spark.sql("select * from orders").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|invoiceno|stockcode|         description|quantity|        invoicedate|unitprice|customerid|Country|orderid|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|   536370|    10002|INFLATABLE POLITI...|      48|2010-12-01 08:45:00|     0.85|         4| France|   1467|
|   536382|    10002|INFLATABLE POLITI...|      12|2010-12-01 09:45:00|     0.85|         4|  India|   1578|
|   536756|    10002|INFLATABLE POLITI...|       1|2010-12-02 14:23:00|     0.85|         1|Bahrain|   5708|
|   536863|    10002|INFLATABLE POLITI...|       1|2010-12-03 11:19:00|     0.85|         7|Bahrain|   6902|
|   536865|    10002|INFLATABLE POLITI...|       5|2010-12-03 11:28:00|     1.66|         1|Bahrain|   6982|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
only showing top 5 

In [6]:
#location and credential

# Base abfss:// URI under which the table data is stored.
# Written WITHOUT a trailing slash on purpose: the location below is built as
# "{metastore}/{database}/stage", so a trailing slash here would yield
# "...windows.net//sales/stage".
metastore = "abfss://ade-metastore@adedlspws.dfs.core.windows.net"
catalogue = "adedatabricks"
database = "sales"
table = "orders_stage"
external_location_name = "azade-storage-location"
location = f"{metastore}/{database}/stage"
storage_credential = "azade-storage-credential"

#drop db
# WARNING: CASCADE drops every table registered in the schema, not only orders_stage
spark.sql(f"DROP DATABASE IF EXISTS {catalogue}.{database} CASCADE;")

#create db
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalogue}.{database};")

#create table
# CTAS: materialise the "orders" temp view as a Delta table at the external location
query = f'''
CREATE OR REPLACE TABLE {catalogue}.{database}.{table}
USING delta
LOCATION '{location}'
AS SELECT *
FROM orders;
'''
spark.sql(query)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [7]:
# query the external location
# build the name from the same variables used to create the table, so this check
# cannot drift from the CTAS statement if catalogue/database/table are changed

spark.sql(f"SELECT * FROM {catalogue}.{database}.{table}").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|invoiceno|stockcode|         description|quantity|        invoicedate|unitprice|customerid|Country|orderid|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|   536370|    10002|INFLATABLE POLITI...|      48|2010-12-01 08:45:00|     0.85|         4| France|   1467|
|   536382|    10002|INFLATABLE POLITI...|      12|2010-12-01 09:45:00|     0.85|         4|  India|   1578|
|   536756|    10002|INFLATABLE POLITI...|       1|2010-12-02 14:23:00|     0.85|         1|Bahrain|   5708|
|   536863|    10002|INFLATABLE POLITI...|       1|2010-12-03 11:19:00|     0.85|         7|Bahrain|   6902|
|   536865|    10002|INFLATABLE POLITI...|       5|2010-12-03 11:28:00|     1.66|         1|Bahrain|   6982|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
only showing top 5 

COPY INTO, INSERT AND MERGE

In [8]:
# variables

storage_credential = "azade-storage-credential"
# Base abfss:// URI for table data. No trailing slash: the table locations are built as
# "{metastore}/{database}/<layer>", and a trailing slash here would put "//" in every path.
metastore = "abfss://ade-metastore@adedlspws.dfs.core.windows.net"
catalogue = "adedatabricks"
database = "sales"
external_location_name = "azade-storage-location"

In [21]:
# create landing table
landing_table = "orders_landing"
landing_location = f"{metastore}/{database}/landing"

#create table
# Nine untyped string columns (_c0.._c8) that receive the raw CSV fields as-is.
# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE raises once the
# table is already registered.
query = f'''
CREATE TABLE IF NOT EXISTS {catalogue}.{database}.{landing_table}(
_c0 string,
_c1 string,
_c2 string,
_c3 string,
_c4 string,
_c5 string,
_c6 string,
_c7 string,
_c8 string
)
USING delta
location '{landing_location}';
'''
spark.sql(query)

DataFrame[]

In [18]:
#create staging table
stage_table = "orders_stage"
stage_location = f"{metastore}/{database}/stage"


#create db (no-op when the schema already exists)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalogue}.{database};")

#drop only the staging table
# A DROP DATABASE ... CASCADE at this point would also remove orders_landing, which the
# cell above has just created, and the later COPY INTO would fail on a top-to-bottom run.
spark.sql(f"DROP TABLE IF EXISTS {catalogue}.{database}.{stage_table};")


#create table
# typed columns, in the same order as the landing table's _c0.._c8
query = f'''
CREATE OR REPLACE TABLE {catalogue}.{database}.{stage_table} (
    invoiceno STRING,
    stockcode STRING,
    description STRING,
    quantity INT,
    invoicedate STRING,
    unitprice DOUBLE,
    customerid STRING,
    country STRING,
    orderid INT
)
USING delta
OPTIONS (overwriteSchema = 'true')
LOCATION '{stage_location}';
'''
spark.sql(query)

DataFrame[]

In [26]:
#create orders table
bronze_table = "orders_bronze"
bronze_location = f"{metastore}/{database}/bronze"

# fully qualified table name, reused by the DDL and by the check below
bronze_fqn = f"{catalogue}.{database}.{bronze_table}"

# NOTE(review): unitprice is declared as string here while the staging table declares
# it DOUBLE; confirm this is intended.
query = f'''
CREATE TABLE IF NOT EXISTS {bronze_fqn} (
    invoiceno string,
    stockcode string,
    description string,
    quantity int,
    invoicedate string,
    unitprice string,
    customerid string,
    country string,
    orderid int
)
USING delta
LOCATION '{bronze_location}';
'''
spark.sql(query)

# check orders_bronze table
bronze_preview = spark.sql(f"SELECT * FROM {bronze_fqn}")
bronze_preview.show(5)

+---------+---------+-----------+--------+-----------+---------+----------+-------+-------+
|invoiceNo|stockCode|description|quantity|invoiceDate|unitPrice|customerID|country|orderid|
+---------+---------+-----------+--------+-----------+---------+----------+-------+-------+
+---------+---------+-----------+--------+-----------+---------+----------+-------+-------+



In [24]:
# COPY INTO ingest for landing table

# folder holding the raw files, and the fully qualified target table
source_path = f"abfss://{containerName}@{storageAccountName}.dfs.core.windows.net/{directoryName}/"
landing_fqn = f"{catalogue}.{database}.{landing_table}"

# load the comma-delimited, headerless files matching *.csv* into the landing table
query = f'''
    COPY INTO {landing_fqn}
    FROM '{source_path}'
    FILEFORMAT = CSV
    PATTERN = '*.csv*'
    FORMAT_OPTIONS ('delimiter'=',','header'='false');
'''
spark.sql(query)

# check the table
landing_preview = spark.sql(f"SELECT * FROM {landing_fqn}")
landing_preview.show(5)

+------+-----+--------------------+---+-------------------+----+---+-------+----+
|   _c0|  _c1|                 _c2|_c3|                _c4| _c5|_c6|    _c7| _c8|
+------+-----+--------------------+---+-------------------+----+---+-------+----+
|536370|10002|INFLATABLE POLITI...| 48|2010-12-01 08:45:00|0.85|  4| France|1467|
|536382|10002|INFLATABLE POLITI...| 12|2010-12-01 09:45:00|0.85|  4|  India|1578|
|536756|10002|INFLATABLE POLITI...|  1|2010-12-02 14:23:00|0.85|  1|Bahrain|5708|
|536863|10002|INFLATABLE POLITI...|  1|2010-12-03 11:19:00|0.85|  7|Bahrain|6902|
|536865|10002|INFLATABLE POLITI...|  5|2010-12-03 11:28:00|1.66|  1|Bahrain|6982|
+------+-----+--------------------+---+-------------------+----+---+-------+----+
only showing top 5 rows



In [25]:
# INSERT INTO stage table

stage_fqn = f"{catalogue}.{database}.{stage_table}"
landing_fqn = f"{catalogue}.{database}.{landing_table}"

# SELECT * carries no column list, so landing columns are matched to the stage
# columns by position
query = (
    " \n"
    f"INSERT INTO {stage_fqn}\n"
    f"SELECT * FROM {landing_fqn};\n"
)
spark.sql(query)

# check the table
stage_preview = spark.sql(f"SELECT * FROM {stage_fqn}")
stage_preview.show(5)


+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|invoiceno|stockcode|         description|quantity|        invoicedate|unitprice|customerid|country|orderid|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|   536370|    10002|INFLATABLE POLITI...|      48|2010-12-01 08:45:00|     0.85|         4| France|   1467|
|   536382|    10002|INFLATABLE POLITI...|      12|2010-12-01 09:45:00|     0.85|         4|  India|   1578|
|   536756|    10002|INFLATABLE POLITI...|       1|2010-12-02 14:23:00|     0.85|         1|Bahrain|   5708|
|   536863|    10002|INFLATABLE POLITI...|       1|2010-12-03 11:19:00|     0.85|         7|Bahrain|   6902|
|   536865|    10002|INFLATABLE POLITI...|       5|2010-12-03 11:28:00|     1.66|         1|Bahrain|   6982|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
only showing top 5 

CLEAN AND MERGE

Clean, transform and ingest the data into the orders_bronze table.
1. Update country "Unspecified" to "India"
2. Update customerid to 1 where it's null
3. Merge orders from orders_stage into orders_bronze

In [32]:
# clean stage_table

# 1. replace the placeholder country
clean_country = f"UPDATE {catalogue}.{database}.{stage_table} SET country='India' where country='Unspecified';"
spark.sql(clean_country)

# 2. default missing customer ids to '1'
# `customerid='null'` on its own only matches the literal text "null": a real SQL NULL
# never satisfies an `=` comparison and must be tested with IS NULL. Both are handled,
# so rows with a missing id can no longer slip past the cleaning step into the MERGE.
clean_customer = (
    f"UPDATE {catalogue}.{database}.{stage_table} SET customerid='1' "
    "where customerid IS NULL OR customerid='null';"
)
spark.sql(clean_customer)

# MERGE INTO bronze table
# Rows are matched on (invoiceno, stockcode, customerid). Matches get their price,
# quantity and country refreshed; everything else is inserted as a new row.

merge_query = f'''
    MERGE INTO {catalogue}.{database}.{bronze_table} as orders
    USING {catalogue}.{database}.{stage_table} as stage
    ON
    (
        orders.InvoiceNo = stage.invoiceno AND 
        orders.stockCode = stage.stockcode AND 
        orders.customerID = stage.customerid
    )
    WHEN MATCHED THEN 
    UPDATE SET 
        orders.unitPrice=stage.unitprice,
        orders.quantity=stage.quantity,
        orders.country=stage.country
    
    WHEN NOT MATCHED THEN 
    INSERT
    (
        invoiceNo,
        stockCode,
        description,
        quantity,
        invoiceDate,
        unitPrice,
        customerID,
        country,
        orderid
    )
    
    VALUES
    (
        invoiceno,
        stockcode,
        description,
        quantity,
        invoicedate, 
        unitprice,
        customerid,
        country,
        orderid
    )
'''
spark.sql(merge_query)

# check the table
spark.sql(f"SELECT * FROM {catalogue}.{database}.{bronze_table}").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|invoiceNo|stockCode|         description|quantity|        invoiceDate|unitPrice|customerID|country|orderid|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
|   578539|    11001|ASSTD DESIGN RACI...|      16|2011-11-24 14:55:00|     1.69|         1|  India|  56619|
|   578539|    20749|ASSORTED COLOUR M...|       4|2011-11-24 14:55:00|     7.95|         1|  India|  56603|
|   578539|    20750|RED RETROSPOT MIN...|       4|2011-11-24 14:55:00|     7.95|         1|  India|  56604|
|   578539|    20975|12 PENCILS SMALL ...|      24|2011-11-24 14:55:00|     0.65|         1|  India|  56611|
|   578539|    20981|12 PENCILS TALL T...|      12|2011-11-24 14:55:00|     0.85|         1|  India|  56612|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+-------+
only showing top 5 

TRANSFORMATIONS

In [118]:
#get the customer information from SQL Database

#connection target: SQL Server FQDN, database name and port
jdbcHostname = "azadesqlserver.database.windows.net"
jdbcDatabase = "azadesqldb"
jdbcPort = 1433

#database credentials, read from the "azuresql" secret scope
username = dbutils.secrets.get(scope="azuresql", key="username")
password = dbutils.secrets.get(scope="azuresql", key="password")

#JDBC URL with the credentials embedded; avoid printing or logging it
jdbcUrl = (
    f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};"
    f"database={jdbcDatabase};user={username};password={password}"
)

#load the customer table into a dataframe
customerdf = spark.read.jdbc(url=jdbcUrl, table="customer")

In [45]:
# preview the customer lookup table read over JDBC
customerdf.show()

+---+---------+
| id|     name|
+---+---------+
|  1|   Hyphen|
|  2|     Page|
|  3| Data Inc|
|  4|    Delta|
|  5|     Genx|
|  6| Rand Inc|
|  7|Hallo Inc|
+---+---------+



In [46]:
# inner-join orders to customers on the customer id and keep only the columns
# needed downstream (country, unit price, quantity, customer name)
join_condition = ordersdf.customerid == customerdf.id

customerordersdf = (
    ordersdf
    .join(customerdf, join_condition, how="inner")
    .select(
        ordersdf.Country,
        ordersdf.unitprice,
        ordersdf.quantity,
        customerdf.name.alias("customername"),
    )
)

In [48]:
# preview the joined orders/customers dataframe
customerordersdf.show()

+--------------+---------+--------+------------+
|       Country|unitprice|quantity|customername|
+--------------+---------+--------+------------+
|United Kingdom|     0.95|       6|      Hyphen|
|United Kingdom|     1.65|       1|      Hyphen|
|United Kingdom|      0.0|    -110|      Hyphen|
|United Kingdom|      0.0|     170|      Hyphen|
|United Kingdom|      0.0|    -178|      Hyphen|
|United Kingdom|      0.0|      60|      Hyphen|
|United Kingdom|      0.0|      70|      Hyphen|
|United Kingdom|     2.95|       2|      Hyphen|
|United Kingdom|     5.95|       2|      Hyphen|
|United Kingdom|     5.95|       2|      Hyphen|
|United Kingdom|     1.25|       3|      Hyphen|
|United Kingdom|     0.85|       3|      Hyphen|
|United Kingdom|   201.37|       1|      Hyphen|
|United Kingdom|     0.83|       1|      Hyphen|
|United Kingdom|     0.83|       2|      Hyphen|
|United Kingdom|     0.83|       1|      Hyphen|
|United Kingdom|     0.83|       1|      Hyphen|
|United Kingdom|    

In [56]:
#filter data
from pyspark.sql.functions import col

# keep rows whose country is not "Unspecified" and whose quantity is positive
valid_rows = (col("country") != "Unspecified") & (col("quantity") > 0)

# line total = unit price * quantity, cast to decimal(10,2)
line_total = (col("unitprice") * col("quantity")).cast("decimal(10,2)")

filterdf = customerordersdf.filter(valid_rows).withColumn("totalamount", line_total)

In [57]:
# preview the filtered rows with the computed totalamount column
filterdf.show()

+--------------+---------+--------+------------+-----------+
|       Country|unitprice|quantity|customername|totalamount|
+--------------+---------+--------+------------+-----------+
|United Kingdom|     0.95|       6|      Hyphen|       5.70|
|United Kingdom|     1.65|       1|      Hyphen|       1.65|
|United Kingdom|      0.0|     170|      Hyphen|       0.00|
|United Kingdom|      0.0|      60|      Hyphen|       0.00|
|United Kingdom|      0.0|      70|      Hyphen|       0.00|
|United Kingdom|     2.95|       2|      Hyphen|       5.90|
|United Kingdom|     5.95|       2|      Hyphen|      11.90|
|United Kingdom|     5.95|       2|      Hyphen|      11.90|
|United Kingdom|     1.25|       3|      Hyphen|       3.75|
|United Kingdom|     0.85|       3|      Hyphen|       2.55|
|United Kingdom|   201.37|       1|      Hyphen|     201.37|
|United Kingdom|     0.83|       1|      Hyphen|       0.83|
|United Kingdom|     0.83|       2|      Hyphen|       1.66|
|United Kingdom|     0.8

In [52]:
import pyspark.sql.functions as f

# total sales amount for each (customer, country) pair
total_amount = f.sum("totalamount").alias("Amount")
salesbycountries = filterdf.groupBy("customername", "country").agg(total_amount)

In [53]:
# preview the aggregated sales per customer and country
salesbycountries.show()

+------------+--------------------+------------------+
|customername|             country|            Amount|
+------------+--------------------+------------------+
|   Hallo Inc|         Netherlands| 58330.76000000004|
|        Page|      United Kingdom|7856.5099999999975|
|   Hallo Inc|             Austria| 961.6000000000001|
|   Hallo Inc|             Denmark|           1680.72|
|   Hallo Inc|United Arab Emirates| 864.0400000000002|
|   Hallo Inc|              Poland|           1465.43|
|   Hallo Inc|         Switzerland| 8869.329999999994|
|   Hallo Inc|               Japan|          16869.82|
|   Hallo Inc|              Greece|2982.8500000000004|
|      Hyphen|           Hong Kong|2419.9600000000005|
|      Hyphen|                EIRE| 3310.999999999998|
|   Hallo Inc|             Finland| 7694.349999999999|
|        Page|             Germany| 816.2500000000001|
|   Hallo Inc|           Singapore|           1987.37|
|   Hallo Inc|             Germany| 48481.56999999994|
|   Hallo 

In [58]:
#write the aggregate to the SalesStaging table, replacing any previous contents

salesbycountries.write.jdbc(url=jdbcUrl, table="dbo.SalesStaging", mode="overwrite")

#read SalesStaging back and list the largest amounts first

from pyspark.sql.functions import desc
salesstagingdf = spark.read.jdbc(url=jdbcUrl, table="SalesStaging")

salesstagingdf.orderBy(desc("Amount")).show()

+------------+---------------+------------------+
|customername|        country|            Amount|
+------------+---------------+------------------+
|   Hallo Inc| United Kingdom| 1584382.439999807|
|      Hyphen| United Kingdom| 537848.9699999607|
|   Hallo Inc|           EIRE| 58479.49000000003|
|   Hallo Inc|    Netherlands| 58330.76000000004|
|   Hallo Inc|        Germany| 48481.56999999994|
|   Hallo Inc|         France|45537.200000000004|
|   Hallo Inc|      Australia| 40499.27000000001|
|   Hallo Inc|          Japan|          16869.82|
|   Hallo Inc|          Spain|13533.070000000007|
|       Delta| United Kingdom|11934.840000000007|
|   Hallo Inc|         Norway|10150.660000000007|
|   Hallo Inc|       Portugal| 9269.690000000002|
|   Hallo Inc|    Switzerland| 8869.329999999994|
|        Page| United Kingdom|7856.5099999999975|
|   Hallo Inc|         Sweden| 7833.720000000001|
|   Hallo Inc|        Finland| 7694.349999999999|
|   Hallo Inc|         Cyprus| 7323.350000000001|
