In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions  import from_unixtime
from pyspark.sql.functions  import to_date
from pyspark.sql import Row
from pyspark.sql.functions import to_json, struct
from pyspark.sql import functions as F

# Read and write

## Read from ADLS-Gen2
For practical purposes we use blob storage (ADLS Gen-2) as data source, here's an example of how to read files stored there

First, create an ADLS-Gen2 storage account. Then, authenticate into the storage endpoint. To do so, the following steps need to be performed in Azure:
1. **Application registration**: You will need to register an Azure Active Directory (AAD) application. On the Azure portal home page, search for "Azure Active Directory" &rarr; select App registrations &rarr; New registration.
2. **Create secret to the application**: Click on "Certificates & secrets" under the Manage heading &rarr; add a new client secret &rarr; Copy the value
3. **Grant ADLS-Gen2 access to the registered Application**: In the ADLS-Gen2 storage account, navigate to Access Control (IAM) &rarr; Add &rarr; Add role assignment &rarr; Role = Storage Blob Data Contributor; Assign access to = User, group, or service principal; Select = The registered Application

IMPORTANT NOTE: Mounting is no longer a recommended practice, see [here](https://docs.databricks.com/dbfs/mounts.html)

In [None]:
# Mount configuration
storage_account = "xxx"  # Name of the ADLS Gen2 Storage Account
storage_container = "xxx"  # Name of the ADLS Gen2 Storage Container
mount_name = "xxx"
mount_point = f"/mnt/{mount_name}"

# Authentication
# NOTE: the "xxx" literals are placeholders. In a real workspace, read these values from a
# secret scope (dbutils.secrets.get) instead of hard-coding them in the notebook.
client_id = "xxx"  # Obtained from (1) the registered Application -> Application (client) ID
tenant_id = "xxx"  # Obtained from (1) the registered Application -> Directory (tenant) ID
client_secret = "xxx"  # Obtained from (2) the registered Application -> Copied secret value

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Check if the mount already exists, and mount if not.
# dbutils.fs.mounts() enumerates the registered mount points directly, so the check does not
# rely on listing the /mnt/ directory, and the exact comparison avoids treating e.g.
# "/mnt/data2" as a match for mount_name "data" (which a substring test would do).
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print(mount_name + " has already been mounted")
else:
    dbutils.fs.mount(
        source=f"abfss://{storage_container}@{storage_account}.dfs.core.windows.net",
        mount_point=mount_point,
        extra_configs=configs,
    )

Files in the ADLS-Gen2 can thus be read

In [None]:
# Load the Orders CSV files (first row is the header) from the staging container into a Spark DataFrame
df_ord = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(f"abfss://staging_container@{storageEndPoint}/orders")
)

In [None]:
# Executing the below will create an External table in Databricks for you to read the CSV.
# This is not about creating Delta tables however
#
# This is a template: because the statement is an f-string, the doubled braces are escapes, so
# `{{(schema)}}` and `{{csv}}` reach Spark as the literal text `{(schema)}` and `{csv}`.
# Replace them with the real column list and file format before running.
# `{storageEndPoint}` is a normal f-string field and is interpolated by Python.
spark.sql(f"""
CREATE OR REPLACE TABLE Orders
    {{(schema)}}
USING {{csv}}
OPTIONS (
    path 'abfss://staging_container@{storageEndPoint}/orders',
    header 'true',
    delimiter ','
    )
""")

## Writing to Delta Tables 

A Delta table is actually a set of snappy-compressed parquet files, together with Delta Log files. It offers the following benefits:
- Easy rollback, since it tracks every change to the table in the delta log
- ACID compliance
- Enforce constraint defined in DDL
- Optimized performance

Below is an example of creating a Delta table from CSV files. The following will happen after executing it:
- Under the defined LOCATION, snappy-compressed parquet files will be created, alongside the _delta_log folder
- You will find the relevant table under the "Data" section in Databricks
- The table information will persist in Databricks's metastore even after you shut down the clusters

Some notes to the code
- The Temp View is necessary to allow Databricks to parse the CSV first, before making it a Delta table
- The LOCATION clause is used to make the table an unmanaged one, whose data does not reside in DBFS but in the specified path

In [None]:
# Where the Delta table files are written (loading container)
delta_path = f"abfss://loading_container@{storageEndPoint}/orders"

# Where the raw order blobs are read from (staging container)
original_path = f"abfss://staging_container@{storageEndPoint}/orders"

In [None]:
# spark.sql() parses a single statement per call, so the temp view and the table are created
# with two separate calls instead of one ';'-separated script.

# Step 1: let Spark parse the raw files through a temp view.
# `{{(schema, if necessary)}}` and `{{file format e.g. csv}}` are template placeholders
# (escaped braces in the f-string) - replace them before running.
spark.sql(f"""
CREATE OR REPLACE TEMP VIEW Orders_vw
    {{(schema, if necessary)}}
USING {{file format e.g. csv}}
OPTIONS (
    path '{original_path}',
    header 'true',
    delimiter '|'
    )
""")

# Step 2: materialize the view as an unmanaged Delta table at delta_path.
# The derived year column gets an explicit alias: without one the column would be named after
# the expression text, and Delta rejects column names containing spaces or parentheses.
spark.sql(f"""
CREATE OR REPLACE TABLE Orders
LOCATION '{delta_path}'
AS
SELECT *, EXTRACT(YEAR FROM Event_Date) AS Event_Year FROM Orders_vw
""")

For self-describing file formats like parquet, the syntax can be made easier. Note: This does not support manual schema declaration, like one would do for CSV

In [None]:
# Note: This will not work for CSV, because the CTAS statement cannot infer schema correctly
# Build the CTAS statement first, then hand it to Spark.
ctas_statement = f"""
CREATE OR REPLACE TABLE Orders
LOCATION '{delta_path}'
AS
    SELECT *
    FROM parquet.`{original_path}`
"""
spark.sql(ctas_statement)

Alternative code using PySpark

In [None]:
# Read the parquet input, then add a load timestamp and the order year used for partitioning
orders_raw = spark.read.format("parquet").load(original_path)
df_ord = (
    orders_raw
    .withColumn("timestamp", current_timestamp())
    .withColumn("O_OrderDateYear", year(col("O_OrderDate")))
)

# Save into delta path. Go over to the ADLS Gen2 container and you should see new files got created in the delta path
# Files are organized into different folders according to the "partitionBy" value
(
    df_ord.write
    .format("delta")
    .partitionBy("O_OrderDateYear")
    .mode("overwrite")
    .option("path", delta_path)
    .saveAsTable("Orders")
)

After creation, the table can be queried in SQL or Python

