# Delta Lake Time Machine & Optimization Lab

Delta Lake allows you to read, write and query data in data lakes in an efficient manner.

## In this lesson you:
* Compare different versions of a Delta table using Time Machine
* Optimize your Delta Lake to increase speed and reduce number of files

## Audience
* Primary Audience: Data Engineers
* Secondary Audience: Data Analysts and Data Scientists

## Prerequisites
* Web browser: current versions of Google Chrome, Firefox, Safari, Microsoft Edge and
Internet Explorer 11 on Windows 7, 8, or 10 (see <a href="https://docs.databricks.com/user-guide/supported-browsers.html#supported-browsers#" target="_blank">Supported Web Browsers</a>)
* Databricks Runtime 4.2 or greater
## Datasets Used
We will use online retail datasets from `/mnt/training/online_retail`

In [0]:
%run "./Includes/Classroom-Setup"

In [0]:
# Mount "/mnt/training" again using "%run "./Includes/Dataset-Mounts-New"" if it is failed in "./Includes/Classroom-Setup"
try:
    files = dbutils.fs.ls("/mnt/training")
except:
    dbutils.fs.unmount('/mnt/training/')


/mnt/training/ has been unmounted.


In [0]:
%run "./Includes/Dataset-Mounts-New"

Because we'll be calculating some aggregates in this notebook, we'll change our partitions after shuffle from the default `200` to `8` (which is a good number for the 8 node cluster we're currently working on).

In [0]:
%python

sqlContext.setConf("spark.sql.shuffle.partitions", "8")

### Check for our previous Delta Lake tables

This lab relies upon some tables created in previous Delta Lake lessons and labs. 

If you get an error from either of the next two SQL queries, running the solution code for the "Delta-Lake-Lab-1" will build all necessary tables.

In [0]:
%sql

SELECT COUNT(*) FROM customer_counts;
SELECT COUNT(*) FROM customer_data_delta;

count(1)
165544


-sandbox
<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> **The following cell will take several minutes to execute, and is only necessary to run if you got an error in the previous cell.**

In [0]:
%run "./Includes/Delta-Lab-2-Prep"

/mnt/training/ has been unmounted.


Datasets are mounted


True

DataFrame[]

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
14,4,0,10


DataFrame[]

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
38138,14,0,38124


For convenience later in this lab, the paths to the files defining our existing Delta tables are provided. You can use these paths to load the data into DataFrames, if desired, though this entire lab can be completed using SQL on the existant tables.

In [0]:
DeltaPath = userhome + "/delta/customer-data/"
CustomerCountsPath = userhome + "/delta/customer_counts/"

**Note: This lab depends upon the complete exectuion of the notebook titled "Open-Source-Delta-Lake" and the "Delta-Lake-Basics" lab. If these tables don't exist, go back and run all cells in these notebook.**

### Time Travel
Because Delta Lake is version controlled, you have the option to query past versions of the data. Let's look at the history of our current Delta table.

In [0]:
%sql
DESCRIBE HISTORY customer_data_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2023-05-21T18:38:49.000+0000,1497134490897596,vishal.abnave@borregaard.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3177618832289077),0519-083853-7h03etr9,2.0,WriteSerializable,True,"Map(numFiles -> 43, numOutputRows -> 99999, numOutputBytes -> 1696572)",,Databricks-Runtime/13.0.x-cpu-ml-scala2.12
2,2023-05-21T18:38:23.000+0000,1497134490897596,vishal.abnave@borregaard.com,MERGE,"Map(predicate -> ((CAST(spark_catalog.vishal_abnave_borregaard_com_db.customer_data_delta.InvoiceNo AS BIGINT) = upsert_data.InvoiceNo) AND (spark_catalog.vishal_abnave_borregaard_com_db.customer_data_delta.StockCode = upsert_data.StockCode)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(3177618832289077),0519-083853-7h03etr9,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 61248, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 4, numTargetBytesAdded -> 429025, numTargetBytesRemoved -> 467130, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 4, executionTimeMs -> 3494, numTargetRowsInserted -> 10, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 1939, numTargetRowsUpdated -> 4, numOutputRows -> 61262, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 14, numTargetFilesRemoved -> 4, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1477)",,Databricks-Runtime/13.0.x-cpu-ml-scala2.12
1,2023-05-21T18:38:15.000+0000,1497134490897596,vishal.abnave@borregaard.com,WRITE,"Map(mode -> Append, partitionBy -> [""Country""])",,List(3177618832289077),0519-083853-7h03etr9,0.0,WriteSerializable,True,"Map(numFiles -> 2, numOutputRows -> 36, numOutputBytes -> 5747)",,Databricks-Runtime/13.0.x-cpu-ml-scala2.12
0,2023-05-21T18:38:07.000+0000,1497134490897596,vishal.abnave@borregaard.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""Country""])",,List(3177618832289077),0519-083853-7h03etr9,,WriteSerializable,False,"Map(numFiles -> 37, numOutputRows -> 65499, numOutputBytes -> 645075)",,Databricks-Runtime/13.0.x-cpu-ml-scala2.12


