<!-- You can run this notebook in a Databricks environment. Specifically, this notebook has been designed to run in [Databricks Community Edition](http://community.cloud.databricks.com/) as well. -->
To run this notebook, you have to [create a cluster](https://docs.databricks.com/clusters/create.html) with version **Databricks Runtime 7.4 or later** and [attach this notebook](https://docs.databricks.com/notebooks/notebooks-manage.html#attach-a-notebook-to-a-cluster) to that cluster. <br/>

### Source Data for this notebook
The data used is a modified version of the public data from [Lending Club](https://www.kaggle.com/wendykan/lending-club-loan-data). It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full description of the fields, see the data dictionary available [here](https://resources.lendingclub.com/LCDataDictionary.xlsx).

## Setup
Run the commands in cells 3-6 below to set up the demo, which begins in cell 7.

In [0]:
db = "deltadb"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")

spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

In [0]:
import random
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *


def my_checkpoint_dir(): 
  return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000))

# User-defined function to generate random state
@udf(returnType=StringType())
def random_state():
  return str(random.choice(["CA", "TX", "NY", "WA"]))


# Start a streaming query that generates random loan data and appends it to `table_name` in the given format
def generate_and_append_data_stream(table_format, table_name, schema_ok=False, type="batch"):
  
  stream_data = (spark.readStream.format("rate").option("rowsPerSecond", 500).load()
    .withColumn("loan_id", 10000 + col("value"))
    .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer"))
    .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000))
    .withColumn("addr_state", random_state())
    .withColumn("type", lit(type)))
    
  if schema_ok:
    stream_data = stream_data.select("loan_id", "funded_amnt", "paid_amnt", "addr_state", "type", "timestamp")
      
  query = (stream_data.writeStream
    .format(table_format)
    .option("checkpointLocation", my_checkpoint_dir())
    .trigger(processingTime = "5 seconds")
    .table(table_name))

  return query

In [0]:
def stop_all_streams():
    print("Stopping all streams")
    for s in spark.streams.active:
        try:
            s.stop()
        except Exception:
            pass  # ignore streams that have already stopped or failed
    print("Stopped all streams")
    dbutils.fs.rm("/tmp/delta_demo/chkpt/", True)


def cleanup_paths_and_tables():
    dbutils.fs.rm("/tmp/delta_demo/", True)
    dbutils.fs.rm("file:/dbfs/tmp/delta_demo/loans_parquet/", True)
        
    for table in ["deltadb.loans_parquet", "deltadb.loans_delta", "deltadb.loans_delta2"]:
        spark.sql(f"DROP TABLE IF EXISTS {table}")
    
cleanup_paths_and_tables()

In [0]:
%sh mkdir -p /dbfs/tmp/delta_demo/loans_parquet/; wget -O /dbfs/tmp/delta_demo/loans_parquet/loans.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet

# Getting started with <img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>

An open-source storage layer for data lakes that brings ACID transactions to Apache Spark™ and big data workloads.

* **ACID Transactions**: Ensures data integrity and read consistency with complex, concurrent data pipelines.
* **Unified Batch and Streaming Source and Sink**: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box. 
* **Schema Enforcement and Evolution**: Ensures data cleanliness by blocking writes with unexpected schemas.
* **Time Travel**: Query previous versions of the table by time or version number.
* **Deletes and upserts**: Supports deleting and upserting into tables with programmatic APIs.
* **Open Format**: Data is stored in the open Parquet format in blob storage.
* **Audit History**: History of all the operations that happened in the table.
* **Scalable Metadata management**: Handles millions of files by scaling the metadata operations with Spark.

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Convert to Delta Lake format

To get started with Delta Lake, simply say “Delta” instead of “Parquet” when writing out your tables with Spark. You can also use Spark SQL to create a Delta Lake table from scratch with the `CREATE TABLE USING DELTA` command, or use the `CONVERT TO DELTA` command to convert your existing Parquet files to Delta Lake format in place.

<img src="https://databricks.com/wp-content/uploads/2020/12/simplysaydelta.png" width=600/>

In **Python**: Read your data into a Spark DataFrame, then write it out in Delta Lake format directly, with no upfront schema definition needed.

In [0]:
parquet_path = "file:/dbfs/tmp/delta_demo/loans_parquet/"

df = (spark.read.format("parquet").load(parquet_path)
      .withColumn("type", lit("batch"))
      .withColumn("timestamp", current_timestamp()))

df.write.format("delta").mode("overwrite").saveAsTable("loans_delta")

**SQL:** Use a `CREATE TABLE ... AS SELECT` statement (no upfront schema definition needed)

In [0]:
%sql
CREATE TABLE loans_delta2
USING delta
AS SELECT * FROM parquet.`/tmp/delta_demo/loans_parquet`



**SQL**: Use `CONVERT TO DELTA` to convert Parquet files to Delta Lake format in place

In [0]:
%sql CONVERT TO DELTA parquet.`/tmp/delta_demo/loans_parquet`

### View the data in the Delta Lake table
Now that we’ve converted our data to Delta Lake, let’s look at the data in our table. We have 14,705 batch records in this table. And you can see here what the data actually looks like. Next, we'll see how Delta Lake is able to handle batch and streaming data with ease, by setting up two streaming writes to our Delta Lake table alongside two streaming reads that will happen simultaneously.

In [0]:
spark.sql("select count(*) from loans_delta").show()
spark.sql("select * from loans_delta").show(3)

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Unified batch + streaming data processing with multiple concurrent readers and writers

### Write 2 different data streams into our Delta Lake table at the same time.

In [0]:
# Set up 2 streaming writes to our table
stream_query_A = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream A')
stream_query_B = generate_and_append_data_stream(table_format="delta", table_name="loans_delta", schema_ok=True, type='stream B')

### Create 2 continuous streaming readers of our Delta Lake table to illustrate streaming progress.

So as you can see in the chart below, the initial 14,705 batch records in our table are still present, but new data is now streaming in from Stream A and Stream B at around 500 records per second each. Meanwhile, Delta Lake also serves consistent views of this table to our streaming read queries, which power the visualizations below.
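Two streams can append to the same table at once because every commit claims the next version number in the transaction log atomically; a writer that loses the race retries at the next version. Below is a minimal in-memory sketch of that optimistic concurrency idea, assuming an atomic put-if-absent primitive. `ToyLog` and `blind_append` are hypothetical names, not Delta Lake APIs, and real Delta Lake also checks whether concurrent commits conflict with what the writer read before it retries.

```python
import threading


class ToyLog:
    """Hypothetical in-memory stand-in for a table's transaction log."""

    def __init__(self):
        self._commits = {}             # version number -> list of actions
        self._lock = threading.Lock()  # makes put_if_absent atomic

    def latest_version(self):
        with self._lock:
            return max(self._commits, default=-1)

    def put_if_absent(self, version, actions):
        """Create commit `version` atomically; return False if it already exists."""
        with self._lock:
            if version in self._commits:
                return False
            self._commits[version] = list(actions)
            return True

    def versions(self):
        with self._lock:
            return sorted(self._commits)


def blind_append(log, actions, max_retries=100):
    """Commit an append optimistically, retrying at the next version on conflict.

    A pure append does not depend on existing data, so losing the race only
    means another writer took the version number first.
    """
    for _ in range(max_retries):
        read_version = log.latest_version()
        if log.put_if_absent(read_version + 1, actions):
            return read_version + 1
    raise RuntimeError("gave up after too many conflicting commits")


log = ToyLog()
# Streams A and B both read the table while it is empty (version -1).
read_a = read_b = log.latest_version()
print(log.put_if_absent(read_a + 1, ["add part-a.parquet"]))  # True: A commits version 0
print(log.put_if_absent(read_b + 1, ["add part-b.parquet"]))  # False: version 0 is taken
print(blind_append(log, ["add part-b.parquet"]))              # 1: B retries and commits
```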

In [0]:
# Streaming read #1
display(spark.readStream.format("delta").table("loans_delta").groupBy("type").count().orderBy("type"))

type,count
batch,14705
stream A,134500
stream B,134000


In the visualization below, you can see the new data streaming into our table over time. Each new bar represents a 10-second window during which both stream A and stream B write to our table concurrently. Again, this visualization is powered by a streaming read from our table.
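Spark's `window("timestamp", "10 seconds")` assigns each row to a fixed, non-overlapping 10-second interval, with window boundaries aligned to the Unix epoch by default. The pure-Python sketch below mimics only that bucketing and counting for a few hypothetical events; the helper names are made up, and it ignores everything Structured Streaming adds on top, such as state management and watermarks.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=10)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_bounds(ts):
    """Return (start, end) of the 10-second tumbling window containing `ts`."""
    whole_windows = (ts - EPOCH) // WINDOW  # timedelta // timedelta -> int
    start = EPOCH + whole_windows * WINDOW
    return start, start + WINDOW


def count_by_type_and_window(events):
    """Count events per (type, window start, window end)."""
    counts = Counter()
    for event_type, ts in events:
        counts[(event_type, *window_bounds(ts))] += 1
    return counts


t0 = datetime(2021, 11, 23, 0, 54, 17, tzinfo=timezone.utc)
events = [
    ("stream A", t0),                          # 00:54:17 -> [00:54:10, 00:54:20)
    ("stream A", t0 + timedelta(seconds=2)),   # 00:54:19 -> same window
    ("stream B", t0 + timedelta(seconds=5)),   # 00:54:22 -> [00:54:20, 00:54:30)
]
for (event_type, start, end), n in sorted(count_by_type_and_window(events).items()):
    print(event_type, start.time(), end.time(), n)
```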

In [0]:
# Streaming read #2
display(spark.readStream.format("delta").table("loans_delta").groupBy("type", window("timestamp", "10 seconds")).count().orderBy("window"))

type,window,count
batch,"List(2021-11-23T00:43:10.000+0000, 2021-11-23T00:43:20.000+0000)",14705
stream A,"List(2021-11-23T00:54:10.000+0000, 2021-11-23T00:54:20.000+0000)",319
stream B,"List(2021-11-23T00:54:20.000+0000, 2021-11-23T00:54:30.000+0000)",4936
stream A,"List(2021-11-23T00:54:20.000+0000, 2021-11-23T00:54:30.000+0000)",5000
stream B,"List(2021-11-23T00:54:30.000+0000, 2021-11-23T00:54:40.000+0000)",5000
stream A,"List(2021-11-23T00:54:30.000+0000, 2021-11-23T00:54:40.000+0000)",5000
stream B,"List(2021-11-23T00:54:40.000+0000, 2021-11-23T00:54:50.000+0000)",5000
stream A,"List(2021-11-23T00:54:40.000+0000, 2021-11-23T00:54:50.000+0000)",5000
stream B,"List(2021-11-23T00:54:50.000+0000, 2021-11-23T00:55:00.000+0000)",5000
stream A,"List(2021-11-23T00:54:50.000+0000, 2021-11-23T00:55:00.000+0000)",5000


### Add a batch query, just for good measure

In [0]:
%sql
SELECT addr_state, COUNT(*)
FROM loans_delta
GROUP BY addr_state

addr_state,count(1)
AZ,329
SC,174
LA,167
MN,256
NJ,541
DC,38
OR,178
VA,413
RI,66
WY,31


What this shows is that Delta Lake tables can easily handle multiple readers and writers of both batch and streaming data all at once.

In [0]:
dbutils.notebook.exit("stop")

stop

In [0]:
stop_all_streams()

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) ACID Transactions

So how is all of this possible? It’s possible because of ACID transactions. Delta Lake uses a transaction log that serves as a master record of all changes made to each table. The log also determines which files are part of the table at any given time. You can view the table's history, as recorded in the transaction log, at any time by running the `DESCRIBE HISTORY` command, as shown here.

When we run this command, you can see that each write to our table has been recorded atomically as a streaming update. Once a new write is recorded in the transaction log, all of our downstream readers will now include that write transaction in their view of the table immediately. So everyone always agrees on what constitutes a Delta Lake table at all times. They simply refer to the transaction log.
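Because the log is the single source of truth, any version of the table can be reconstructed by replaying commits in order, where each commit adds or removes data files. In Delta Lake these commits are JSON files under the table's `_delta_log` directory, alongside periodic Parquet checkpoints. The sketch below is a simplified model with hypothetical file names and a hypothetical `snapshot` helper; real commits also carry metadata, protocol, and commit-info actions.

```python
def snapshot(log, version=None):
    """Replay commits 0..version and return the set of data files in the table.

    `log` maps version number -> list of ("add" | "remove", path) actions.
    """
    if version is None:
        version = max(log)  # latest version
    files = set()
    for v in range(version + 1):
        for action, path in log[v]:
            if action == "add":
                files.add(path)
            elif action == "remove":
                files.discard(path)
    return files


log = {
    0: [("add", "part-000.parquet")],   # initial batch write
    1: [("add", "part-001.parquet")],   # stream A append
    2: [("add", "part-002.parquet")],   # stream B append
    3: [("remove", "part-000.parquet"),
        ("remove", "part-001.parquet"),
        ("add", "part-003.parquet")],   # e.g. rewriting two files into one
}

# A reader pinned to version 2 keeps seeing the same files even after version 3 lands.
print(sorted(snapshot(log, 2)))  # ['part-000.parquet', 'part-001.parquet', 'part-002.parquet']
print(sorted(snapshot(log)))     # ['part-002.parquet', 'part-003.parquet']
```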

In [0]:
%sql DESCRIBE HISTORY loans_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
98,2021-11-23T00:59:07.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> bf631fd8-6a07-4622-8006-99a35da34474, epochId -> 48)",,List(4367247831049144),1028-185951-iqhcdsdg,97.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 49214, numAddedFiles -> 1)",
97,2021-11-23T00:59:04.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 3fea360c-1fc3-4b5a-99be-7d8d16502585, epochId -> 48)",,List(4367247831049144),1028-185951-iqhcdsdg,95.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 49205, numAddedFiles -> 1)",
96,2021-11-23T00:59:03.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> bf631fd8-6a07-4622-8006-99a35da34474, epochId -> 47)",,List(4367247831049144),1028-185951-iqhcdsdg,95.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 1500, numOutputBytes -> 37882, numAddedFiles -> 1)",
95,2021-11-23T00:59:00.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> bf631fd8-6a07-4622-8006-99a35da34474, epochId -> 46)",,List(4367247831049144),1028-185951-iqhcdsdg,93.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 49686, numAddedFiles -> 1)",
94,2021-11-23T00:58:59.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 3fea360c-1fc3-4b5a-99be-7d8d16502585, epochId -> 47)",,List(4367247831049144),1028-185951-iqhcdsdg,93.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 49222, numAddedFiles -> 1)",
93,2021-11-23T00:58:57.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> bf631fd8-6a07-4622-8006-99a35da34474, epochId -> 45)",,List(4367247831049144),1028-185951-iqhcdsdg,91.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 3000, numOutputBytes -> 73173, numAddedFiles -> 1)",
92,2021-11-23T00:58:56.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 3fea360c-1fc3-4b5a-99be-7d8d16502585, epochId -> 46)",,List(4367247831049144),1028-185951-iqhcdsdg,91.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2500, numOutputBytes -> 60802, numAddedFiles -> 1)",
91,2021-11-23T00:58:51.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 3fea360c-1fc3-4b5a-99be-7d8d16502585, epochId -> 45)",,List(4367247831049144),1028-185951-iqhcdsdg,89.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 3000, numOutputBytes -> 74684, numAddedFiles -> 1)",
90,2021-11-23T00:58:50.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> bf631fd8-6a07-4622-8006-99a35da34474, epochId -> 44)",,List(4367247831049144),1028-185951-iqhcdsdg,89.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 3000, numOutputBytes -> 73383, numAddedFiles -> 1)",
89,2021-11-23T00:58:44.000+0000,4038897723565024,amit.kara@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 3fea360c-1fc3-4b5a-99be-7d8d16502585, epochId -> 44)",,List(4367247831049144),1028-185951-iqhcdsdg,87.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2500, numOutputBytes -> 61478, numAddedFiles -> 1)",


This all makes Delta Lake tables really powerful. And at Databricks, we’ve found that many of our customers are able to simplify and streamline their overall data architectures using Delta Lake. By building a simple multi-hop data pipeline with Delta Lake tables, you can reliably transform raw batch and streaming data into high quality, structured data that multiple downstream apps and users can query at once. And what this means from a business ROI perspective is:
* lower cloud computing costs,
* less complexity to manage, and
* less time wasted on fixing systems issues like corrupted data.

<img src="https://databricks.com/wp-content/uploads/2020/09/delta-lake-medallion-model-scaled.jpg" width=1012/>

##  ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use Schema Enforcement to protect data quality

But Delta Lake does a lot more than just use ACID transactions to combine batch and streaming data. It also offers tools like schema enforcement to protect the quality of the data in your data tables. Without schema enforcement, data with mismatching schemas can change your table schema and break your entire data pipeline, causing cascading failures downstream. So we use schema enforcement to ensure that that doesn’t happen.

To show you how schema enforcement works, let's create a DataFrame with an extra column, `credit_score`, that doesn't match our existing Delta Lake table schema.

#### Write DataFrame with extra column, `credit_score`, to Delta Lake table

In [0]:
# Generate `new_data` with additional column
new_column = [StructField("credit_score", IntegerType(), True)]
new_schema = StructType(spark.table("loans_delta").schema.fields + new_column)
data = [(99997, 10000, 1338.55, "CA", "batch", datetime.now(), 649),
        (99998, 20000, 1442.55, "NY", "batch", datetime.now(), 702)]

new_data = spark.createDataFrame(data, new_schema)
new_data.printSchema()

In [0]:
# Running this cell raises an error because the schemas don't match.
# Attempt to write data with new column to Delta Lake table
new_data.write.format("delta").mode("append").saveAsTable("loans_delta")

**Schema enforcement helps keep our tables clean and tidy so that we can trust the data we have stored in Delta Lake.** The write above was blocked because the schema of the new data did not match the schema of the table (see the exception details). Read more about how it works [here](https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html).
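The main rules from the linked post can be sketched in a few lines: a write is rejected if it contains columns the table does not have, or a column whose type differs from the table's, while table columns missing from the incoming data are simply written as nulls. This is a simplified model, not Delta Lake's implementation; schemas are plain dicts here, `check_write_schema` is a hypothetical name, and the column types below are assumed for illustration.

```python
def check_write_schema(table_schema, data_schema):
    """Raise ValueError if `data_schema` breaks simplified enforcement rules.

    Both schemas map column name -> type name.
    """
    extra = [col for col in data_schema if col not in table_schema]
    if extra:
        raise ValueError(f"columns not in table schema: {extra}")
    mismatched = [col for col, typ in data_schema.items() if table_schema[col] != typ]
    if mismatched:
        raise ValueError(f"type mismatch for columns: {mismatched}")
    # Table columns absent from data_schema are allowed; they are written as nulls.


# Assumed types, for illustration only.
table_schema = {"loan_id": "bigint", "funded_amnt": "int", "paid_amnt": "double",
                "addr_state": "string", "type": "string", "timestamp": "timestamp"}
new_data_schema = {**table_schema, "credit_score": "int"}

try:
    check_write_schema(table_schema, new_data_schema)
except ValueError as err:
    print("write blocked:", err)  # write blocked: columns not in table schema: ['credit_score']

check_write_schema(table_schema, {"loan_id": "bigint", "addr_state": "string"})  # accepted
```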

##  ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use Schema Evolution to add new columns to schema

But schema enforcement alone is not enough. In the event that we do want to change our table schema, we also need schema evolution. With Delta Lake, we evolve our schema quickly and easily by simply adding the `mergeSchema` option to our Spark write command.

In [0]:
new_data.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("loans_delta")

When we run this command, we can see that the write that previously failed now succeeds, and the new `credit_score` column is present in our table. Rows written before the schema change show null for `credit_score`.
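Conceptually, `mergeSchema` appends any new columns to the end of the table schema, and rows written before the change read back as null for those columns, which matches the empty `credit_score` values for the streamed rows in the output. A minimal sketch of that behavior follows; `merge_schemas` and `read_row` are hypothetical helpers, and this sketch simply rejects any change to an existing column's type.

```python
def merge_schemas(table_schema, data_schema):
    """Return the evolved schema: existing columns keep their order, new ones are appended."""
    merged = dict(table_schema)
    for col, typ in data_schema.items():
        if col not in merged:
            merged[col] = typ
        elif merged[col] != typ:
            raise ValueError(f"type change for existing column {col!r} is not handled here")
    return merged


def read_row(stored_row, schema):
    """Project a stored row onto `schema`, using None for columns it never had."""
    return {col: stored_row.get(col) for col in schema}


table_schema = {"loan_id": "bigint", "addr_state": "string", "type": "string"}
evolved = merge_schemas(table_schema, {**table_schema, "credit_score": "int"})
print(list(evolved))  # ['loan_id', 'addr_state', 'type', 'credit_score']

old_row = {"loan_id": 99998, "addr_state": "WA", "type": "stream A"}  # written before the change
print(read_row(old_row, evolved))
# {'loan_id': 99998, 'addr_state': 'WA', 'type': 'stream A', 'credit_score': None}
```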

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id IN (99997, 99998)

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp,credit_score
99997,10000,1338.55,CA,batch,2021-01-04T20:21:35.571+0000,649.0
99998,20000,1442.55,NY,batch,2021-01-04T20:21:35.571+0000,702.0
99998,6376,5995.127658122062,WA,stream A,2021-01-04T20:18:49.038+0000,
99997,9584,8973.400453976867,NY,stream A,2021-01-04T20:18:49.036+0000,
99997,9631,8575.464785030312,NY,stream B,2021-01-04T20:18:49.726+0000,
99998,9681,8376.371768590849,WA,stream B,2021-01-04T20:18:49.728+0000,


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake Time Travel

Delta Lake’s time travel capabilities simplify building data pipelines for use cases including:

* Auditing Data Changes
* Reproducing experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

<img src="https://github.com/risan4841/img/blob/master/transactionallogs.png?raw=true" width=250/>

You can query snapshots of your tables by:
1. **Version number**, or
2. **Timestamp.**

You can do this with Python, Scala, or SQL syntax; these examples use SQL.
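Querying by timestamp (for example `SELECT * FROM loans_delta TIMESTAMP AS OF '2021-01-04 20:20:45'`) resolves to the latest version committed at or before that timestamp. The sketch below shows that lookup as a binary search over commit times; the function name and commit times are hypothetical, and it does not model how Delta Lake treats timestamps outside the table's history beyond rejecting ones before the first commit.

```python
from bisect import bisect_right
from datetime import datetime


def version_as_of(commit_times, ts):
    """Return the latest version whose commit time is at or before `ts`.

    `commit_times[v]` is the commit time of version v, in ascending order.
    """
    idx = bisect_right(commit_times, ts)  # number of commits at or before ts
    if idx == 0:
        raise ValueError("timestamp is before the table's first commit")
    return idx - 1


commit_times = [
    datetime(2021, 1, 4, 20, 15, 0),   # version 0
    datetime(2021, 1, 4, 20, 18, 0),   # version 1
    datetime(2021, 1, 4, 20, 23, 0),   # version 2
]
print(version_as_of(commit_times, datetime(2021, 1, 4, 20, 20, 45)))  # 1
print(version_as_of(commit_times, datetime(2021, 1, 4, 20, 18, 0)))   # 1 (exact match)
```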

For more information, refer to the [docs](https://docs.delta.io/latest/delta-utility.html#history), or [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html).

#### Review Delta Lake Table History for Auditing & Governance
Another major feature of Delta Lake is time travel, also known as data versioning. Because every change to our table is recorded as an atomic transaction in the transaction log, we can use this information to recreate the exact state of our table at any point in time.

Time travel helps you avoid making irreversible changes to your tables. It makes your data sets and experiments reproducible, and offers a verifiable data lineage for audit and governance purposes.

So let’s first review the transaction log using the `DESCRIBE HISTORY` command that we saw earlier.

In [0]:
%sql
DESCRIBE HISTORY loans_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
111,2021-01-04T20:23:00.000+0000,101001,Brenner.Heintz@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(8163273),0318-151752-abed99,110.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 1933, numOutputRows -> 2)",
110,2021-01-04T20:20:57.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 1e76a9c2-f557-46e3-865a-d0c6e6617d29, epochId -> 54)",,List(8163273),0318-151752-abed99,108.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2500, numOutputBytes -> 62263, numAddedFiles -> 1)",
109,2021-01-04T20:20:56.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 67316880-2d22-417e-8103-577b47ecf139, epochId -> 54)",,List(8163273),0318-151752-abed99,108.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2500, numOutputBytes -> 62259, numAddedFiles -> 1)",
108,2021-01-04T20:20:51.001+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 1e76a9c2-f557-46e3-865a-d0c6e6617d29, epochId -> 53)",,List(8163273),0318-151752-abed99,106.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2500, numOutputBytes -> 62463, numAddedFiles -> 1)",
107,2021-01-04T20:20:51.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 67316880-2d22-417e-8103-577b47ecf139, epochId -> 53)",,List(8163273),0318-151752-abed99,106.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2500, numOutputBytes -> 61777, numAddedFiles -> 1)",
106,2021-01-04T20:20:47.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 67316880-2d22-417e-8103-577b47ecf139, epochId -> 52)",,List(8163273),0318-151752-abed99,104.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 49651, numAddedFiles -> 1)",
105,2021-01-04T20:20:46.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 1e76a9c2-f557-46e3-865a-d0c6e6617d29, epochId -> 52)",,List(8163273),0318-151752-abed99,104.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 50085, numAddedFiles -> 1)",
104,2021-01-04T20:20:43.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 1e76a9c2-f557-46e3-865a-d0c6e6617d29, epochId -> 51)",,List(8163273),0318-151752-abed99,102.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 1000, numOutputBytes -> 24890, numAddedFiles -> 1)",
103,2021-01-04T20:20:42.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 67316880-2d22-417e-8103-577b47ecf139, epochId -> 51)",,List(8163273),0318-151752-abed99,102.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 1500, numOutputBytes -> 37950, numAddedFiles -> 1)",
102,2021-01-04T20:20:40.000+0000,101001,Brenner.Heintz@databricks.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 1e76a9c2-f557-46e3-865a-d0c6e6617d29, epochId -> 50)",,List(8163273),0318-151752-abed99,100.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 49643, numAddedFiles -> 1)",


#### Use time travel to select and view the original version of our table (Version 0).
As you can see above, each version of our table is saved by version number and by timestamp. We can use this information to do things like query historical versions of our table.

By adding the `VERSION AS OF` clause to our SQL query, our query runs against the very first version of our data set, version 0, and returns a count of 14,705, the same number of records that were originally present in our table.

In [0]:
spark.sql("SELECT * FROM loans_delta VERSION AS OF 0").show(3)
spark.sql("SELECT COUNT(*) FROM loans_delta VERSION AS OF 0").show()

In [0]:
%sql SELECT COUNT(*) FROM loans_delta

count(1)
319707


#### Rollback a table to a specific version using `RESTORE`
Taking time travel one step further, we can roll back our table at any time using the `RESTORE` command shown here. This is really useful if we decide that we’ve made a change to our table that we want to completely undo, and simply roll back our data to an earlier version.
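`RESTORE` does not erase history: it writes a new commit that removes the files added since the target version and re-adds the files that were removed, so the restored state becomes the latest version while older versions remain queryable (as long as their files have not been vacuumed). A simplified sketch of that idea, with hypothetical helper names and file names:

```python
def live_files(log, version):
    """Replay ("add" | "remove", path) actions for versions 0..version."""
    files = set()
    for v in range(version + 1):
        for action, path in log[v]:
            if action == "add":
                files.add(path)
            else:
                files.discard(path)
    return files


def restore(log, target_version):
    """Append a commit that makes the latest table state equal `target_version`."""
    latest = max(log)
    current = live_files(log, latest)
    target = live_files(log, target_version)
    actions = [("remove", p) for p in sorted(current - target)]
    actions += [("add", p) for p in sorted(target - current)]
    log[latest + 1] = actions
    return latest + 1


log = {
    0: [("add", "batch.parquet")],
    1: [("add", "stream-a-1.parquet")],
    2: [("add", "stream-b-1.parquet")],
}
new_version = restore(log, 0)
print(new_version, log[new_version])
# 3 [('remove', 'stream-a-1.parquet'), ('remove', 'stream-b-1.parquet')]
print(sorted(live_files(log, 3)))  # ['batch.parquet']
print(sorted(live_files(log, 2)))  # version 2 is still readable: all three files
```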

In [0]:
%sql RESTORE loans_delta VERSION AS OF 0

In [0]:
%sql SELECT COUNT(*) FROM loans_delta

count(1)
14705


So as you can see, now when we query our table, those original 14,705 batch records are the only records present in our table. Version 0 has become the current version.

## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Full DML Support: `DELETE`, `UPDATE`, `MERGE INTO`

The next Delta Lake feature I want to demo for you is full support for transactional DML commands like `UPDATE`, `MERGE`, and `DELETE`. These are the SQL commands that make manipulating big data tables quick and easy. Before Delta Lake, deleting a user’s data from a data lake to comply with a GDPR request was difficult to perform without running the risk of data loss or corruption. But with Delta Lake, we can delete a user’s data transactionally, in just one line of code.

> Plain Parquet tables do **not** support these commands; Delta Lake adds them.

### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) `DELETE`: Handle GDPR or CCPA Requests on your Data Lake

First, let’s choose a user whose data we will manipulate -- let's choose user #4420.

**View the user's data**

In [0]:
%sql
SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,1050.94,TX,batch,2021-01-04T20:15:28.381+0000


**Delete the individual user's data with a single `DELETE` command using Delta Lake.**

Now when we run the `DELETE` command, you can see that we successfully deleted our user's data transactionally.

In [0]:
%sql
DELETE FROM loans_delta WHERE loan_id=4420;
-- Confirm the user's data was deleted
SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp


The same idea applies with other DML commands like `INSERT`, `UPDATE`, and `MERGE`. Delta Lake makes these operations really simple to perform with minimal code, all backed by the reliability of ACID transactions.
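Under the hood, a `DELETE` like the one above does not edit Parquet files in place. In its classic copy-on-write form, it rewrites only the files that contain matching rows and commits the old files as removed and the rewritten files as added, all in one transaction. The old file stays in storage until it is vacuumed, which is why the next step can still read user #4420 from version 0. The sketch below models files as lists of hypothetical rows; `delete_where` is a made-up name. (Newer Delta Lake releases can optionally mark deleted rows with deletion vectors instead of rewriting files.)

```python
def delete_where(files, predicate):
    """Copy-on-write delete over a table modeled as {path: [row, ...]}.

    Returns (new_files, actions). Files without matching rows are untouched;
    files with matching rows are replaced by rewritten copies.
    """
    new_files = dict(files)
    actions = []
    for path, rows in sorted(files.items()):
        if not any(predicate(row) for row in rows):
            continue  # nothing to delete here: keep the file as-is
        kept = [row for row in rows if not predicate(row)]
        del new_files[path]
        actions.append(("remove", path))
        if kept:  # write surviving rows to a new file
            new_path = "rewritten-" + path
            new_files[new_path] = kept
            actions.append(("add", new_path))
    return new_files, actions


files = {
    "part-000.parquet": [{"loan_id": 4419}, {"loan_id": 4420}],
    "part-001.parquet": [{"loan_id": 5000}],
}
new_files, actions = delete_where(files, lambda row: row["loan_id"] == 4420)
print(actions)  # [('remove', 'part-000.parquet'), ('add', 'rewritten-part-000.parquet')]
print(sorted(new_files))  # ['part-001.parquet', 'rewritten-part-000.parquet']
```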

### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Use time travel and `INSERT INTO` to add the user back into our table
Let’s go ahead and use time travel to insert user #4420's data back into our table.

In [0]:
%sql
INSERT INTO loans_delta
SELECT * FROM loans_delta VERSION AS OF 0
WHERE loan_id=4420

After running the `INSERT` command, we can see that our user's record has been added back into the table successfully.

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id=4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,1050.94,TX,batch,2021-01-04T20:15:28.381+0000


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) `UPDATE`: Modify the existing records in a table in one command
Next, let’s go ahead and `UPDATE` our user’s data with a single command as well. Here, we’re going to set the funded amount equal to 22,000, for the same user as before, #4420.

In [0]:
%sql UPDATE loans_delta SET funded_amnt = 22000 WHERE loan_id = 4420

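After running the command, we see that the user’s funded amount equals 22,000. Note that this record already had a funded amount of 22,000 in the original data, so this particular update leaves the value unchanged; set a different value to see the change take effect.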

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id = 4420

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,1050.94,TX,batch,2021-01-04T20:15:28.381+0000


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Support Change Data Capture Workflows & Other Ingest Use Cases via `MERGE INTO`

Finally, Delta Lake also supports upserts (a mix of `INSERT` and `UPDATE`) through the `MERGE` command. On a plain data lake, a merge is a difficult, expensive operation that involves several intermediate steps:

<img src="https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif" alt='Merge process' width=600/>

With Delta Lake, we can skip all of that complexity, and simply use the `MERGE` command.

So first, we’ll create some dummy data to merge: one row that updates user #4420’s data, and one row of new data to insert into our table. In our `MERGE` command, we then specify that the table should be updated when the record already exists, and that the record should be inserted when it doesn’t.
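Row by row, the `MERGE` below behaves like this sketch: a source row whose key matches an existing row replaces it (`UPDATE SET *`), and an unmatched source row is appended (`INSERT *`). It is an in-memory model with a hypothetical `merge_upsert` name that assumes `loan_id` is unique in the target; Delta Lake itself raises an error when several source rows match the same target row, and this sketch simply rejects duplicate source keys. The `1234` row is hypothetical.

```python
def merge_upsert(target, source, key="loan_id"):
    """Upsert `source` rows into `target` (lists of dicts), returning a new list."""
    merged = {row[key]: dict(row) for row in target}  # assumes unique target keys
    seen = set()
    for row in source:
        k = row[key]
        if k in seen:
            raise ValueError(f"multiple source rows for key {k}")
        seen.add(k)
        merged[k] = dict(row)  # WHEN MATCHED: update; WHEN NOT MATCHED: insert
    return list(merged.values())


target = [
    {"loan_id": 4420, "paid_amnt": 1050.94, "addr_state": "TX", "type": "batch"},
    {"loan_id": 1234, "paid_amnt": 500.0, "addr_state": "CA", "type": "batch"},  # hypothetical
]
source = [
    {"loan_id": 4420, "paid_amnt": 21500.00, "addr_state": "NY", "type": "update"},
    {"loan_id": 999999, "paid_amnt": 1338.55, "addr_state": "CA", "type": "insert"},
]
for row in merge_upsert(target, source):
    print(row)
```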

In [0]:
# Create merge table with 1 row update, 1 insertion
data = [(4420, 22000, 21500.00, "NY", "update", datetime.now()),  # record to update
        (999999, 10000, 1338.55, "CA", "insert", datetime.now())]  # record to insert
schema = spark.table("loans_delta").schema
spark.createDataFrame(data, schema).createOrReplaceTempView("merge_table")
spark.sql("SELECT * FROM merge_table").show()

In [0]:
%sql
MERGE INTO loans_delta AS l
USING merge_table AS m
ON l.loan_id = m.loan_id
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *;

So after we run the `MERGE` command, as you can see below, we successfully updated user #4420’s data, and inserted our new user’s (#999999) data.

In [0]:
%sql SELECT * FROM loans_delta WHERE loan_id IN (4420, 999999)

loan_id,funded_amnt,paid_amnt,addr_state,type,timestamp
4420,22000,21500.0,NY,update,2021-01-04T20:23:52.362+0000
99999,10000,1338.55,CA,insert,2021-01-04T20:23:52.362+0000


## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) File compaction and performance optimizations = faster queries
Finally, before wrapping up, let's review a few additional commands for maintaining and speeding up large Delta Lake tables. First, there’s the `VACUUM` command.

### Vacuum
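`VACUUM` permanently deletes data files that are no longer referenced by the current version of our table and are older than the retention threshold (7 days by default). After vacuuming, you can no longer time travel to versions that depend on the deleted files.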
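Which files `VACUUM` may delete can be sketched as a simple filter: files that the current version no longer references and that were removed longer ago than the retention threshold. Delta Lake's default threshold is 7 days, set by the `delta.deletedFileRetentionDuration` table property. This is a simplified model with a hypothetical function name and dates; it ignores details such as untracked files left behind by failed writes.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION = timedelta(days=7)


def files_to_vacuum(live_files, removed_at, now, retention=DEFAULT_RETENTION):
    """Return files a simplified VACUUM would delete.

    live_files: files referenced by the current table version
    removed_at: path -> time the file was removed from the table
    """
    cutoff = now - retention
    return sorted(
        path for path, when in removed_at.items()
        if path not in live_files and when <= cutoff
    )


now = datetime(2021, 1, 12)
removed_at = {
    "old-stream.parquet": datetime(2021, 1, 4),    # removed 8 days ago: deleted
    "recent.parquet": datetime(2021, 1, 10),       # removed 2 days ago: kept for time travel
}
print(files_to_vacuum({"batch.parquet"}, removed_at, now))  # ['old-stream.parquet']
```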

In [0]:
%sql
-- VACUUM deletes files no longer referenced by the current table version and older than the retention threshold (7 days by default).
VACUUM loans_delta

path
dbfs:/user/hive/warehouse/deltadb.db/loans_delta


### <img src="https://pages.databricks.com/rs/094-YMS-629/images/dbsquare.png" width=30/> Cache table data (Databricks Delta Lake only)
The next couple of commands are only available with Delta Lake on Databricks. First, there’s the `CACHE SELECT` command, which preloads the data read by a query into the Databricks disk cache on the cluster's worker nodes, so later queries that read the same data run faster.

In [0]:
%sql CACHE SELECT * FROM loans_delta

### <img src="https://pages.databricks.com/rs/094-YMS-629/images/dbsquare.png" width=30/> Z-Order Optimize (Databricks Delta Lake only)
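And finally, there’s `OPTIMIZE` with `ZORDER BY`, shown here. `OPTIMIZE` compacts many small files into fewer, larger ones, and `ZORDER BY` co-locates rows with similar values in the listed columns within the same files. Because Delta Lake keeps per-file column statistics such as min/max values, queries that filter on those columns can skip files that cannot contain matching rows, reading less data and reducing compute cost. (Open-source Delta Lake has also supported `OPTIMIZE ... ZORDER BY` since version 2.0.)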
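Z-ordering maps several column values to a single sort key by interleaving their bits, so rows that are close in all of those columns tend to land in the same file, where per-file min/max statistics let queries skip the rest. The sketch below shows the idea on two small integer columns with hypothetical data and compares it with sorting by one column first; the helper names are made up, and how Delta Lake maps real values, such as the strings in `addr_state`, to integers is not modeled here.

```python
def interleave_bits(x, y, bits=8):
    """Z-order (Morton) key: x's bits go to even positions, y's to odd positions."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def to_files(rows, size=4):
    """Split sorted rows into fixed-size 'files'."""
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def files_read(files, x_below, y_below):
    """Count files whose min values show they might hold rows with x < x_below and y < y_below."""
    return sum(
        1 for f in files
        if min(x for x, _ in f) < x_below and min(y for _, y in f) < y_below
    )


points = [(x, y) for x in range(4) for y in range(4)]   # 16 hypothetical rows
z_sorted = sorted(points, key=lambda p: interleave_bits(*p))
x_sorted = sorted(points)                               # linear sort on x, then y

print(files_read(to_files(z_sorted), 2, 2))  # 1 file may match x < 2 and y < 2
print(files_read(to_files(x_sorted), 2, 2))  # 2 files may match
```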

In [0]:
%sql OPTIMIZE loans_delta ZORDER BY addr_state

path,metrics
,"List(1, 2, List(165253, 165253, 165253.0, 1, 165253), List(1652, 165309, 83480.0, 2, 166961), 0, List(minCubeSize(107374182400), List(0, 0), List(2, 166961), 0, List(2, 166961), 1, null), 1)"


In [0]:
cleanup_paths_and_tables()

<img src="https://docs.delta.io/latest/_static/delta-lake-logo.png" width=300/>

So that concludes our hands-on demo of Delta Lake. Try Delta Lake on Databricks today, and join the community online to learn more about how Delta Lake provides the perfect foundation for your lakehouse architecture.

# Join the community!


* [Delta Lake on GitHub](https://github.com/delta-io/delta)
* [Delta Lake Slack Channel](https://delta-users.slack.com/) ([Registration Link](https://join.slack.com/t/delta-users/shared_invite/enQtNTY1NDg0ODcxOTI1LWJkZGU3ZmQ3MjkzNmY2ZDM0NjNlYjE4MWIzYjg2OWM1OTBmMWIxZTllMjg3ZmJkNjIwZmE1ZTZkMmQ0OTk5ZjA))
* [Public Mailing List](https://groups.google.com/forum/#!forum/delta-users)