# SETUP

In [0]:
import random
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
db_name = "deltadb"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
spark.sql(f"USE DATABASE {db_name}")

Out[2]: DataFrame[]

In [0]:
# Disable format check
spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")
# Optimize writing
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

Out[4]: DataFrame[key: string, value: string]

In [0]:
def my_checkpoint_dir(): 
     return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000))

# User-defined function to generate random state
@udf(returnType=StringType())
def random_state():
    return str(random.choice(["CA", "TX", "NY", "WA"]))


# Function to start a streaming query with a stream of randomly generated load data and append to the parquet table
def generate_and_append_data_stream(table_format, table_name, schema_ok=False, type="batch"):
  
    stream_data = (spark.readStream.format("rate").option("rowsPerSecond", 500).load()
                        .withColumn("loan_id", 10000 + col("value"))
                        .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer"))
                        .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000))
                        .withColumn("addr_state", random_state())
                        .withColumn("type", lit(type)))

    if schema_ok:
        stream_data = stream_data.select("loan_id", "funded_amnt", "paid_amnt", "addr_state", "type", "timestamp")
      
    query = (stream_data.writeStream
                        .format(table_format)
                        .option("checkpointLocation", my_checkpoint_dir())
                        .trigger(processingTime = "5 seconds")
                        .table(table_name))

    return query

In [0]:
# Function to stop all streaming queries 
def stop_all_streams():
    print("Stopping all streams")
    for s in spark.streams.active:
        try:
            s.stop()
        except:
            pass
    print("Stopped all streams")
    dbutils.fs.rm("/tmp/delta_demo/chkpt/", True)


def cleanup_paths_and_tables():
    dbutils.fs.rm("/tmp/delta_demo/", True)
    dbutils.fs.rm("file:/dbfs/tmp/delta_demo/loans_parquet/", True)
        
    for table in ["deltadb.loans_parquet", "deltadb.loans_delta", "deltadb.loans_delta2"]:
        spark.sql(f"DROP TABLE IF EXISTS {table}")
    
cleanup_paths_and_tables()

In [0]:
%sh mkdir -p /dbfs/tmp/delta_demo/loans_parquet/; wget -O /dbfs/tmp/delta_demo/loans_parquet/loans.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet

--2022-09-04 23:23:40--  https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet
Resolving pages.databricks.com (pages.databricks.com)... 104.17.70.206, 104.17.71.206, 104.17.72.206, ...
Connecting to pages.databricks.com (pages.databricks.com)|104.17.70.206|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 164631 (161K) [text/plain]
Saving to: ‘/dbfs/tmp/delta_demo/loans_parquet/loans.parquet’

     0K .......... .......... .......... .......... .......... 31% 4.79M 0s
    50K .......... .......... .......... .......... .......... 62% 2.80M 0s
   100K .......... .......... .......... .......... .......... 93% 9.90M 0s
   150K ..........                                            100% 11.9M=0.03s

2022-09-04 23:23:41 (4.69 MB/s) - ‘/dbfs/tmp/delta_demo/loans_parquet/loans.parquet’ saved [164631/164631]



In [0]:
# Batch data
parquet_path = "file:/dbfs/tmp/delta_demo/loans_parquet/"

# Read parquet file downloaded from https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet
df = (spark.read.format("parquet").load(parquet_path)
                .withColumn("type", lit("batch"))
                .withColumn("timestamp", current_timestamp()))

# Save spark dataframe as Delta table in dbfs
df.write.format("delta").mode("overwrite").saveAsTable("loans_delta")


In [0]:
%fs ls file:/dbfs/tmp/delta_demo/loans_parquet/

path,name,size,modificationTime
file:/dbfs/tmp/delta_demo/loans_parquet/loans.parquet,loans.parquet,164631,1661645004000


In [0]:
%sql 
-- REPEAT THE PREVIOUS STEP BUT USING SQL. CREATE A DELTA TABLE 
CREATE TABLE loans_delta2
USING DELTA AS
SELECT * FROM parquet.`file:/dbfs/tmp/delta_demo/loans_parquet`

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Convert parquet files to Delta Lake format in place
CONVERT TO DELTA parquet.`file:/dbfs/tmp/delta_demo/loans_parquet`

In [0]:
%fs ls file:/dbfs/tmp/delta_demo/loans_parquet/