In [None]:
# Via SQL
# A `%sql` magic must be the first line of a cell and switches the whole cell to SQL, so it
# cannot share a cell with Python code. From a Python cell, run the query through spark.sql().
# (In a dedicated SQL cell you would write `%sql` on the first line, followed by the query.)
display(spark.sql("""
SELECT o.*
FROM Orders o
"""))

# Via Python
deltaTable = spark.read.format("delta").table('Orders')
deltaTable.show()

Some useful commands

In [None]:
%sql

-- SQL comments start with `--` (a leading `#` is not a comment in Spark SQL), and each
-- statement is terminated with `;` so that several can live in one cell.

-- View history of the Delta table
DESCRIBE HISTORY Orders;

-- Query a specific version
SELECT * FROM Orders VERSION AS OF 1;

-- Restore a previous version (note: this modifies the table)
RESTORE TABLE Orders TO VERSION AS OF 5;

-- See important metadata about the table e.g. schema, database, location, etc.
DESCRIBE EXTENDED Orders;

-- View underlying file structure of the table e.g. number of files, partitioning, etc.
DESCRIBE DETAIL Orders;

### Declare Schema with Generated Columns

As noted previously, CTAS statements do not support schema declaration. We note above that the timestamp column appears to be some variant of a Unix timestamp, which may not be the most useful for our analysts to derive insights. This is a situation where generated columns would be beneficial.

Generated columns are a special type of column whose values are automatically generated based on a user-specified function over other columns in the Delta table (introduced in DBR 8.3).

The code below demonstrates creating a new table while:
1. Specifying column names and types
1. Adding a <a href="https://docs.databricks.com/delta/delta-batch.html#deltausegeneratedcolumns" target="_blank">generated column</a> to calculate the date
1. Providing a descriptive column comment for the generated column