Querying an older version is as easy as adding `VERSION AS OF desired_version`. Let's verify that our table from one version back still exists.

In [0]:
%sql
SELECT COUNT(*)
FROM customer_data_delta
VERSION AS OF 1

count(1)
65535


Using a single file storage system, you now have access to every version of your historical data, ensuring that your data analysts will be able to replicate their reports (and compare aggregate changes over time) and your data scientists will be able to replicate their experiments.

### Check difference between versions

You want to compare how many orders from Sweden were added by your recent UPSERT to your BI table.

Let's start by getting the total sum of our `total_orders` column where our country is Sweden.

In [0]:
# TODO
count = spark.sql("SELECT SUM(total_orders) FROM customer_counts where Country='Sweden'").collect()[0][0]
print(count)

2985


Again, we can look at the history of our Delta table here.

In [0]:
%sql
DESCRIBE HISTORY customer_counts

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2023-05-21T18:38:44.000+0000,1497134490897596,vishal.abnave@borregaard.com,MERGE,"Map(predicate -> ((spark_catalog.vishal_abnave_borregaard_com_db.customer_counts.Country = spark_catalog.vishal_abnave_borregaard_com_db.new_customer_counts.Country) AND (spark_catalog.vishal_abnave_borregaard_com_db.customer_counts.CustomerID = spark_catalog.vishal_abnave_borregaard_com_db.new_customer_counts.CustomerID)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(3177618832289077),0519-083853-7h03etr9,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 7, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 22, numTargetBytesAdded -> 190519, numTargetBytesRemoved -> 1014, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 14, executionTimeMs -> 3681, numTargetRowsInserted -> 38124, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 1606, numTargetRowsUpdated -> 14, numOutputRows -> 38145, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 38138, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2044)",,Databricks-Runtime/13.0.x-cpu-ml-scala2.12
0,2023-05-21T18:38:29.000+0000,1497134490897596,vishal.abnave@borregaard.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [""Country""])",,List(3177618832289077),0519-083853-7h03etr9,,WriteSerializable,False,"Map(numFiles -> 25, numOutputRows -> 1245, numOutputBytes -> 29057)",,Databricks-Runtime/13.0.x-cpu-ml-scala2.12


Our original table will be version `0`. Let's write a SQL query to see how many orders we originally had from Sweden.

In [0]:
%sql
SELECT SUM(total_orders)
FROM customer_counts
VERSION AS OF 0
WHERE Country='Sweden'

sum(total_orders)
60


We can combine these two queries and get our difference, which represents our new entries.

In [0]:
%sql
SELECT SUM(total_orders) - (
  SELECT SUM(total_orders)
  FROM customer_counts
  VERSION AS OF 0
  WHERE Country='Sweden') AS new_entries
FROM customer_counts
WHERE Country='Sweden'