path,name,size,modificationTime
file:/dbfs/tmp/delta_demo/loans_parquet/_delta_log/,_delta_log/,4096,1662334304167
file:/dbfs/tmp/delta_demo/loans_parquet/loans.parquet,loans.parquet,164631,1661645004000


In [0]:
spark.sql("SELECT COUNT(*) FROM loans_delta").show()
spark.sql("SELECT COUNT(*) FROM loans_delta2").show()

+--------+
|count(1)|
+--------+
|   14705|
+--------+

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [0]:
# Set up 2 streaming writes to our table
stream_query_A = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream A')
stream_query_B = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream B')

In [0]:
# Streaming read #1
display(spark.readStream.format("delta").table("loans_delta").groupBy("type").count().orderBy("type"))

type,count
batch,14705
stream A,417500
stream B,417000


In [0]:
# Streaming read #2
display(spark.readStream.format("delta").table("loans_delta").groupBy("type", window("timestamp", "10 seconds")).count().orderBy("window"))

type,window,count
batch,"List(2022-09-04T22:58:00.000+0000, 2022-09-04T22:58:10.000+0000)",14705
stream A,"List(2022-09-04T23:35:20.000+0000, 2022-09-04T23:35:30.000+0000)",2195
stream B,"List(2022-09-04T23:35:20.000+0000, 2022-09-04T23:35:30.000+0000)",1510
stream A,"List(2022-09-04T23:35:30.000+0000, 2022-09-04T23:35:40.000+0000)",5000
stream B,"List(2022-09-04T23:35:30.000+0000, 2022-09-04T23:35:40.000+0000)",5000
stream B,"List(2022-09-04T23:35:40.000+0000, 2022-09-04T23:35:50.000+0000)",5000
stream A,"List(2022-09-04T23:35:40.000+0000, 2022-09-04T23:35:50.000+0000)",5000
stream A,"List(2022-09-04T23:35:50.000+0000, 2022-09-04T23:36:00.000+0000)",5000
stream B,"List(2022-09-04T23:35:50.000+0000, 2022-09-04T23:36:00.000+0000)",5000
stream A,"List(2022-09-04T23:36:00.000+0000, 2022-09-04T23:36:10.000+0000)",5000


In [0]:
spark.sql("SELECT COUNT(*) FROM loans_delta").show()
spark.sql("SELECT DISTINCT type FROM loans_delta LIMIT 10").show()

+--------+
|count(1)|
+--------+
|  972205|
+--------+

+--------+
|    type|
+--------+
|stream B|
|stream A|
|   batch|
+--------+



In [0]:
%sql
SELECT addr_state, COUNT(*)
FROM loans_delta
GROUP BY addr_state

addr_state,count(1)
CA,37356
WA,35474
NY,36052
TX,36050
AZ,329
SC,174
LA,167
MN,256
NJ,541
DC,38


In [0]:
dbutils.notebook.exit("stop")

stop

In [0]:
stop_all_streams()

Stopping all streams
Stopped all streams


In [0]:
%sql
--- VIEW THE DELTA LAKE TRANSACTION LOG
DESCRIBE HISTORY loans_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
50,2022-09-04T23:51:29.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 24)",,List(508300667251686),0904-211633-11rrocdz,48.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 18000, numOutputBytes -> 418341, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
49,2022-09-04T23:51:28.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 24)",,List(508300667251686),0904-211633-11rrocdz,48.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 17500, numOutputBytes -> 407680, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
48,2022-09-04T23:50:53.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 23)",,List(508300667251686),0904-211633-11rrocdz,46.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 26000, numOutputBytes -> 591851, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
47,2022-09-04T23:50:52.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 23)",,List(508300667251686),0904-211633-11rrocdz,46.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 26000, numOutputBytes -> 591824, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
46,2022-09-04T23:50:04.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 22)",,List(508300667251686),0904-211633-11rrocdz,44.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 18000, numOutputBytes -> 418410, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
45,2022-09-04T23:50:02.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 22)",,List(508300667251686),0904-211633-11rrocdz,44.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 17500, numOutputBytes -> 407688, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
44,2022-09-04T23:49:27.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 21)",,List(508300667251686),0904-211633-11rrocdz,42.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 27500, numOutputBytes -> 624219, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
43,2022-09-04T23:49:26.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 21)",,List(508300667251686),0904-211633-11rrocdz,42.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 27500, numOutputBytes -> 624349, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
42,2022-09-04T23:48:33.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 20)",,List(508300667251686),0904-211633-11rrocdz,40.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 18000, numOutputBytes -> 418328, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
41,2022-09-04T23:48:32.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 20)",,List(508300667251686),0904-211633-11rrocdz,40.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 24500, numOutputBytes -> 559244, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12