In [None]:
%sql
-- Table with explicit column types plus a generated `date` column.
-- `date` is derived from `transaction_timestamp`, which is divided by 1e6 before the cast to
-- TIMESTAMP (i.e. the source value is treated as microseconds since the epoch).
CREATE OR REPLACE TABLE purchase_dates (
  id STRING, 
  transaction_timestamp STRING, 
  price STRING,
  date DATE GENERATED ALWAYS AS (
    cast(cast(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE))
    COMMENT "generated based on `transaction_timestamp` column")

Because **`date`** is a generated column, if we write to **`purchase_dates`** without providing values for the **`date`** column, Delta Lake automatically computes them.

## More about Tables

### Tables vs Views vs CTE

- Table
    - Managed table: Data is actually stored in DBFS
    - Unmanaged table: Data is stored elsewhere, e.g. ADLS-Gen2. Databricks only manages the metadata of the table
- Views
    - View: Will persist like table
    - Temp View: Persist in the current notebook session only
    - Global Temp View: Can be shared across different notebook sessions, until cluster restarts
- CTE
    - CTE: Referenced within the scope of a SQL statement only


### Table Constraint

Because Delta Lake enforces schema on write, Databricks can support standard SQL constraint management clauses to ensure the quality and integrity of data added to a table.

Databricks currently supports two types of constraints:
* <a href="https://docs.databricks.com/delta/delta-constraints.html#not-null-constraint" target="_blank">**`NOT NULL`** constraints</a>
* <a href="https://docs.databricks.com/delta/delta-constraints.html#check-constraint" target="_blank">**`CHECK`** constraints</a>

In both cases, you must ensure that no data violating the constraint is already in the table prior to defining the constraint. Once a constraint has been added to a table, data violating the constraint will result in write failure.

Below, we'll add a **`CHECK`** constraint to the **`date`** column of our table. Note that **`CHECK`** constraints look like standard **`WHERE`** clauses you might use to filter a dataset.

In [None]:
%sql
-- Reject any write whose `date` value is not after 2020-01-01
ALTER TABLE purchase_dates ADD CONSTRAINT valid_date CHECK (date > '2020-01-01');

### Enrich Tables with Additional Options and Metadata

So far we've only scratched the surface as far as the options for enriching Delta Lake tables.

Below, we show evolving a CTAS statement to include a number of additional configurations and metadata.

Our **`SELECT`** clause leverages two built-in Spark SQL commands useful for file ingestion:
* **`current_timestamp()`** records the timestamp when the logic is executed
* **`input_file_name()`** records the source data file for each record in the table

We also include logic to create a new date column derived from timestamp data in the source.

The **`CREATE TABLE`** clause contains several options:
* A **`COMMENT`** is added to allow for easier discovery of table contents
* A **`LOCATION`** is specified, which will result in an external (rather than managed) table
* The table is **`PARTITIONED BY`** a date column; this means that the data from each date will exist within its own directory in the target storage location

In [None]:
%sql
-- External, date-partitioned table built with CTAS from the raw parquet files.
-- Besides the source columns it adds: the derived `first_touch_date`, the load time
-- (`updated`) and the originating file of each record (`source_file`).
CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
LOCATION "${da.paths.working_dir}/tmp/users_pii"
PARTITIONED BY (first_touch_date)
AS
  SELECT *, 
    cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date, 
    current_timestamp() updated,
    input_file_name() source_file
  FROM parquet.`${da.paths.datasets}/ecommerce/raw/users-historical/`;
  
SELECT * FROM users_pii;

A note about **`PARTITIONED BY`** in the code above:
- Partitioning is not a recommended practice for Delta Tables. Most Delta Lake tables (especially small-to-medium sized data) will not benefit from partitioning. Because partitioning physically separates data files, this approach can result in a small files problem and prevent file compaction and efficient data skipping. Most tables with less than 1 TB of data do not require partitions, all partitions should contain at least 1 GB of data. 
- Do note that it is PARTITION`ED`
- Partitioning can be updated via the `CREATE OR REPLACE TABLE` clause, which would create new folders to store the data partitioned by the new criteria. Contents in the old folders storing the old partitioned data are not removed unless you execute `VACUUM`. After this is executed, contents in the old folders are removed, but the folders themselves will remain

**As a best practice, you should default to non-partitioned tables for most use cases when working with Delta Lake.**

### Cloning Delta Lake Tables
Delta Lake has two options for efficiently copying Delta Lake tables.

**`DEEP CLONE`** fully copies data and metadata from a source table to a target. This copy occurs incrementally, so executing this command again can sync changes from the source to the target location.

If you wish to create a copy of a table quickly to test out applying changes without the risk of modifying the current table, **`SHALLOW CLONE`** can be a good option. Shallow clones just copy the Delta transaction logs, meaning that the data doesn't move.

In [None]:
%sql
-- Choose exactly one clone type; "DEEP/SHALLOW CLONE" itself is not valid syntax.
-- Deep clone: copies data and metadata from the source table.
CREATE OR REPLACE TABLE purchases_clone
DEEP CLONE purchases;

-- Shallow clone alternative: copies only the Delta transaction log, the data does not move.
-- CREATE OR REPLACE TABLE purchases_clone
-- SHALLOW CLONE purchases;

### Loading into Delta Lake

**`CREATE OR REPLACE TABLE`** (CRAS):
- Fully replace the contents of a table each time they execute.

**`INSERT OVERWRITE`**:
- Can only overwrite an existing table, not create a new one like our CRAS statement
- Can overwrite only with new records that match the current table schema -- and thus can be a "safer" technique for overwriting an existing table without disrupting downstream consumers
- Can overwrite individual partitions

**`INSERT INTO SELECT XXX`**:
- Self-explanatory

**`MERGE INTO a USING b`**:
- Self-explanatory

**`COPY INTO`**:
- An idempotent option to incrementally ingest data from external systems.
  ```sql
  COPY INTO sales
  FROM "path"
  FILEFORMAT = PARQUET
  ```

### Optimization
- Remove unused files from a table directory via [VACUUM](https://learn.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/language-manual/delta-vacuum/); add `DRY RUN` to preview the files that would be deleted first. This is especially important for deleting PII information, as after DELETE the PII might still exist in previous images of the data
- Compact small files via ```OPTIMIZE```
- Indexing
  - **Z-Order**: Collocate related information for high cardinality data, often most effective when working with queries that filter against continuous numeric variables. ```ZORDER BY (comma separated column names)```
  - **Bloom Filter**: Probabilistically identifying files that may contain data using fields containing arbitrary text. ```CREATE BLOOMFILTER INDEX ON TABLE table_name FOR COLUMNS(indexed_col OPTIONS (fpp=0.1, numItems=200))```
- Turning on Auto Optimize and Auto Compaction help us avoid the tables containing too many small files. For more information on these settings, see [here](https://docs.databricks.com/delta/optimizations/auto-optimize.html)


# Streaming data pipeline from EventHub

This is an example of building the Bronze, Silver, and Gold Zone for a streaming data pipeline
- Bronze: Read live data from EventHub, and historical data from ADLS-Gen2. Parse content and union them
- Silver: Implement business rules and data cleansing process and join with lookup tables
- Gold: Data is aggregated

In [None]:
# Define variables
# Name of the database that holds the pipeline's Delta tables
db_name = "VehicleSensor"

def get_config(zone):
    """Return the storage paths and Delta table name for a pipeline zone.

    Args:
        zone: Zone label (e.g. "Bronze", "Silver") that is embedded in every path
            and in the table name.

    Returns:
        A dict with the keys 'delta_path', 'chkpt_path', 'schema_path' and 'delta_table'.
    """
    settings = {}
    # Delta data files of the zone
    settings['delta_path'] = f"/mnt/SensorData/vehiclestreamingdata/{zone}/delta"
    # Streaming checkpoint of the zone
    settings['chkpt_path'] = f"/mnt/SensorData/vehiclestreamingdata/{zone}/chkpt"
    # Schema location of the zone
    settings['schema_path'] = f"/mnt/SensorData/vehiclestreamingdata/{zone}/schema"
    # Name of the zone's Delta table
    settings['delta_table'] = f"VehicleDelta_{zone}"
    return settings

# Create DB first
# Note the space after EXISTS: without it the statement becomes
# "CREATE DATABASE IF NOT EXISTSVehicleSensor" and fails to parse.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")

## Bronze Zone

### Streaming from Kafka

Create Spark DataFrame which reads from the Kafka topic

In [None]:
TOPIC = "cookbook-eventhub" # Name passed to the Kafka "subscribe" option (the topic to read)
BOOTSTRAP_SERVERS = "cookbook-eventhub.servicebus.windows.net:9093" # Host name of Event Hub:9093, 9093 is the port for Kafka

# Go to Event Hub's Shared Access Policies -> Click onto a policy -> Copy Connection string–primary key here
# NOTE(review): this is a sample value. A real connection string contains a shared access key and
# should be read from a secret scope (dbutils.secrets.get) rather than pasted into the notebook.
CONN_STRING = "Endpoint=sb://kafkaenabledeventhubns.servicebus.windows.net/;SharedAccessKeyName=sendreceivekafka;SharedAccessKey=4vgxasdsdasd4aVcUWBvYp44sdasaasasasasasasvoVE=" 

# The $ConnectionString and $Default are fixed values, don't update them
EH_SASL = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{CONN_STRING}\";"
# NOTE(review): GROUP_ID is defined but the reader below passes "POC" as kafka.group.id - confirm which one is intended.
GROUP_ID = "$Default" 

# Read stream using Spark SQL (structured streaming)
# Consider .option("startingOffsets", "earliest") to read the earliest available offset during testing
kafkaDF = (
    spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "60000")
    .option("kafka.group.id", "POC")
    .option("failOnDataLoss", "false")
    .option("startingOffsets", "latest")
    .load()
    .withColumn("source", lit(TOPIC))  # Optional: Also add the topic as column
)

# Check if streaming is on and get the schema of the Kafka dataframe
print(kafkaDF.isStreaming)
# printSchema() prints the schema itself and returns None, so it must not be wrapped in print()
# (that would emit an extra "None" line).
kafkaDF.printSchema()

#It should then output something like this:
#
#True
#root
# |-- key: binary (nullable = true)
# |-- value: binary (nullable = true)
# |-- topic: string (nullable = true)
# |-- partition: integer (nullable = true)
# |-- offset: long (nullable = true)
# |-- timestamp: timestamp (nullable = true)
# |-- timestampType: integer (nullable = true)
# |-- source: string (nullable = true)

Parse the Kafka message and writing the streaming data to Delta table. Some important configurations for ```writeStream```:
- ```trigger``` is for controlling the frequency of writes, default is fixed interval micro-batches of 500ms. This could be modified with ```.trigger(processingTime='10 seconds')```. For incremental batch jobs, use ```.trigger(availableNow=True)```

There are a few more configurations under `options`:
- ```checkpointLocation``` is set so that it can recover from failure in the event of server failure. If this is not defined, all state data around the streaming job is lost, and upon restart, the job must start from scratch. Each query must have a different checkpoint location.
- ```maxFiles[Bytes]PerTrigger```: Maintain a consistent batch size and prevents large batches from leading to spill and cascading micro-batch processing delays.

In [None]:
# Schema of the JSON document carried in the 'value' field of each Kafka message
jsonschema = StructType([
    StructField("id", StringType()),
    StructField("eventtime", TimestampType()),
    StructField("rpm", IntegerType()),
    StructField("speed", IntegerType()),
    StructField("kms", IntegerType()),
    StructField("lfi", IntegerType()),
    StructField("lat", DoubleType()),
    StructField("long", DoubleType()),
])

# Cast key/value to strings, parse the JSON payload and expand it into top-level columns,
# then append the result to the Bronze Delta path every 2 minutes
(
    kafkaDF
    .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "source")
    .withColumn('vehiclejson', from_json(col('value'), jsonschema))
    .select("key", "value", "source", "vehiclejson.*")
    .writeStream
    .format("delta")
    .trigger(processingTime = "2 minutes")
    .outputMode("append")
    .option("checkpointLocation", get_config("Bronze")['chkpt_path'])
    .start(get_config("Bronze")['delta_path'])
)

# Register a table over the Bronze Delta path so the streamed data can be queried by name
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {db_name}.{get_config("Bronze")['delta_table']}
LOCATION '{get_config("Bronze")['delta_path']}'
""")

### Streaming from ADLS-Gen2

In case you have historical data in ADLS-Gen2, and you would like to stream-read the data as files are ingested there, Auto Loader is the preferred method. This method is also useful for batch jobs, which is likely the case here.

Some important options for Auto Loader readStream:
- ```cloudFiles.maxFiles[Bytes]PerTrigger```: Same configuration as described above, but set using this key for Auto Loader
- ```cloudFiles.schemaLocation```: The location to store the inferred schema and subsequent changes, to deal with schema evolution

In [None]:
# Function to read data from ADLS Gen-2 using readStream API and writing as delta format
def append_batch_source(data_source_path, value_schema, zone="Historical", max_bytes_per_trigger="10g"):
    """Ingest parquet files with Auto Loader and append them to a zone's Delta path.

    Args:
        data_source_path: Location of the parquet files to ingest.
        value_schema: Schema used to parse the JSON text in the `value` column.
        zone: Zone name passed to get_config() to resolve the schema, checkpoint and
            Delta paths. Defaults to "Historical" so the data lands at the same path
            the historical table is created over.
        max_bytes_per_trigger: Value for `cloudFiles.maxBytesPerTrigger`; a byte
            string such as "10g".

    Returns:
        The streaming query handle returned by `start()`.
    """
    zone_config = get_config(zone)

    raw_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        # This option expects a size, not a file format
        .option("cloudFiles.maxBytesPerTrigger", max_bytes_per_trigger)
        .option("cloudFiles.schemaLocation", zone_config['schema_path'])
        .load(data_source_path)
    )

    # Tag the rows as historical and expand the JSON payload into top-level columns
    parsed_df = (
        raw_df
        .withColumn("source", lit('historical'))
        .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "source")
        .withColumn('vehiclejson', from_json(col('value'), value_schema))
        .select("key", "value", "source", "vehiclejson.*")
    )

    return (
        parsed_df.writeStream
        .format("delta")
        .trigger(availableNow=True)
        .option("checkpointLocation", zone_config['chkpt_path'])
        .option("mergeSchema", "true")
        .start(zone_config['delta_path'])
    )


# Create historical delta table
historical_query = append_batch_source('/xxx', jsonschema)
# The availableNow query stops once the available files are processed; wait for it so the
# Delta files exist at the location before a table is registered over it.
historical_query.awaitTermination()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {db_name}.{get_config("Historical")['delta_table']}
LOCATION '{get_config("Historical")['delta_path']}'
""")