new_entries
2925


### OPTIMIZE and ZORDER

Let's apply some of these optimizations to `../delta/customer-data/`.

Our data is partitioned by `Country`.

We want to query the data for `StockCode` equal to `22301`.

We expect this query to be slow because we have to examine ALL OF `../delta/customer-data/` to find the desired `StockCode` and not just in one or two partitions.

First, let's time the above query: you will need to form a DataFrame to pass to `preZorderQuery`.

In [0]:
# TODO
%timeit preZorderQuery = spark.sql("select * from customer_data_delta where StockCode = 22301").collect()

718 ms ± 45.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Compact the files and re-order by `StockCode`.

In [0]:
%sql
-- TODO
OPTIMIZE customer_data_delta
ZORDER by (StockCode)

path,metrics
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data,"List(26, 75, List(3605, 504776, 76704.34615384616, 26, 1994313), List(2620, 419888, 29438.28, 75, 2207871), 33, List(minCubeSize(107374182400), List(0, 0), List(82, 2309289), 0, List(75, 2207871), 26, null), 1, 82, 7, false, 0, 0, 1684694344117, 1684694352940, 4, 26, null, List(0, 0), 8, 8, 5634)"


Let's time the above query again: you will need to form a DataFrame to pass to `postZorderQuery`.

In [0]:
# TODO
%timeit postZorderQuery = spark.sql("select * from customer_data_delta where StockCode = 22301").collect()

418 ms ± 44.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### OPTIMIZE your BI table

Here we'll optimize our `customer_counts` table so that we can quickly query on our `CustomerID` column.

In [0]:
%sql
OPTIMIZE customer_counts
ZORDER by (CustomerID)

path,metrics
dbfs:/user/vishal.abnave@borregaard.com/delta/customer_counts,"List(13, 26, List(7084, 15622, 9319.076923076924, 13, 121148), List(873, 9544, 5038.153846153846, 26, 130992), 33, List(minCubeSize(107374182400), List(0, 0), List(46, 218562), 0, List(26, 130992), 13, null), 1, 46, 20, false, 0, 0, 1684694357436, 1684694363114, 4, 13, null, List(0, 0), 3, 3, 1433)"


Now we can easily look at which of our customers have made the most orders.

In [0]:
%sql
SELECT CustomerID, SUM(total_orders) AS total
FROM customer_counts
GROUP BY CustomerID
ORDER BY total DESC

CustomerID,total
,25281
12748.0,695
17841.0,481
14606.0,421
15311.0,418
14911.0,377
17850.0,297
13089.0,261
13081.0,261
18118.0,250


Or we can see examine those customers that operate in the most countries.

In [0]:
%sql
SELECT CustomerID, COUNT(Country) AS num_countries
FROM customer_counts
GROUP BY CustomerID
SORT BY num_countries DESC

CustomerID,num_countries
20123.0,22
21529.0,21
20586.0,21
20600.0,21
20453.0,21
21860.0,21
20807.0,21
21160.0,21
21440.0,21
20059.0,21


And then look at how many orders a customer made in each of these countries.

In [0]:
%sql
SELECT Country, total_orders
FROM customer_counts
WHERE CustomerID = 20059

Country,total_orders
United Kingdom,8
France,4
,5
Germany,4
United States,7
Canada,2
Japan,3
Switzerland,2
Netherlands,2
Italy,2


### Using VACUUM to clean up small files

After we run OPTIMIZE, we have a number of uncompacted files that are no longer necessary. Running VACUUM will remove these files for us.

Let's go ahead and VACUUM our `customer_data_delta` table, which points at the files in our `DeltaPath` variable.

Count number of files before `VACUUM` for `Country=Sweden`.

In [0]:
# TODO
preFiles = (dbutils.fs.ls(DeltaPath))
display(preFiles)

preNumFiles = (dbutils.fs.ls(DeltaPath + '/Country=Sweden'))
display(preNumFiles)