##### Use schema enforcement to protect data quality

In [0]:
# Write dataframe with extra column, "credit_score", to Delta Lake table
new_column = [StructField("credit_score", IntegerType(), True)]

new_schema = StructType(spark.table("loans_delta").schema.fields + new_column)
data = [(99997, 10000, 1338.55, "CA", "batch", datetime.now(), 649),
        (99998, 20000, 1442.55, "NY", "batch", datetime.now(), 702)]

new_data = spark.createDataFrame(data, new_schema)
new_data.printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- credit_score: integer (nullable = true)



In [0]:
new_data.write.format("delta").mode("append").saveAsTable("loans_delta")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-2549487216592855>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mnew_data[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"delta"[0m[0;34m)[0m[0;34m.[0m[0mmode[0m[0;34m([0m[0;34m"append"[0m[0;34m)[0m[0;34m.[0m[0msaveAsTable[0m[0;34m([0m[0;34m"loans_delta"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/readwriter.py[0m in [0;36msaveAsTable[0;34m(self, name, format, mode, partitionBy, **options)[0m
[1;32m    804[0m         [0;32mif[0m [0mformat[0m [0;32mis[0m [0;32mnot[0m [0;32mNone[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    805[0m             [0mself[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0mformat[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 806[0;31m         

##### Use schema evolution to add new columns to schema

In [0]:
new_data.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("loans_delta")

In [0]:
spark.table('loans_delta').printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- credit_score: integer (nullable = true)



In [0]:
%sql
SELECT * FROM loans_delta
WHERE loan_id IN (99997, 99998);

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp,credit_score
99997,6395,5068.754683799191,WA,stream A,2022-09-04T23:38:25.604+0000,
99998,8493,7087.46838749574,WA,stream A,2022-09-04T23:38:25.606+0000,
99997,7706,6004.825410352112,WA,stream B,2022-09-04T23:38:26.975+0000,
99998,5080,4096.455744645483,NY,stream B,2022-09-04T23:38:26.977+0000,
99997,10000,1338.55,CA,batch,2022-09-05T01:13:22.302+0000,649.0
99998,20000,1442.55,NY,batch,2022-09-05T01:13:22.302+0000,702.0


#### Delta Lake Time Travel

Review Delta Lake table history for Auditing & Governance. You can query snapshots of your table by:
1. Version number
2. Timestamp

In [0]:
%sql
DESCRIBE HISTORY loans_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
51,2022-09-05T01:14:29.000+0000,1057755597611259,matiasvargas598@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(508300667251686),0904-211633-11rrocdz,50.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 2152)",,Databricks-Runtime/10.4.x-scala2.12
50,2022-09-04T23:51:29.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 24)",,List(508300667251686),0904-211633-11rrocdz,48.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 18000, numOutputBytes -> 418341, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
49,2022-09-04T23:51:28.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 24)",,List(508300667251686),0904-211633-11rrocdz,48.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 17500, numOutputBytes -> 407680, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
48,2022-09-04T23:50:53.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 23)",,List(508300667251686),0904-211633-11rrocdz,46.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 26000, numOutputBytes -> 591851, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
47,2022-09-04T23:50:52.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 23)",,List(508300667251686),0904-211633-11rrocdz,46.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 26000, numOutputBytes -> 591824, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
46,2022-09-04T23:50:04.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 22)",,List(508300667251686),0904-211633-11rrocdz,44.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 18000, numOutputBytes -> 418410, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
45,2022-09-04T23:50:02.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 22)",,List(508300667251686),0904-211633-11rrocdz,44.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 17500, numOutputBytes -> 407688, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
44,2022-09-04T23:49:27.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 21)",,List(508300667251686),0904-211633-11rrocdz,42.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 27500, numOutputBytes -> 624219, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
43,2022-09-04T23:49:26.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 6746ce3c-2190-4aeb-8c65-464cbf17491f, epochId -> 21)",,List(508300667251686),0904-211633-11rrocdz,42.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 27500, numOutputBytes -> 624349, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
42,2022-09-04T23:48:33.000+0000,1057755597611259,matiasvargas598@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 514bec85-90c1-4a91-95ac-4b678b6e0dce, epochId -> 20)",,List(508300667251686),0904-211633-11rrocdz,40.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 18000, numOutputBytes -> 418328, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12


In [0]:

# COUNT OF ROWS OF THE VESION 0
spark.sql('SELECT COUNT(*) FirstVersionCount FROM loans_delta VERSION AS OF 0;').show()

# COUNT OF ROWS OF THE LAST VERSION
spark.sql('SELECT COUNT(*) LastVersionCount FROM loans_delta;').show()

+-----------------+
|FirstVersionCount|
+-----------------+
|            14705|
+-----------------+

+----------------+
|LastVersionCount|
+----------------+
|          972207|
+----------------+



##### Roll back a table to a specific version

In [0]:
%sql
RESTORE loans_delta VERSION AS OF 0;

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
165506,1,49,0,21993901,0


In [0]:
spark.sql('SELECT COUNT(*) LastVersionCount FROM loans_delta;').show()

+----------------+
|LastVersionCount|
+----------------+
|           14705|
+----------------+



### DML Support
Parquet does not support these commands - they are unique to Delta Lake

In [0]:
%sql
SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,1050.94,TX,batch,2022-09-04T22:58:08.108+0000


In [0]:
%sql
--DELETE COMMAND
DELETE FROM loans_delta WHERE loan_id=4420;
-- Confirm the user's data was deleted
SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp


In [0]:
%sql
--INSERT COMMAND
INSERT INTO loans_delta
SELECT * FROM loans_delta VERSION AS OF 0
WHERE loan_id=4420;

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- Confirm the user's data was inserted
SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,1050.94,TX,batch,2022-09-04T22:58:08.108+0000


In [0]:
%sql
UPDATE loans_delta SET funded_amnt = 50000 WHERE loan_id = 4420;
-- Confirm the user's data was updated
SELECT * FROM loans_delta WHERE loan_id = 4420;

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,50000,1050.94,TX,batch,2022-09-04T22:58:08.108+0000


###### Support Change Data Capture Workflows & Other Ingest Use Cases via MERGE INTO

In [0]:
# Create merge table with 1 row update, 1 insertion
data = [(4420, 22000, 21500.00, "NY", "update", datetime.now()),  # record to update
        (99999, 10000, 1338.55, "CA", "insert", datetime.now())]  # record to insert

schema = spark.table("loans_delta").schema

spark.createDataFrame(data, schema).createOrReplaceTempView("merge_table")

spark.sql("SELECT * FROM merge_table").show()

+-------+-----------+---------+----------+------+--------------------+
|loan_id|funded_amnt|paid_amnt|addr_state|  type|           timestamp|
+-------+-----------+---------+----------+------+--------------------+
|   4420|      22000|  21500.0|        NY|update|2022-09-05 01:45:...|
|  99999|      10000|  1338.55|        CA|insert|2022-09-05 01:45:...|
+-------+-----------+---------+----------+------+--------------------+



In [0]:
%sql
MERGE INTO loans_delta AS l
USING merge_table AS m
ON l.loan_id = m.loan_id
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


In [0]:
%sql
SELECT * FROM loans_delta WHERE loan_id IN (4420, 99999)

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,21500.0,NY,update,2022-09-05T01:45:27.986+0000
99999,10000,1338.55,CA,insert,2022-09-05T01:45:27.986+0000


#### File compaction and performance optimizations = faster queries

###### Vacuum

In [0]:
%sql
-- Vacuum deletes all files no longer needed by the current version of the table.
VACUUM loans_delta;

path
dbfs:/user/hive/warehouse/deltadb.db/loans_delta


###### Cache table in memory (Databricks Delta Lake only) to speed up that query in the future

In [0]:
%sql
CACHE SELECT * FROM loans_delta;

###### Z-Order Optimize (Databricks Delta Lake only)

In [0]:
%sql
OPTIMIZE loans_delta ZORDER BY addr_state;

path,metrics
dbfs:/user/hive/warehouse/deltadb.db/loans_delta,"List(1, 2, List(165524, 165524, 165524.0, 1, 165524), List(1868, 165507, 83687.5, 2, 167375), 0, List(minCubeSize(107374182400), List(0, 0), List(2, 167375), 0, List(2, 167375), 1, null), 1, 2, 0, false)"


In [0]:
cleanup_paths_and_tables()