# Lakehouse Labs World Tour for Data Engineers
Welcome to the Data and AI World Tour for data engineers.  Over the course of this notebook, you will use an open dataset and learn how to:
1. Ingest data that has been landed into cloud storage
2. Transform and store your data in the reliable and performant Delta Lake storage format
3. Use Update, Delete, Merge, Schema Evolution, Time Travel, and Change Data Feed (CDF) functionality built into the Delta Lake storage format

## The Use Case
We will analyze United States zip code data that is made available as a public dataset online. You can learn more about the dataset [here](https://simplemaps.com/data/us-zips), but we'll download it automatically below.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%sql
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

key,value
spark.databricks.delta.properties.defaults.enableChangeDataFeed,True


In [0]:
%sh
# Pull CSV file from url
wget -nc https://lafkkbox.blob.core.windows.net/worldtourdata/uszips.csv

# Move from databricks/driver to dbfs
mv /databricks/driver/uszips.csv /dbfs/FileStore/uszips.csv

--2022-10-18 13:49:35--  https://lafkkbox.blob.core.windows.net/worldtourdata/uszips.csv
Resolving lafkkbox.blob.core.windows.net (lafkkbox.blob.core.windows.net)... 20.60.59.235
Connecting to lafkkbox.blob.core.windows.net (lafkkbox.blob.core.windows.net)|20.60.59.235|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6323670 (6.0M) [text/csv]
Saving to: ‘uszips.csv’

     0K .......... .......... .......... .......... ..........  0% 1.07M 6s
    50K .......... .......... .......... .......... ..........  1% 2.12M 4s
   100K .......... .......... .......... .......... ..........  2% 2.13M 4s
   150K .......... .......... .......... .......... ..........  3% 2.11M 3s
   200K .......... .......... .......... .......... ..........  4%  198M 3s
   250K .......... .......... .......... .......... ..........  4% 2.15M 3s
   300K .......... .......... .......... .......... ..........  5%  157M 2s
   350K .......... .......... .......... .......... ..........  6% 2.16M 

In [0]:
# mv: cannot move '/databricks/driver/uszips.csv' to '/dbfs/FileStore/uszips.csv': No such file or directory

In [0]:
#dbutils.fs.mv("file:/databricks/driver/uszips.csv", "dbfs:/FileStore/uszips.csv", recurse=True)

---------------------------------------------------------------------------
ExecutionError                            Traceback (most recent call last)
<command-4367306520264190> in <cell line: 1>()
----> 1 dbutils.fs.mv("file:/databricks/driver/uszips.csv", "dbfs:/FileStore/uszips.csv", recurse=True)

/databricks/python_shell/dbruntime/dbutils.py in f_with_exception_handling(*args, **kwargs)
    388                     exc.__context__ = None
    389                     exc.__cause__ = None
--> 390                     raise exc

In [0]:
%fs ls 'dbfs:/FileStore/'

path,name,size,modificationTime
dbfs:/FileStore/tables/,tables/,0,1665498302000
dbfs:/FileStore/uszips/,uszips/,0,1665751836000
dbfs:/FileStore/uszips.csv,uszips.csv,6323670,1666100976000


In [0]:
%fs head 'dbfs:/FileStore/uszips.csv'

In [0]:
# CSV schema
schema = "zip string,lat double,lng double,city string,state_id string,state_name string,zcta boolean,parent_zcta string,population integer,density double,county_fips integer,county_name string,county_weights string,county_names_all string,county_fips_all string,imprecise boolean,military boolean,timezone string"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format("csv") \
  .option("header", "true") \
  .option("sep", ",") \
  .option("escape", "\"") \
  .schema(schema) \
  .load("dbfs:/FileStore/uszips.csv")

display(df)

# write data out in Delta format
# we'll skip writing this data for now as we'll use another ingestion pattern
# df.write.format("delta").partitionBy('state_id').save('/mnt/delta/uszips_delta')

zip,lat,lng,city,state_id,state_name,zcta,parent_zcta,population,density,county_fips,county_name,county_weights,county_names_all,county_fips_all,imprecise,military,timezone
601,18.18005,-66.75218,Adjuntas,PR,Puerto Rico,True,,17113.0,102.7,72001,Adjuntas,"{""72001"": ""99.43"", ""72141"": ""0.57""}",Adjuntas|Utuado,72001|72141,False,False,America/Puerto_Rico
602,18.36074,-67.17519,Aguada,PR,Puerto Rico,True,,37751.0,476.0,72003,Aguada,"{""72003"": ""100""}",Aguada,72003,False,False,America/Puerto_Rico
603,18.4544,-67.12201,Aguadilla,PR,Puerto Rico,True,,47081.0,574.9,72005,Aguadilla,"{""72005"": ""100""}",Aguadilla,72005,False,False,America/Puerto_Rico
606,18.16721,-66.93828,Maricao,PR,Puerto Rico,True,,6392.0,58.3,72093,Maricao,"{""72093"": ""94.88"", ""72153"": ""3.78"", ""72121"": ""1.35""}",Maricao|Yauco|Sabana Grande,72093|72153|72121,False,False,America/Puerto_Rico
610,18.29032,-67.12244,Anasco,PR,Puerto Rico,True,,26686.0,286.9,72011,Añasco,"{""72011"": ""99.45"", ""72003"": ""0.55""}",Añasco|Aguada,72011|72003,False,False,America/Puerto_Rico
612,18.40699,-66.70805,Arecibo,PR,Puerto Rico,True,,59369.0,339.1,72013,Arecibo,"{""72013"": ""99.89"", ""72017"": ""0.11""}",Arecibo|Barceloneta,72013|72017,False,False,America/Puerto_Rico
616,18.41752,-66.66814,Bajadero,PR,Puerto Rico,True,,10022.0,335.6,72013,Arecibo,"{""72013"": ""100""}",Arecibo,72013,False,False,America/Puerto_Rico
617,18.44125,-66.55916,Barceloneta,PR,Puerto Rico,True,,23750.0,603.5,72017,Barceloneta,"{""72017"": ""99.43"", ""72054"": ""0.57""}",Barceloneta|Florida,72017|72054,False,False,America/Puerto_Rico
622,17.99174,-67.15248,Boqueron,PR,Puerto Rico,True,,6741.0,89.8,72023,Cabo Rojo,"{""72023"": ""100""}",Cabo Rojo,72023,False,False,America/Puerto_Rico
623,18.08354,-67.15418,Cabo Rojo,PR,Puerto Rico,True,,41746.0,424.5,72023,Cabo Rojo,"{""72023"": ""100""}",Cabo Rojo,72023,False,False,America/Puerto_Rico


<!-- #DATA ENGINEERING AND STREAMING ARCHITECTURE -->
<img src="https://publicimg.blob.core.windows.net/images/DE and Streaming2.png" width="1200">

#### Auto Loader, COPY INTO, and Incrementally Ingesting Data
[Auto Loader](https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-gen2.html) and [COPY INTO](https://docs.databricks.com/ingestion/copy-into/index.html) are two methods of ingesting data into a Delta Lake table from a folder in a data lake. “Yeah, so... Why is that so special?”, you may ask. These features are special because they let you ingest data directly from a data lake incrementally and idempotently, without needing a distributed streaming system like Kafka. This can considerably simplify the incremental ETL process. It is also an efficient way to ingest data, since you only ingest new data and do not reprocess data that already exists. Below is an incremental ETL architecture. We will focus on the left-hand side: ingesting into tables from outside sources.

You can incrementally ingest data either continuously or on a schedule as a job. COPY INTO and Auto Loader cover both cases, and we will show you how below.
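To make "incremental and idempotent" concrete, here is a minimal plain-Python sketch of the idea, not of how Auto Loader or COPY INTO are actually implemented. The function `ingest_new_files`, the in-memory `checkpoint` set, and the file names are all hypothetical stand-ins: the checkpoint remembers which files have been processed, so re-running the ingest never loads the same file twice.

```python
from typing import Dict, List, Set


def ingest_new_files(landing_zone: Dict[str, List[str]],
                     checkpoint: Set[str],
                     table: List[str]) -> int:
    """Append rows from files not yet in the checkpoint; return how many files were ingested."""
    ingested = 0
    for path in sorted(landing_zone):
        if path in checkpoint:
            continue  # already processed in an earlier run, so skip it
        table.extend(landing_zone[path])
        checkpoint.add(path)  # remember the file so the next run skips it
        ingested += 1
    return ingested


# Hypothetical landing zone: file name -> rows in that file
landing_zone = {"uszips_part1.csv": ["00601,PR", "00602,PR"]}
checkpoint: Set[str] = set()
table: List[str] = []

first_run = ingest_new_files(landing_zone, checkpoint, table)   # ingests the one file
second_run = ingest_new_files(landing_zone, checkpoint, table)  # nothing new, table unchanged

landing_zone["uszips_part2.csv"] = ["35004,AL"]
third_run = ingest_new_files(landing_zone, checkpoint, table)   # ingests only the new file
```

Running the same ingest twice leaves the table unchanged, which is the property that lets you schedule it as often as you like.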

<img src="https://databricks.com/wp-content/uploads/2021/07/get-start-delta-blog-img-1.png" width=1000>

### Auto Loader
In this notebook we will use Auto Loader for a basic ingest use case with [schema inference and evolution](https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-gen2.html#schema-inference-and-evolution).  
Notice how the schema is inferred, but we can provide schema hints for certain columns also.
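As a rough illustration of how hints interact with inference, the sketch below overlays a hint string on a default schema. It assumes Auto Loader's default behavior for CSV of reading every column as a string unless column type inference is turned on, which matches the `DESCRIBE EXTENDED` output later in this notebook (`lat` stays `string`, while the hinted `population` and `density` columns become `int` and `double`). The helper `apply_schema_hints` is hypothetical and only mimics the outcome, not the real implementation; silently skipping hints for unknown columns is a simplification made here.

```python
from typing import Dict, List


def apply_schema_hints(columns: List[str], hints: str) -> Dict[str, str]:
    """Start from an all-string schema and override the types named in the hint string."""
    schema = {name: "string" for name in columns}
    for hint in hints.split(","):
        name, data_type = hint.strip().split(None, 1)  # "population integer" -> name, type
        if name in schema:  # simplification: hints for unknown columns are skipped
            schema[name] = data_type.strip()
    return schema


columns = ["zip", "lat", "lng", "city", "population", "density"]
schema = apply_schema_hints(columns, "zip string, population integer, density double")
```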

In [0]:
spark.sql("DROP DATABASE IF EXISTS population CASCADE")
dbutils.fs.rm("/mnt/delta/", recurse=True)
spark.sql("CREATE DATABASE IF NOT EXISTS population LOCATION '/mnt/delta/'")

Out[5]: DataFrame[]

In [0]:
# "cloudFiles" indicates the use of Auto Loader

dfBronze = spark.readStream.format("cloudFiles") \
  .option('cloudFiles.format', 'csv') \
  .option('header','true') \
  .option("cloudFiles.schemaLocation", "/mnt/delta/checkpoint/uszips_delta/") \
  .option("cloudFiles.schemaHints", "zip string, population integer, density double") \
  .load("dbfs:/FileStore")

# The stream shuts itself off when it finishes, because of the trigger-once setting
# The checkpoint location saves the state of the ingest when it shuts off, so we know where to pick up next time
# Notice that we can partition our data as well.  Partitioning isn't recommended unless our data is over a GB in size.  Use OPTIMIZE and ZORDER instead, which is shown further below.
dfBronze.writeStream \
  .format("delta") \
  .partitionBy('state_id') \
  .trigger(once=True) \
  .option("checkpointLocation", "/mnt/delta/checkpoint/uszips_delta/") \
  .start("/mnt/delta/uszips_delta/")

Out[13]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f0b9ce6eaf0>

In [0]:
%sql
CREATE TABLE IF NOT EXISTS population.uszips
 
USING DELTA 
LOCATION '/mnt/delta/uszips_delta/'

In [0]:
%sql
DESCRIBE EXTENDED population.uszips 

col_name,data_type,comment
zip,string,
lat,string,
lng,string,
city,string,
state_id,string,
state_name,string,
zcta,string,
parent_zcta,string,
population,int,
density,double,


In [0]:
%fs ls "/mnt/delta/uszips_delta/"

path,name,size,modificationTime
dbfs:/mnt/delta/uszips_delta/_delta_log/,_delta_log/,0,1665954226000
dbfs:/mnt/delta/uszips_delta/state_id=AK/,state_id=AK/,0,1665954208000
dbfs:/mnt/delta/uszips_delta/state_id=AL/,state_id=AL/,0,1665954211000
dbfs:/mnt/delta/uszips_delta/state_id=AR/,state_id=AR/,0,1665954211000
dbfs:/mnt/delta/uszips_delta/state_id=AS/,state_id=AS/,0,1665954211000
dbfs:/mnt/delta/uszips_delta/state_id=AZ/,state_id=AZ/,0,1665954212000
dbfs:/mnt/delta/uszips_delta/state_id=CA/,state_id=CA/,0,1665954212000
dbfs:/mnt/delta/uszips_delta/state_id=CO/,state_id=CO/,0,1665954212000
dbfs:/mnt/delta/uszips_delta/state_id=CT/,state_id=CT/,0,1665954212000
dbfs:/mnt/delta/uszips_delta/state_id=DC/,state_id=DC/,0,1665954212000


In [0]:
%sql
SELECT * FROM delta.`/mnt/delta/uszips_delta/`

zip,lat,lng,city,state_id,state_name,zcta,parent_zcta,population,density,county_fips,county_name,county_weights,county_names_all,county_fips_all,imprecise,military,timezone,_rescued_data
35004,33.60281,-86.49612,Moody,AL,Alabama,True,,12045,257.3,1115,St. Clair,"""{""""01115"""": """"100""""}""",St. Clair,01115,FALSE,FALSE,America/Chicago,
35005,33.59515,-87.00089,Adamsville,AL,Alabama,True,,7344,82.5,1073,Jefferson,"""{""""01073"""": """"100""""}""",Jefferson,01073,FALSE,FALSE,America/Chicago,
35006,33.42922,-87.19708,Adger,AL,Alabama,True,,2883,11.2,1073,Jefferson,"""{""""01073"""": """"98.4""""","""""01127"""": """"1.35""""","""""01125"""": """"0.26""""}""",Jefferson|Walker|Tuscaloosa,01073|01127|01125,FALSE,
35007,33.21591,-86.79717,Alabaster,AL,Alabama,True,,26332,271.9,1117,Shelby,"""{""""01117"""": """"100""""}""",Shelby,01117,FALSE,FALSE,America/Chicago,
35010,32.91644,-85.9368,Alexander City,AL,Alabama,True,,20613,36.6,1123,Tallapoosa,"""{""""01123"""": """"92.16""""","""""01037"""": """"6.56""""","""""01051"""": """"1.27""""}""",Tallapoosa|Coosa|Elmore,01123|01037|01051,FALSE,
35013,33.90164,-86.51785,Allgood,AL,Alabama,True,,46,627.1,1009,Blount,"""{""""01009"""": """"100""""}""",Blount,01009,FALSE,FALSE,America/Chicago,
35014,33.35989,-86.26918,Alpine,AL,Alabama,True,,4963,20.1,1121,Talladega,"""{""""01121"""": """"100""""}""",Talladega,01121,FALSE,FALSE,America/Chicago,
35016,34.32333,-86.50142,Arab,AL,Alabama,True,,17208,89.4,1095,Marshall,"""{""""01095"""": """"86.93""""","""""01043"""": """"11.29""""","""""01009"""": """"1.78""""}""",Marshall|Cullman|Blount,01095|01043|01009,FALSE,
35019,34.29852,-86.63523,Baileyton,AL,Alabama,True,,2492,32.2,1043,Cullman,"""{""""01043"""": """"78.51""""","""""01103"""": """"21.49""""}""",Cullman|Morgan,01043|01103,FALSE,FALSE,
35020,33.40271,-86.95158,Bessemer,AL,Alabama,True,,25757,521.4,1073,Jefferson,"""{""""01073"""": """"100""""}""",Jefferson,01073,FALSE,FALSE,America/Chicago,


In [0]:
%sql
SELECT * FROM population.uszips

zip,lat,lng,city,state_id,state_name,zcta,parent_zcta,population,density,county_fips,county_name,county_weights,county_names_all,county_fips_all,imprecise,military,timezone,_rescued_data
35004,33.60281,-86.49612,Moody,AL,Alabama,True,,12045,257.3,1115,St. Clair,"""{""""01115"""": """"100""""}""",St. Clair,01115,FALSE,FALSE,America/Chicago,
35005,33.59515,-87.00089,Adamsville,AL,Alabama,True,,7344,82.5,1073,Jefferson,"""{""""01073"""": """"100""""}""",Jefferson,01073,FALSE,FALSE,America/Chicago,
35006,33.42922,-87.19708,Adger,AL,Alabama,True,,2883,11.2,1073,Jefferson,"""{""""01073"""": """"98.4""""","""""01127"""": """"1.35""""","""""01125"""": """"0.26""""}""",Jefferson|Walker|Tuscaloosa,01073|01127|01125,FALSE,
35007,33.21591,-86.79717,Alabaster,AL,Alabama,True,,26332,271.9,1117,Shelby,"""{""""01117"""": """"100""""}""",Shelby,01117,FALSE,FALSE,America/Chicago,
35010,32.91644,-85.9368,Alexander City,AL,Alabama,True,,20613,36.6,1123,Tallapoosa,"""{""""01123"""": """"92.16""""","""""01037"""": """"6.56""""","""""01051"""": """"1.27""""}""",Tallapoosa|Coosa|Elmore,01123|01037|01051,FALSE,
35013,33.90164,-86.51785,Allgood,AL,Alabama,True,,46,627.1,1009,Blount,"""{""""01009"""": """"100""""}""",Blount,01009,FALSE,FALSE,America/Chicago,
35014,33.35989,-86.26918,Alpine,AL,Alabama,True,,4963,20.1,1121,Talladega,"""{""""01121"""": """"100""""}""",Talladega,01121,FALSE,FALSE,America/Chicago,
35016,34.32333,-86.50142,Arab,AL,Alabama,True,,17208,89.4,1095,Marshall,"""{""""01095"""": """"86.93""""","""""01043"""": """"11.29""""","""""01009"""": """"1.78""""}""",Marshall|Cullman|Blount,01095|01043|01009,FALSE,
35019,34.29852,-86.63523,Baileyton,AL,Alabama,True,,2492,32.2,1043,Cullman,"""{""""01043"""": """"78.51""""","""""01103"""": """"21.49""""}""",Cullman|Morgan,01043|01103,FALSE,FALSE,
35020,33.40271,-86.95158,Bessemer,AL,Alabama,True,,25757,521.4,1073,Jefferson,"""{""""01073"""": """"100""""}""",Jefferson,01073,FALSE,FALSE,America/Chicago,


### Creating Additional Reporting Tables

In [0]:
%sql
CREATE TABLE IF NOT EXISTS population.states 
USING DELTA 
LOCATION '/mnt/delta/states_delta/'
AS
SELECT DISTINCT state_id, state_name FROM population.uszips

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM population.states 

state_id,state_name
IL,Illinois
CA,California
MI,Michigan
NY,New York
IA,Iowa
TX,Texas
MN,Minnesota
PA,Pennsylvania
VA,Virginia
MO,Missouri


In [0]:
%sql
CREATE TABLE IF NOT EXISTS population.county 
USING DELTA 
LOCATION '/mnt/delta/county_delta/'
AS
SELECT DISTINCT state_id, county_fips, county_name FROM population.uszips

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM population.county 

state_id,county_fips,county_name
PA,42117,Tioga
IL,17157,Randolph
CA,6067,Sacramento
OH,39121,Noble
MO,29041,Chariton
IA,19045,Clinton
MI,26095,Luce
VA,51550,Chesapeake
MN,27047,Freeborn
MN,27161,Waseca


### Creating Silver/Gold tables
Create an aggregate table using PySpark.  This could have also been done using SQL, Scala, or R.  
Notice that we're able to write the table to a Delta path and register the table in one line.

In [0]:
city_county_df = spark.read.format("delta").load("/mnt/delta/uszips_delta/") \
  .groupBy("state_id", "city", "county_fips").sum('population') \
  .withColumnRenamed("sum(population)", "populationSum")

city_county_df.write.format("delta").mode('overwrite').option('path', '/mnt/delta/city_delta/').saveAsTable('population.city')

In [0]:
%sql
ALTER TABLE population.city SET TBLPROPERTIES (delta.enableChangeDataFeed=true)



In [0]:
%sql
SELECT * FROM population.city



### Perform DML operations, Schema Evolution and Time Travel
##### Delta Lake supports standard DML, including UPDATE, DELETE, and MERGE INTO, giving data engineers more control over their big datasets.

### DELETE Support

In [0]:
%sql
DELETE FROM population.city WHERE populationSum <= 10000



In [0]:
%sql
SELECT * FROM population.city WHERE populationSum <= 10000



### UPDATE Support

In [0]:
%sql
UPDATE population.city SET populationSum = 19999 WHERE populationSum BETWEEN 10000 AND 20000



In [0]:
%sql
SELECT * FROM population.city WHERE populationSum BETWEEN 10000 AND 20000



### MERGE Support
We can merge directly into a Delta Lake table and perform Inserts, Updates, and Deletes in one simple statement.  
This is supported in multiple languages, but we'll perform the command in SQL.
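The upsert semantics of a merge can be sketched in plain Python with a dictionary keyed on the join columns. This is only a conceptual model with a hypothetical helper `merge_upsert`; it says nothing about how Delta Lake executes a MERGE. The row values are illustrative sample data. Rows in the target that have no match in the source are left untouched.

```python
from typing import Dict, Tuple

Key = Tuple[str, str, int]  # (state_id, city, county_fips)


def merge_upsert(target: Dict[Key, dict], source: Dict[Key, dict]) -> Tuple[int, int]:
    """Apply source rows to target in place; return (rows updated, rows inserted)."""
    updated = inserted = 0
    for key, row in source.items():
        if key in target:
            target[key] = dict(row)  # like WHEN MATCHED THEN UPDATE SET *
            updated += 1
        else:
            target[key] = dict(row)  # like WHEN NOT MATCHED THEN INSERT *
            inserted += 1
    return updated, inserted


# Illustrative sample rows
target = {
    ("AL", "Moody", 1115): {"populationSum": 19999},
    ("OH", "Columbus", 39049): {"populationSum": 50000},
}
source = {
    ("AL", "Moody", 1115): {"populationSum": 12045},
    ("AL", "Allgood", 1009): {"populationSum": 46},
}

updated, inserted = merge_upsert(target, source)
```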

### Schema Evolution
With the `autoMerge` option, you can evolve your Delta Lake table schema seamlessly inside the ETL pipeline. New columns will automatically be added to your table.
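Conceptually, schema evolution means the table's column list grows to include any new columns in the incoming data, and rows written before the change read as null for those columns. The plain-Python sketch below models that outcome with a hypothetical helper, `append_with_schema_evolution`, and illustrative sample rows; it is not Delta Lake's implementation.

```python
from typing import List


def append_with_schema_evolution(table_columns: List[str],
                                 table_rows: List[dict],
                                 new_rows: List[dict]) -> None:
    """Append new_rows in place, adding unseen columns and back-filling None for older rows."""
    for row in new_rows:
        for column in row:
            if column not in table_columns:
                table_columns.append(column)  # the schema grows with the new column
    table_rows.extend(dict(row) for row in new_rows)
    for row in table_rows:
        for column in table_columns:
            row.setdefault(column, None)  # existing rows get null for new columns


# Illustrative sample data
table_columns = ["state_id", "city", "populationSum"]
table_rows = [{"state_id": "OH", "city": "Columbus", "populationSum": 50000}]
new_rows = [{"state_id": "AL", "city": "Moody", "populationSum": 12045, "densityAvg": 257.3}]

append_with_schema_evolution(table_columns, table_rows, new_rows)
```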

In [0]:
%sql
set spark.databricks.delta.schema.autoMerge.enabled = true;



In [0]:
%sql
-- Our CTE has a new column called densityAvg
WITH merge_cte AS (
SELECT state_id, city, county_fips, SUM(population) AS populationSum, AVG(density) AS densityAvg
FROM population.uszips
WHERE state_id <> 'OH'
GROUP BY state_id, city, county_fips
)

MERGE INTO population.city as d
USING merge_cte as m
on d.state_id = m.state_id AND d.city = m.city AND d.county_fips = m.county_fips
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *



In [0]:
%sql
--Notice our new column has been seamlessly added to the table
SELECT * FROM population.city WHERE populationSum <= 10000



In [0]:
%sql
--Notice our new column has been seamlessly added to the table
SELECT * FROM population.city WHERE populationSum BETWEEN 10000 AND 20000



### Let's travel back in time with Time Travel!
Databricks Delta’s time travel capabilities simplify building data pipelines for the following use cases. 

* Audit Data Changes
* Reproduce experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

You can query by:
1. Using a timestamp
1. Using a version number

using Python, Scala, and/or SQL syntax; for these examples we will use the SQL syntax.  

For more information, refer to [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)
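The versioning model can be pictured with a toy class that keeps one snapshot per commit. This is a deliberate simplification: Delta Lake records changes in a transaction log rather than storing full copies, and the class `VersionedTable` and its methods are hypothetical names used only for illustration. Note that in this sketch a restore is itself a new commit, so the history is never rewritten.

```python
import copy
from typing import List


class VersionedTable:
    """Toy table that keeps a full snapshot of its rows for every commit."""

    def __init__(self, rows: List[dict]) -> None:
        self._versions = [copy.deepcopy(rows)]  # version 0

    @property
    def latest_version(self) -> int:
        return len(self._versions) - 1

    def commit(self, rows: List[dict]) -> int:
        """Record a new snapshot and return its version number."""
        self._versions.append(copy.deepcopy(rows))
        return self.latest_version

    def version_as_of(self, version: int) -> List[dict]:
        """Read the table as it looked at the given version."""
        return copy.deepcopy(self._versions[version])

    def restore(self, version: int) -> int:
        """Roll back by committing an old snapshot as the newest version."""
        return self.commit(self._versions[version])


# Illustrative sample data
table = VersionedTable([{"city": "Moody", "populationSum": 12045},
                        {"city": "Allgood", "populationSum": 46}])
v1 = table.commit([{"city": "Moody", "populationSum": 12045}])  # small city deleted
v2 = table.commit([{"city": "Moody", "populationSum": 19999}])  # population updated
v3 = table.restore(0)                                           # roll back to version 0
```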

In [0]:
%sql
DESCRIBE HISTORY population.city



####  Time Travel via Version Number
Below are SQL syntax examples of Delta Time Travel by using a Version Number

In [0]:
%sql
SELECT * FROM population.city VERSION AS OF 2



### We can even roll back a table to a previous version

In [0]:
%sql
RESTORE TABLE population.city TO VERSION AS OF 2



In [0]:
%sql
SELECT * FROM population.city



### Or even create shallow or deep clones of a table (backups or testing tables)

In [0]:
%sql
CREATE OR REPLACE TABLE population.city_bak
DEEP CLONE population.city
LOCATION '/mnt/delta/city_delta_bak'



In [0]:
%sql
SELECT * FROM population.city_bak



### Simplify Your Medallion Architecture with Delta Lake’s CDF Feature

### Overview
The medallion architecture takes raw data landed from source systems and refines the data through bronze, silver and gold tables. It is an architecture that the MERGE operation and log versioning in Delta Lake make possible. Change data capture (CDC) is a use case that we see many customers implement in Databricks. The [Change Data Feed](https://docs.databricks.com/delta/delta-change-data-feed.html) (CDF) feature in Delta Lake makes this architecture even simpler to implement!

CDF makes it possible to detect data changes between versions or timestamps of a Delta table. This allows us to build ETL pipelines that are naturally incremental. The CDF `table_changes` function lets us analyze the changes (inserts, updates, and deletes) that have occurred on a Delta table, which we can then funnel to downstream consumers.
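To give a feel for what a change feed contains, the sketch below diffs two snapshots of a table and labels each difference with the change types CDF uses (`insert`, `delete`, `update_preimage`, `update_postimage`). This is an approximation under stated assumptions: the real feed is recorded when each write happens rather than computed by comparing snapshots, and it also carries `_commit_version` and `_commit_timestamp` columns that are omitted here. The helper `table_changes_sketch` and the sample rows are hypothetical.

```python
from typing import Dict, List


def table_changes_sketch(before: Dict[str, dict], after: Dict[str, dict]) -> List[dict]:
    """Return change rows describing how `before` became `after`, keyed by city name."""
    changes: List[dict] = []
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old is None:
            changes.append({**new, "_change_type": "insert"})
        elif new is None:
            changes.append({**old, "_change_type": "delete"})
        elif old != new:
            # an update produces two rows: the old values and the new values
            changes.append({**old, "_change_type": "update_preimage"})
            changes.append({**new, "_change_type": "update_postimage"})
    return changes


# Illustrative sample snapshots
before = {
    "Moody": {"city": "Moody", "populationSum": 12045},
    "Allgood": {"city": "Allgood", "populationSum": 46},
}
after = {
    "Moody": {"city": "Moody", "populationSum": 19999},
    "Adger": {"city": "Adger", "populationSum": 2883},
}

changes = table_changes_sketch(before, after)
change_types = [row["_change_type"] for row in changes]
```

A downstream consumer can apply only these change rows instead of re-reading the whole table.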

<img src="https://databricks.com/wp-content/uploads/2021/05/cdf-blog-img-1-rev.png" width=600>

In [0]:
%fs ls '/mnt/delta/city_delta'



In [0]:
%sql
SELECT * FROM table_changes('population.city', 1)



### IDENTITY COLUMNS
Delta Lake now supports identity columns. When you write to a Delta table that defines an identity column, and you do not provide values for that column, Delta automatically assigns a unique and statistically increasing or decreasing value.
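The effect of an identity column can be pictured as a counter that stamps each incoming row with the next key. The sketch below uses a hypothetical helper, `assign_identity`, with assumed `start` and `step` parameters, and it hands out consecutive values for simplicity. Real identity values are unique and move in the direction of the step, but they are not guaranteed to be contiguous, so do not rely on gap-free keys.

```python
import itertools
from typing import List


def assign_identity(rows: List[dict], start: int = 1, step: int = 1) -> List[dict]:
    """Return copies of the rows with a generated surrogate key column named state_sk."""
    counter = itertools.count(start, step)
    return [{"state_sk": next(counter), **row} for row in rows]


# Rows supply only the business columns; the surrogate key is generated
states = [
    {"state_id": "IL", "state_name": "Illinois"},
    {"state_id": "CA", "state_name": "California"},
    {"state_id": "MI", "state_name": "Michigan"},
]
dim_states = assign_identity(states)
surrogate_keys = [row["state_sk"] for row in dim_states]
```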

In [0]:
%sql
CREATE TABLE population.dim_states
( state_sk BIGINT GENERATED ALWAYS AS IDENTITY,
  state_id STRING, 
  state_name STRING
)
USING DELTA
LOCATION '/mnt/delta/dim_states_delta'



In [0]:
%sql
INSERT INTO population.dim_states (state_id, state_name) 
SELECT * FROM population.states



In [0]:
%sql
SELECT * FROM population.dim_states



#####  OPTIMIZE
OPTIMIZE optimizes the layout of Delta Lake data. Optionally, optimize a subset of data or colocate data by column. If you do not specify colocation, bin-packing optimization is performed.
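Bin-packing here means combining many small files into fewer files that approach a target size. The sketch below uses a simple first-fit-decreasing heuristic to show the idea; it is a stand-in chosen for illustration, not the algorithm OPTIMIZE actually runs, and the function `bin_pack`, the file sizes, and the 100 MB target are all assumptions.

```python
from typing import List


def bin_pack(file_sizes_mb: List[int], target_mb: int) -> List[List[int]]:
    """Group file sizes into bins whose totals do not exceed target_mb (first-fit decreasing)."""
    bins: List[List[int]] = []
    for size in sorted(file_sizes_mb, reverse=True):
        for current in bins:
            if sum(current) + size <= target_mb:
                current.append(size)  # fits into an existing output file
                break
        else:
            bins.append([size])  # no bin had room, so start a new output file
    return bins


small_files = [40, 10, 30, 20, 60, 50, 90]  # hypothetical file sizes in MB
compacted = bin_pack(small_files, target_mb=100)
files_before, files_after = len(small_files), len(compacted)
```

Fewer, larger files mean less per-file overhead when a query scans the table.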

In [0]:
%sql 
OPTIMIZE population.city ZORDER BY (state_id)



### But Wait, There's Even More in Delta Lake!
We've explored a lot of the awesome powers of the [Delta Lake](https://docs.databricks.com/delta/index.html) storage format, but there are even more capabilities that Delta Lake provides.  
Check the docs to explore even more functionality like:
- Unified Batch and Streaming directly from Delta Lake tables
- Constraints
- Primary Keys and Foreign Keys
- Information Schema views
- Automatic capture of table statistics for data skipping
- Computed/generated columns