In [None]:
# Inspect the schema and metadata of the Bronze table
describe_bronze = f"""
DESCRIBE FORMATTED {db_name}.{get_config("Bronze")['delta_table']}
"""
spark.sql(describe_bronze)

### Union the live and historical data, and generate Temp View

With live and historical delta tables created, we can union them for subsequent usage

In [None]:
# Resolve the fully qualified table names first. Nesting double quotes inside a double-quoted
# f-string (f"{get_config("Bronze")...}") is a SyntaxError before Python 3.12.
bronze_table = f"{db_name}.{get_config('Bronze')['delta_table']}"
historical_table = f"{db_name}.{get_config('Historical')['delta_table']}"

# Streaming Data from Bronze and Historical tables
df_bronze = spark.readStream.format("delta").option("latestFirst", "true").table(bronze_table)
df_historical = spark.readStream.format("delta").option("latestFirst", "true").table(historical_table)

# Union both historical and Bronze Streaming Data. The TempView can be used like a CTE in SQL statements
# (union() matches columns by position, so both tables are expected to share the same column order)
df_bronze_all = df_bronze.union(df_historical)
df_bronze_all.createOrReplaceTempView("vw_TempBronzeAll")

## Silver Zone

### Connect to Azure SQL DB for lookup tables

Establish connection to Azure SQL DB

In [None]:
# Config details for Azure SQL DB for VehicleInformation and LocationInformation tables
# Credentials are read from the "KeyVaultScope" secret scope
sqldbusername = dbutils.secrets.get(scope="KeyVaultScope",key="VehicleInformationDBUserId")
sqldbpwd = dbutils.secrets.get(scope="KeyVaultScope",key="VehicleInformationDBPwd")

