# Delta Lake Time Machine & Optimization Lab

Delta Lake allows you to read, write and query data in data lakes in an efficient manner.

## In this lesson you:
* Compare different versions of a Delta table using Time Machine
* Optimize your Delta Lake to increase speed and reduce number of files

## Audience
* Primary Audience: Data Engineers
* Secondary Audience: Data Analysts and Data Scientists

## Prerequisites
* Web browser: current versions of Google Chrome, Firefox, Safari, Microsoft Edge and
Internet Explorer 11 on Windows 7, 8, or 10 (see <a href="https://docs.databricks.com/user-guide/supported-browsers.html#supported-browsers#" target="_blank">Supported Web Browsers</a>)
* Databricks Runtime 4.2 or greater
## Datasets Used
We will use online retail datasets from `/mnt/training/online_retail`

In [0]:
%run "./Includes/Classroom-Setup"

Because we'll be calculating some aggregates in this notebook, we'll change our partitions after shuffle from the default `200` to `8` (which is a good number for the 8 node cluster we're currently working on).

In [0]:
%python

sqlContext.setConf("spark.sql.shuffle.partitions", "8")

### Check for our previous Delta Lake tables

This lab relies upon some tables created in previous Delta Lake lessons and labs. 

If you get an error from either of the next two SQL queries, running the solution code for the "Delta-Lake-Lab-1" will build all necessary tables.

In [0]:
%sql

SELECT COUNT(*) FROM customer_counts;
SELECT COUNT(*) FROM customer_data_delta;

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> **The following cell will take several minutes to execute, and is only necessary to run if you got an error in the previous cell.**

In [0]:
%run "./Includes/Delta-Lab-2-Prep"

For convenience later in this lab, the paths to the files defining our existing Delta tables are provided. You can use these paths to load the data into DataFrames, if desired, though this entire lab can be completed using SQL on the existant tables.

In [0]:
DeltaPath = userhome + "/delta/customer-data/"
CustomerCountsPath = userhome + "/delta/customer_counts/"

**Note: This lab depends upon the complete exectuion of the notebook titled "Open-Source-Delta-Lake" and the "Delta-Lake-Basics" lab. If these tables don't exist, go back and run all cells in these notebook.**

### Time Travel
Because Delta Lake is version controlled, you have the option to query past versions of the data. Let's look at the history of our current Delta table.

In [0]:
%sql
DESCRIBE HISTORY customer_data_delta

Querying an older version is as easy as adding `VERSION AS OF desired_version`. Let's verify that our table from one version back still exists.

In [0]:
%sql
SELECT COUNT(*)
FROM customer_data_delta
VERSION AS OF 1

Using a single file storage system, you now have access to every version of your historical data, ensuring that your data analysts will be able to replicate their reports (and compare aggregate changes over time) and your data scientists will be able to replicate their experiments.

### Check difference between versions

You want to compare how many orders from Sweden were added by your recent UPSERT to your BI table.

Let's start by getting the total sum of our `total_orders` column where our country is Sweden.

In [0]:
# TODO
count = spark.sql("FILL IN").collect()[0][0]

Again, we can look at the history of our Delta table here.

In [0]:
%sql
DESCRIBE HISTORY customer_counts

Our original table will be version `0`. Let's write a SQL query to see how many orders we originally had from Sweden.

In [0]:
%sql
SELECT SUM(total_orders)
FROM customer_counts
VERSION AS OF 0
WHERE Country='Sweden'

We can combine these two queries and get our difference, which represents our new entries.

In [0]:
%sql
SELECT SUM(total_orders) - (
  SELECT SUM(total_orders)
  FROM customer_counts
  VERSION AS OF 0
  WHERE Country='Sweden') AS new_entries
FROM customer_counts
WHERE Country='Sweden'

### OPTIMIZE and ZORDER

Let's apply some of these optimizations to `../delta/customer-data/`.

Our data is partitioned by `Country`.

We want to query the data for `StockCode` equal to `22301`.

We expect this query to be slow because we have to examine ALL OF `../delta/customer-data/` to find the desired `StockCode` and not just in one or two partitions.

First, let's time the above query: you will need to form a DataFrame to pass to `preZorderQuery`.

In [0]:
# TODO
%timeit preZorderQuery = spark.sql("FILL_IN").collect()


Compact the files and re-order by `StockCode`.

In [0]:
%sql
-- TODO
OPTIMIZE FILL_IN
ZORDER by (FILL_IN)

Let's time the above query again: you will need to form a DataFrame to pass to `postZorderQuery`.

In [0]:
# TODO
%timeit postZorderQuery = spark.sql("FILL_IN").collect()

### OPTIMIZE your BI table

Here we'll optimize our `customer_counts` table so that we can quickly query on our `CustomerID` column.

In [0]:
%sql
OPTIMIZE customer_counts
ZORDER by (CustomerID)

Now we can easily look at which of our customers have made the most orders.

In [0]:
%sql
SELECT CustomerID, SUM(total_orders) AS total
FROM customer_counts
GROUP BY CustomerID
ORDER BY total DESC

Or we can see examine those customers that operate in the most countries.

In [0]:
%sql
SELECT CustomerID, COUNT(Country) AS num_countries
FROM customer_counts
GROUP BY CustomerID
SORT BY num_countries DESC

And then look at how many orders a customer made in each of these countries.

In [0]:
%sql
SELECT Country, total_orders
FROM customer_counts
WHERE CustomerID = 20059

### Using VACUUM to clean up small files

After we run OPTIMIZE, we have a number of uncompacted files that are no longer necessary. Running VACUUM will remove these files for us.

Let's go ahead and VACUUM our `customer_data_delta` table, which points at the files in our `DeltaPath` variable.

Count number of files before `VACUUM` for `Country=Sweden`.

In [0]:
# TODO
preNumFiles = len(dbutils.fs.ls(FILL_IN))
print(preNumFiles)

If you try to perform an immediate `VACUUM` (using `RETAIN 0 HOURS` to clean up recently optimized files), you will get an error.

In [0]:
%sql
-- TODO
VACUUM FILL_IN

This is a helfpul error. Remember that `VACUUM` is intended for occasional garbage collection. Here we'll just demonstrating that we _can_ use it to clean up files, so we'll set our configuration to allow this operation.

In [0]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

Now we won't get an error when we run `VACUUM`.

In [0]:
%sql
VACUUM customer_data_delta RETAIN 0 HOURS;

Count how many files there are for `Country=Sweden`.

In [0]:
# TODO
postNumFiles = len(dbutils.fs.ls(FILL_IN))
print(postNumFiles)

Comparing our `preNumFiles` to `postNumFiles`, we can see that this number has reduced.