path,name,size,modificationTime
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Australia/,Country=Australia/,0,1684694284000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Austria/,Country=Austria/,0,1684694285000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Bahrain/,Country=Bahrain/,0,1684694285000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Belgium/,Country=Belgium/,0,1684694285000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Brazil/,Country=Brazil/,0,1684694328000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Canada/,Country=Canada/,0,1684694328000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Channel Islands/,Country=Channel Islands/,0,1684694285000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=China/,Country=China/,0,1684694328000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Cyprus/,Country=Cyprus/,0,1684694285000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Denmark/,Country=Denmark/,0,1684694285000


path,name,size,modificationTime
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00000-47f5fc44-17d1-4bc9-8a6c-8e8ee6403eea.c000.snappy.parquet,part-00000-47f5fc44-17d1-4bc9-8a6c-8e8ee6403eea.c000.snappy.parquet,34640,1684694329000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00000-57f955ab-6dcd-4915-8699-b88c3208e859.c000.snappy.parquet,part-00000-57f955ab-6dcd-4915-8699-b88c3208e859.c000.snappy.parquet,3846,1684694286000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00000-795c5ef9-93ca-4076-bc29-5c0bf0fd617a.c000.snappy.parquet,part-00000-795c5ef9-93ca-4076-bc29-5c0bf0fd617a.c000.snappy.parquet,2880,1684694295000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00001-65dcf364-586e-4173-bdec-67a0fe0a25b9.c000.snappy.parquet,part-00001-65dcf364-586e-4173-bdec-67a0fe0a25b9.c000.snappy.parquet,22629,1684694329000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00022-cb789ae7-3560-40eb-aa08-e6471e95b6ad.c000.snappy.parquet,part-00022-cb789ae7-3560-40eb-aa08-e6471e95b6ad.c000.snappy.parquet,51334,1684694351000


If you try to perform an immediate `VACUUM` (using `RETAIN 0 HOURS` to clean up recently optimized files), you will get an error.

In [0]:
%sql
-- TODO
VACUUM customer_data_delta RETAIN 0 HOURS;

This is a helfpul error. Remember that `VACUUM` is intended for occasional garbage collection. Here we'll just demonstrating that we _can_ use it to clean up files, so we'll set our configuration to allow this operation.

In [0]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

Now we won't get an error when we run `VACUUM`.

In [0]:
%sql
VACUUM customer_data_delta RETAIN 0 HOURS;

path
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data


Count how many files there are for `Country=Sweden`.

In [0]:
# TODO
# postNumFiles = len(dbutils.fs.ls(DeltaPath + '/Country=Sweden'))
postNumFiles = (dbutils.fs.ls(DeltaPath + '/Country=Sweden'))
display(postNumFiles)

path,name,size,modificationTime
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00000-7bef0385-0be1-4bbc-b5b0-f47c089a97ca.c000.snappy.parquet,part-00000-7bef0385-0be1-4bbc-b5b0-f47c089a97ca.c000.snappy.parquet,34640,1684694548000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00000-9e226a7f-29db-4c45-b6b7-3d8a384dc1e1.c000.snappy.parquet,part-00000-9e226a7f-29db-4c45-b6b7-3d8a384dc1e1.c000.snappy.parquet,3846,1684694504000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00000-b575b9b5-bb0b-4051-8ed0-3cdbdb23c22a.c000.snappy.parquet,part-00000-b575b9b5-bb0b-4051-8ed0-3cdbdb23c22a.c000.snappy.parquet,2880,1684694512000
dbfs:/user/vishal.abnave@borregaard.com/delta/customer-data/Country=Sweden/part-00001-f1ba0b29-5657-4b29-9615-f091e24d5bcd.c000.snappy.parquet,part-00001-f1ba0b29-5657-4b29-9615-f091e24d5bcd.c000.snappy.parquet,22629,1684694548000


Comparing our `preNumFiles` to `postNumFiles`, we can see that this number has reduced.