jdbcHostname = "vehicledemoinformatiosrvr.database.windows.net"
jdbcDatabase = "VehicleInformationDB"
jdbcPort = 1433
# Keep user and password out of the URL: they are already supplied through connectionProperties,
# and a URL is far more likely to end up in logs or error messages than the properties dict.
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}"
connectionProperties = {
  "user" : sqldbusername,
  "password" : sqldbpwd,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Retrieve the two lookup tables and create Temp Views

In [None]:
# Pull the vehicle lookup columns from Azure SQL DB (dbo.VehicleInformation) via a pushed-down
# subquery, then expose them as the temp view vw_VehicleMaster
vehicleInfo = "(select VehicleId,Make,Model,Category,ModelYear from dbo.VehicleInformation) vehicle"
df_vehicleInfo = spark.read.jdbc(
    url=jdbcUrl,
    table=vehicleInfo,
    properties=connectionProperties,
)
df_vehicleInfo.createOrReplaceTempView("vw_VehicleMaster")
display(df_vehicleInfo)

In [None]:
# Pull the location lookup columns from Azure SQL DB (dbo.LocationInfo) via a pushed-down
# subquery, then expose them as the temp view vw_LocationMaster
locationInfo = "(select Borough,Location,Latitude,Longitude from dbo.LocationInfo) vehicle"
df_locationInfo = spark.read.jdbc(
    url=jdbcUrl,
    table=locationInfo,
    properties=connectionProperties,
)
df_locationInfo.createOrReplaceTempView("vw_LocationMaster")
display(df_locationInfo)

### De-duplication

Because Kafka provides at-least-once guarantees on data delivery, all Kafka consumers should be prepared to handle duplicate records

In [None]:
# Drop duplicate (id, eventtime) records from the unioned Bronze stream.
# The watermark is set on `eventtime`, the event-time column of the parsed payload, so the
# de-duplication state is bounded to records arriving within 30 seconds of each other.
deduped_df = (spark.readStream
                   .table("vw_TempBronzeAll")
                   .withWatermark("eventtime", "30 seconds")
                   .dropDuplicates(["id", "eventtime"]))

# createOrReplaceTempView() returns None, so it is called as a separate statement;
# chaining it into the assignment would leave deduped_df bound to None.
deduped_df.createOrReplaceTempView("vw_TempBronzeAll_dedup")

After the deduplication, we are ready to insert the records into Silver. Because we're using SQL to write to our Delta table, we'll need to make sure this table exists before we begin.

In [None]:
# Create the Silver Delta table up front so the MERGE has a target to write to.
# The LOCATION path must be a quoted string literal in SQL.
spark.sql(f"""CREATE TABLE IF NOT EXISTS {get_config("Silver")['delta_table']}
(id STRING, eventtime TIMESTAMP, rpm INTEGER, Year INTEGER, Month INTEGER, Day INTEGER, Hour INTEGER, 
Make STRING, Model STRING, Category STRING, Borough STRING, Location STRING)
USING DELTA
LOCATION '{get_config("Silver")['delta_path']}'
""")

Delta Lake has optimized functionality for insert-only merges. This operation is ideal for de-duplication: define logic to match on unique keys, and only insert those records for keys that don't already exist.

Note that in this application, we proceed in this fashion because we know two records with the same matching keys represent the same information. If the later arriving records indicated a necessary change to an existing record, we would need to change our logic to include a **`WHEN MATCHED`** clause.

A merge into query is defined in SQL below against a view titled **`stream_updates`**.

In [None]:
# Insert-only MERGE from the `stream_updates` view into the Silver table.
# The source subquery derives the date parts from eventtime and enriches each record with the
# vehicle and location lookup views. Records are matched on (id, eventtime): those are the key
# columns present in both the Silver table and the source subquery (neither side has a
# `VehicleId` column, since the vehicle id is carried as `id`).
sql_query = f"""
  MERGE INTO {get_config("Silver")['delta_table']} a
  USING (SELECT
    stream_updates.id
    ,stream_updates.eventtime
    ,stream_updates.rpm
    ,Year(eventtime) as Year
    ,month(eventtime) as Month
    ,day(eventtime) as Day
    ,hour(eventtime) as Hour
    ,m.Make
    ,m.Model
    ,m.Category
    ,l.Borough
    ,l.Location
    FROM stream_updates
    LEFT JOIN vw_VehicleMaster m on stream_updates.id = m.VehicleId
    LEFT join vw_LocationMaster l on stream_updates.lat = l.Latitude and stream_updates.long = l.Longitude
    ) b
  ON a.id=b.id AND a.eventtime=b.eventtime
  WHEN NOT MATCHED THEN INSERT *
"""

The Spark Structured Streaming **`foreachBatch`** method allows users to define custom logic when writing.

The logic applied during **`foreachBatch`** addresses the present microbatch as if it were a batch (rather than streaming) data, thus enabling the use of some functions, like merge, or window functions. Applying these functions without **`foreachBatch`** would return error, see the end of this section

The function called in **`foreachBatch`** requires two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. The following cell defines simple logic that will allow us to register any SQL **`MERGE INTO`** query for use in a Structured Streaming write. 

In [None]:
def upsert_to_delta(microBatchDF, batchId):
    """Run the module-level `sql_query` against one micro-batch.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Identifier of the micro-batch (unused here).
    """
    # Publish the batch under the view name the query reads from
    microBatchDF.createOrReplaceTempView("stream_updates")
    # Execute the query on the session obtained from the micro-batch DataFrame itself
    batch_session = microBatchDF._jdf.sparkSession()
    batch_session.sql(sql_query)

Here we perform the upsert by using the previously defined function in our **`foreachBatch`** logic.

In [None]:
# Configure the Silver writer: every micro-batch is handed to upsert_to_delta
silver_writer = (
    deduped_df.writeStream
    .format("delta")
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", get_config("Silver")['chkpt_path'])
)

# Start the streaming query and keep its handle
df_silver = silver_writer.start()

Run the following code to read data as streaming data from the Delta table.

In [None]:
# Stream-read the Silver table and show a running record count per vehicle make
display(
    spark.readStream
    .format("delta")
    .table(get_config("Silver")['delta_table'])
    .groupBy("Make")
    .count()
    .orderBy("Make")
)

Note: If we try to apply this to a streaming read of our data without using ```foreachBatch```, we'll learn that
> Non-time-based windows are not supported on streaming DataFrames

Below is an example for an attempt to perform a window-function based deduplication

In [None]:
# ranked_df = (spark.readStream
#                   .table("bronze")
#                   .filter("topic = 'user_info'")
#                   .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
#                   .select("v.*")
#                   .filter(F.col("update_type").isin(["new", "update"]))
#                   .withColumn("rank", F.rank().over(window))
#                   .filter("rank == 1").drop("rank"))
#
# display(ranked_df)

### Quality enforcement 

This can be implemented by:
1. Enforcing **`Table Constraints`**, and sending the unconforming data to a Delta table
2. Adding a CASE WHEN flag to the Silver table itself

Below is a demonstration of #1

Table constraints apply boolean filters to columns within a table and prevent data that does not fulfill these constraints from being written.

In [None]:
%sql
-- Reject any write whose heartrate is not strictly positive
ALTER TABLE heart_rate_silver ADD CONSTRAINT validbpm CHECK (heartrate > 0);

Creating a table to store quarantined records

In [None]:
%sql
-- Delta table that receives the records rejected by the quality check
CREATE TABLE IF NOT EXISTS bpm_quarantine (
    device_id LONG,
    time TIMESTAMP,
    heartrate DOUBLE
)
USING DELTA
LOCATION '${da.paths.user_db}/bpm_quarantine'

With Structured Streaming operations, writing to an additional table can be accomplished within **`foreachBatch`** logic.

In [None]:
# Insert-only MERGE: rows from the `stream_updates` view are inserted into heart_rate_silver
# only when no row with the same (device_id, time) already exists there.
sql_query = """
MERGE INTO heart_rate_silver a
USING stream_updates b
ON a.device_id=b.device_id AND a.time=b.time
WHEN NOT MATCHED THEN INSERT *
"""

class Upsert:
    def __init__(self, query, update_temp="stream_updates"):
        self.query = query
        self.update_temp = update_temp 
        
    def upsert_to_delta(self, micro_batch_df, batch):
        micro_batch_df.filter("heartrate" > 0).createOrReplaceTempView(self.update_temp)
        micro_batch_df._jdf.sparkSession().sql(self.query)
        # Notice the .write here
        micro_batch_df.filter("heartrate" <= 0).write.format("delta").mode("append").saveAsTable("bpm_quarantine")

Note that within the **`foreachBatch`** logic, the DataFrame operations are treating the data in each batch as if it's static rather than streaming.

As such, we use the **`write`** syntax instead of **`writeStream`**.

This also means that our exactly-once guarantees are relaxed. In our example above, we have two ACID transactions:
1. Our SQL query executes to run an insert-only merge to avoid writing duplicate records to our silver table.
2. We write a microbatch of records with non-positive heartrates (`heartrate <= 0`) to the **`bpm_quarantine`** table

If our job fails after our first transaction completes but before the second completes, we will re-execute the full microbatch logic on job restart.

However, because our insert-only merge already prevents duplicate records from being saved to our table, this will not result in any data corruption.

## Gold Zone

Aggregate the data and write to Gold Delta table. Important concepts include the below:

readStream:
- ```withWatermark```: Control the threshold for how long to continue processing updates for a given state entity (e.g. count).
- ```window```: Defines the aggregation window for streaming data. Options include:
  - **Tumbling**: Non-overlapping windows with fixed length ```window('eventtime','1 hour')```
  - **Sliding**: Overlapping windows with fixed length, which emit a result at a fixed interval (the slide duration) ```window('eventtime','1 hour', '5 minutes')```
  - **Session**: Window is of dynamic length, which expires after x minutes without incoming event ```session_window("eventTime", "5 minutes")```


writeStream:
- ```outputMode```: How the processed data is pushed to sink. 3 modes are available: ```append``` (the default), ```complete``` (this mode is used only when you have streaming aggregated data), and ```update``` (just outputs the updated aggregated results every time to data sink when new data arrives)

In [None]:
# Define readStream and writeStream
# Reads the Silver Delta table as a stream, counts rows per 1-hour event-time
# window and dimension columns, and writes the result to the Gold Delta path.
# Because the chain ends with .start(), df_gold holds the started streaming
# query rather than a DataFrame.
#
# The inner lookup uses single quotes: nesting double quotes inside a
# double-quoted f-string is a SyntaxError before Python 3.12.
#
# NOTE(review): the watermark is declared on "timestamp" while the window is
# built on "eventtime"; a watermark normally has to be on the aggregation's
# event-time column to bound state -- confirm which column is intended.
# NOTE(review): the Delta sink is documented for append/complete output modes;
# verify that "update" is accepted on the target runtime.
df_gold = (
    spark.readStream.format("delta")
    .option("latestFirst", "true")
    .table(f"{db_name}.{get_config('Silver')['delta_table']}")
    .withWatermark("timestamp", "4 minutes")
    .groupBy(window("eventtime", "1 hour"), "Make", "Borough", "Location", "Month", "Day", "Hour")
    .count()
    .writeStream.format("delta")
    .outputMode("update")
    .option("checkpointLocation", get_config("Gold")['chkpt_path'])
    .start(get_config("Gold")['delta_path'])
)

# Create Delta table
# Registers a table name over the Gold output location (no-op if it exists).
gold_table = get_config("Gold")['delta_table']
gold_path = get_config("Gold")['delta_path']
create_gold_table_sql = f"""
CREATE TABLE IF NOT EXISTS {db_name}.{gold_table}
LOCATION '{gold_path}'
"""
spark.sql(create_gold_table_sql)

# Delta Live Tables

Delta Live Tables allows Python/SQL statements to be structured as ELT pipelines.

Comparison between Workflows and DLT:
- **DLT**: Geared towards ELT jobs in batch/streaming mode, with built-in data quality constraints, monitoring, and logging.
- **Workflows**: Can orchestrate arbitrary code and ML tasks. A DLT pipeline can be a task within a Workflow.

## Basic Info

### Settings

**Storage location**: Where the output tables and metadata required for processing will be stored. This location is either DBFS or another location you provide. Under the specified location, these folders storing Delta Lake tables are created:
-  ```autoloader/``` and ```checkpoints/```: Directories contain data used to manage incremental data processing with Structured Streaming.
- ```system/```: The ```events/``` subfolder captures events associated with the pipeline e.g. INITIALIZING, QUEUED, etc, lineage and data quality metrics are also available
- ```tables/```: The tables themselves, each subfolder contains a Delta Lake table being managed by DLT.

**Target**:
- The name of a database for persisting pipeline output data.

**Configuration**:
- Define variables for parameterization, see [here](https://learn.microsoft.com/en-us/azure/databricks/workflows/delta-live-tables/delta-live-tables-configuration#parameterize-pipelines)

**Pipeline Mode**: Specifies how the pipeline will be run.
- **Triggered** pipelines run once and then shut down until the next manual or scheduled update.
- **Continuous** pipelines run continuously, ingesting new data as it arrives.

### Python vs SQL
| Python | SQL | Notes |
|--------|--------|--------|
| Python API | Proprietary SQL API |  |
| no dlt module, no syntax check | has syntax checks| In Python, if you run a DLT notebook cell on its own it will show an error, whereas in SQL it will check whether the command is syntactically valid and tell you. In both cases, individual notebook cells are not supposed to be run for DLT pipelines. |
| A Note on Imports | None | The dlt module should be explicitly imported into your Python notebook libraries. In SQL, this is not the case. |
| Tables as DataFrames | Tables as Query Results | The Python DataFrame API allows for multiple transformations of a dataset by stringing multiple API calls together. Compared to SQL, those same transformations must be saved in temporary tables as they are transformed. |
|@dlt.table()<br/>def function_name():<br/>&nbsp;&nbsp;&nbsp;&nbsp;return (query)|CREATE OR REFRESH [STREAMING] **```LIVE```** TABLE table_name AS select_statement. The **```LIVE```** keyword is used in place of the schema name to refer to the target schema configured for the current DLT pipeline| In SQL, the core logic of transformations is contained in the SELECT statement. In Python, data transformations are specified in the ```return``` clause.  |
| @dlt.table(comment = "Python comment",table_properties = {"quality": "silver"}) | COMMENT "SQL comment"       TBLPROPERTIES ("quality" = "silver") | This is how you add comments and table properties in Python vs. SQL |

### Live Tables vs Streaming Live Tables
**Live Tables**
* Always "correct", meaning their contents will match their definition after any update.
* Return same results as if table had just been defined for first time on all data.
* Should not be modified by operations external to the DLT Pipeline (you'll either get undefined answers or your change will just be undone).

**Streaming Live Tables**
* Only supports reading from "append-only" streaming sources.
* Only reads each input batch once, no matter what (even if joined dimensions change, or if the query definition changes, etc).
* Can perform operations on the table outside the managed DLT Pipeline (append data, perform GDPR, etc).

## Data pipeline with DLT
Here's an example for using DLT + Auto Loader for Bronze -> Silver -> Gold incremental processing. Some notes about DLT pipeline

**Temporary tables**: They can be created with these methods
- Declare temporary tables e.g. ```CREATE TEMPORARY LIVE TABLE temp_table```
- Create Views e.g. ```CREATE LIVE VIEW temp_table``` Unlike views used elsewhere in Databricks, DLT views are not persisted to the metastore, meaning that they can only be referenced from within the DLT pipeline they are a part of. (This is similar scoping to temporary views in most SQL systems.)


**Joins and Referencing Tables Across Notebook Libraries**
- Within a DLT Pipeline, code in any notebook library can reference tables and views created in any other notebook library. Essentially, we can think of the scope of the schema reference by the **`LIVE`** keyword to be at the DLT Pipeline level, rather than the individual notebook.

**CDC and SCD**
- Although not shown in the code below, SCD type 1/2 can be applied with the ```APPLY CHANGES INTO``` syntax, so that changes from the upstream tables can be propagated into downstream tables. See [here](https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html).

### Bronze Zone
Incremental processing via <a href="https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html" target="_blank">Auto Loader</a> (which uses the same processing model as Structured Streaming) requires the addition of the **`STREAMING`** keyword in the declaration, as seen below. The **`cloud_files()`** method enables Auto Loader to be used natively with SQL. This method takes the following positional parameters:
* The source location, as mentioned above
* The source data format, which is JSON in this case
* An arbitrarily sized array of optional reader options. In this case, we set **`cloudFiles.inferColumnTypes`** to **`true`**

Assume the parameter ```source``` is defined in DLT Pipeline ```Configuration```

In [None]:
-- Bronze: streaming table fed by Auto Loader (cloud_files) from the JSON files
-- under ${source}/orders, with column type inference enabled.
-- ${source} is expected to be defined in the DLT pipeline Configuration.
CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from retail-org/sales_orders."
AS SELECT * FROM cloud_files("${source}/orders", "json", map("cloudFiles.inferColumnTypes", "true"));

-- Bronze: streaming table fed by Auto Loader (cloud_files) from the CSV files
-- under ${source}/customers; no extra reader options are passed.
CREATE OR REFRESH STREAMING LIVE TABLE customers
COMMENT "The customers buying finished products, ingested from retail-org/customers."
AS SELECT * FROM cloud_files("${source}/customers", "csv");

### Silver Zone
The **`CONSTRAINT`** keyword introduces quality control. Similar in function to a traditional **`WHERE`** clause, **`CONSTRAINT`** integrates with DLT, enabling it to collect metrics on constraint violations. Constraints provide an optional **`ON VIOLATION`** clause, specifying an action to take on records that violate the constraint. The three modes currently supported by DLT include:

| **`ON VIOLATION`** | Behavior |
| --- | --- |
| **`FAIL UPDATE`** | Pipeline failure when constraint is violated |
| **`DROP ROW`** | Discard records that violate constraints |
| Omitted | Records violating constraints will be included (but violations will be reported in metrics) |

In [None]:
-- Silver: cleaned orders enriched with customer attributes.
--   valid_order_number: rows with a NULL order_number are dropped.
--   valid_id:           a NULL customer_id fails the pipeline update.
-- The SELECT must project every column the constraints reference
-- (order_number, customer_id) as well as the columns that downstream tables
-- read from this one (city, order_date, ordered_products).
-- NOTE(review): assumes sales_orders_raw exposes order_number and
-- ordered_products, and customers exposes city -- confirm against the source data.
CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
)
COMMENT "The cleaned sales orders with valid order_number(s)."
AS
  SELECT f.customer_id, f.customer_name, f.number_of_line_items,
         timestamp(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
         date(from_unixtime((cast(f.order_datetime as long)))) as order_date,
         f.order_number, f.ordered_products,
         c.city, c.loyalty_segment
  FROM STREAM(LIVE.sales_orders_raw) f
  LEFT JOIN LIVE.customers c
    ON c.customer_id = f.customer_id

### Gold Zone
At the most refined level of the architecture, we declare a table delivering an aggregation with business value, in this case a collection of sales order data based in a specific region. In aggregating, the report generates counts and totals of orders by date and customer.

In [None]:
-- Gold: per (order_date, city, customer, currency) totals for Los Angeles orders.
-- The inner query reads city, order_date, customer_id, customer_name and
-- ordered_products from LIVE.sales_orders_cleaned and explodes ordered_products
-- into one row per product; the outer query sums price and qty and counts
-- product ids per group.
CREATE OR REFRESH LIVE TABLE sales_order_in_la
COMMENT "Sales orders in LA."
AS
  SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr, 
         sum(ordered_products_explode.price) as sales, 
         sum(ordered_products_explode.qty) as quantity, 
         count(ordered_products_explode.id) as product_count
  FROM (SELECT city, order_date, customer_id, customer_name, explode(ordered_products) as ordered_products_explode
        FROM LIVE.sales_orders_cleaned 
        WHERE city = 'Los Angeles')
  GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr

# Delta Change Data Feed

Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table.

Here is the code for enabling this feature:

In [None]:
# Option 1: turn the Change Data Feed on for one specific, existing table by
# setting its table property.
enable_cdf_sql = """
ALTER TABLE silverTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
spark.sql(enable_cdf_sql)

# Option 2: set the default through the Spark conf (in a notebook or on a
# cluster) so that every Delta table newly created in that scope has CDF on.
cdf_default_key = "spark.databricks.delta.properties.defaults.enableChangeDataFeed"
spark.conf.set(cdf_default_key, True)

After this is enabled, two things will happen:
- A folder ```_change_data``` will appear in the Delta Table directory, which contains parquet files
- CDC data of the table can be read using the below methods

In [None]:
# via Python: stream the change feed of table "silver" from version 0 onwards.
# NOTE(review): current Delta documentation names this option "readChangeFeed";
# "readChangeData" looks like an older alias -- confirm it is still accepted.
delta_stream_reader = spark.readStream.format("delta")
change_feed_reader = (
    delta_stream_reader
    .option("readChangeData", True)
    .option("startingVersion", 0)
)
cdc_df = change_feed_reader.table("silver")

# via SQL: same change feed as a batch query, ordered by commit time.
changes_query = """
SELECT * FROM table_changes('silver', 0) order by _commit_timestamp
"""
spark.sql(changes_query)

Note that we are using a table that has updates written to it as a streaming source! This is a **huge** value add, and something that historically has required extensive workarounds to process correctly.

This would mean:
- **Silver and Gold tables**: Updates to the Silver table can be isolated and pushed to the Gold table without reading through the whole Silver table. Data removal requests can also be fulfilled more easily, because DELETEs can be propagated downstream; see the cell below for an example
- **Materialized views**: Create up-to-date, aggregated views of information for use in BI and analytics without having to reprocess the full underlying tables, instead updating only where changes have come through.
- **Transmit changes**: Send a change data feed to downstream systems such as Kafka or RDBMS that can use it to incrementally process in later stages of data pipelines.
- **Audit trail table**: Capture the change data feed as a Delta table provides perpetual storage and efficient query capability to see all changes over time, including when deletes occur and what updates were made.

Please see [here](https://docs.databricks.com/_static/notebooks/delta/cdf-demo.html) for a demo. Below is an example `foreachBatch` function that propagates deletes to downstream tables

In [None]:
def process_deletes(microBatchDF, batchId):
    """Propagate delete events of one change-feed micro-batch downstream.

    The rows whose ``_change_type`` is ``'delete'`` are exposed as the temp
    view ``deletes``; three SQL statements then run against that view, in
    order: remove matching rows from ``users``, remove matching rows from
    ``user_bins``, and mark matching ``delete_requests`` as deleted.

    Args:
        microBatchDF: DataFrame holding the rows of the micro-batch.
        batchId: Micro-batch identifier passed by ``foreachBatch``; unused.
    """
    remove_from_users = """
        MERGE INTO users u
        USING deletes d
        ON u.alt_id = d.alt_id
        WHEN MATCHED
            THEN DELETE
    """

    remove_from_user_bins = """
        DELETE FROM user_bins
        WHERE user_id IN (SELECT user_id FROM deletes)
    """

    mark_requests_deleted = """
        MERGE INTO delete_requests dr
        USING deletes d
        ON d.alt_id = dr.alt_id
        WHEN MATCHED
          THEN UPDATE SET status = "deleted"
    """

    deleted_rows = microBatchDF.filter("_change_type = 'delete'")
    deleted_rows.createOrReplaceTempView("deletes")

    # Each statement runs on the session that owns the micro-batch, which is
    # where the "deletes" view was registered.
    for statement in (remove_from_users, remove_from_user_bins, mark_requests_deleted):
        microBatchDF._jdf.sparkSession().sql(statement)

# Other things

## Privileges
- **SELECT**: gives read access to an object.
- **CREATE**: gives ability to create an object (for example, a table in a schema).
- **MODIFY**: gives ability to add, delete, and modify data to or from an object.
- **USAGE**: does not give any abilities, but is an additional requirement to perform any action on a ```schema``` object.
- **READ_METADATA**: gives ability to view an object and its metadata.
- **CREATE_NAMED_FUNCTION**: gives ability to create a named UDF in an existing `catalog` or `schema`.
- **MODIFY_CLASSPATH**: gives ability to add files to the Spark class path.
- **ALL PRIVILEGES**: gives all privileges (is translated into all the above privileges).

## Compute

Cluster ACL https://docs.databricks.com/security/access-control/cluster-acl.html

Access modes

| **Access mode dropdown** | **Access control** | **Visible to user** | **Unity Catalog support** | **Supported languages** |
|--------|--------|--------|--------|--------|
| Single user | Credential pass-through | Always | Yes | Python, SQL, Scala, R |
| Shared | Credential pass-through / Table ACL | Always (**Premium plan required**) | Yes | Python, SQL |
| No isolation shared | Maps all the users to the root account, everything is shared, they can install anything, and changes apply to all users | Admins can hide this cluster type by enforcing user isolation in the admin console | No | Python, SQL, Scala, R |
| Custom | | This option will only be shown for existing clusters without access modes | No | Python, SQL, Scala, R |


Below describes how to install custom packages in Compute
- **Install in UI**, can be scoped by Workspace (which can be created and installed on all clusters in the Workspace), Cluster (self-explanatory), or Notebook scope (run %pip install xxx in Notebook)
- **Init script**, can be defined per cluster / set global init script (not recommended as per Databricks best practice)

Pools are sets of reserved idle instances that reduce cluster start-up time. Idle pool instances do not incur DBU charges, but the cloud provider's instance charges still apply

## Workflow

Job permissions

- A job cannot have a group as an owner
- Jobs triggered through Run Now assume the permissions of the **job owner** 

| **Ability** | **No Permissions** | **Can View** | **Can Manage Run** | **Is Owner** | **Can Manage** |
|--------|--------|--------|--------|--------|--------|
| View job details and settings | x | x | x | x | x|
| View results, Spark UI, logs of a job run | | x | x | x | x|
| Run now | | | x | x | x|
| Cancel run | | | x | x | x|
| Edit job settings | | | | x | x|
| Modify permissions | | | | x | x|
| Delete job | | | | x | x|
| Change owner | | | | | |

Notes
- The use of **shared job cluster** is recommended because:
  - The cluster can be reused by multiple tasks within the same job run, reducing start-up time
  - All-purpose compute does not auto-terminate when job finishes
- The workflow structure A -> [B, C], where A sets the Spark configuration for storage credentials for subsequent consumption by B and C, does not work

## Unity Catalog

Data Sharing
- A metastore is a regional resource; sharing data across metastores in different regions is done via Delta Sharing
- It might be tempting to create external tables in metastore B to query tables managed in metastore A, but this is strongly advised against because upstream changes in tables recognized in metastore A will not be propagated across
- Cross-region/cloud queries could incur additional egress charge. For frequently queried tables, making a direct copy is preferred

## Miscellaneous

### Adding Commit Messages

Delta Lake supports arbitrary commit messages that will be recorded to the Delta transaction log and viewable in the table history. This can help with later auditing.

Setting this with SQL will create a global commit message that will be used for all subsequent operations in our notebook.

In [None]:
%sql
-- The %sql magic is required because this notebook's default language is Python.
-- Sets the commit message recorded for subsequent Delta operations in this notebook.
SET spark.databricks.delta.commitInfo.userMetadata=Deletes committed

With DataFrames, commit messages can also be specified as part of the write options using the **`userMetadata`** option.

Here, we'll indicate that we're manually processing these requests in a notebook, rather than using an automated job.

In [None]:
# Write requests_df to the delete_requests table as a stream, tagging each
# commit with a userMetadata message and processing what is currently
# available (availableNow trigger).
requests_writer = requests_df.writeStream
requests_writer = requests_writer.option("checkpointLocation", f"{DA.paths.checkpoints}/delete_requests")
requests_writer = requests_writer.option("userMetadata", "Requests processed interactively")
requests_writer = requests_writer.trigger(availableNow=True)
query = requests_writer.table("delete_requests")