# Ensuring Consistency with ACID Transactions with Delta Lake (Loan Risk Data)

<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo-whitebackground.png" width=200/>

This is a companion notebook to provide a Delta Lake example against the Lending Club data.
* This notebook has been tested with *DBR 5.4 ML Beta, Python 3*

## The Data

The data used is public data from Lending Club. It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full view of the data please view the data dictionary available [here](https://resources.lendingclub.com/LCDataDictionary.xlsx).


![Loan_Data](https://preview.ibb.co/d3tQ4R/Screen_Shot_2018_02_02_at_11_21_51_PM.png)

https://www.kaggle.com/wendykan/lending-club-loan-data

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake

An optimization layer atop blob storage that provides reliability (i.e., ACID compliance) and low latency for streaming and batch data pipelines.

## Import Data and create pre-Delta Lake Table
* This will create a lot of small Parquet files emulating the typical small file problem that occurs with streaming or highly transactional data

In [5]:
# Configure location of loanstats_2012_2017.parquet
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"

# Read loanstats_2012_2017.parquet
data = spark.read.parquet(lspq_path)

# Reduce the amount of data (to run on DBCE)
(loan_stats, loan_stats_rest) = data.randomSplit([0.01, 0.99], seed=123)

# Select only the columns needed
loan_stats = loan_stats.select("addr_state", "loan_status")

# Create loan by state
loan_by_state = loan_stats.groupBy("addr_state").count()

# Create table
loan_by_state.createOrReplaceTempView("loan_by_state")

# Display loans by state
display(loan_by_state)

addr_state,count
AZ,346
SC,188
LA,165
MN,266
NJ,547
DC,30
OR,169
VA,426
RI,51
KY,151


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Easily Convert Parquet to Delta Lake format
With Delta Lake, you can easily transform your Parquet data into Delta Lake format.

In [7]:
# Configure Delta Lake Silver Path
DELTALAKE_SILVER_PATH = "/ml/loan_by_state_delta"

# Remove folder if it exists
dbutils.fs.rm(DELTALAKE_SILVER_PATH, recurse=True)

In [8]:
%sql 
-- This example creates a new table rather than converting the Parquet data in place
DROP TABLE IF EXISTS loan_by_state_delta;

CREATE TABLE loan_by_state_delta
USING delta
LOCATION '/ml/loan_by_state_delta'
AS SELECT * FROM loan_by_state;

-- View Delta Lake table
SELECT * FROM loan_by_state_delta

addr_state,count
DC,30
DE,36
LA,165
MA,350
ME,15
NV,222
MI,383
ID,14
KS,124
AL,188


In [9]:
%sql 
DESCRIBE DETAIL loan_by_state_delta

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,21716358-eb74-4aae-9ae1-2ccfc97600cc,default.loan_by_state_delta,,dbfs:/ml/loan_by_state_delta,2019-06-10T13:59:18.350+0000,2019-06-10T13:59:33.000+0000,List(),46,29704,Map(),1,2


## Stop the notebook before the streaming cell, in case of a "run all"

In [11]:
dbutils.notebook.exit("stop") 

stop

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Unified Batch and Streaming Source and Sink

These cells showcase concurrent streaming and batch queries (inserts and reads).
* This notebook will run an `INSERT` every 10s against our `loan_by_state_delta` table
* We will run a streaming query concurrently against this data
* Note that you can also use `writeStream`, but this version is easier to run in DBCE

In [13]:
# Read the insertion of data
loan_by_state_readStream = spark.readStream.format("delta").load(DELTALAKE_SILVER_PATH)
loan_by_state_readStream.createOrReplaceTempView("loan_by_state_readStream")

In [14]:
%sql
select addr_state, sum(`count`) as loans from loan_by_state_readStream group by addr_state

addr_state,loans
AZ,346
SC,188
LA,165
MN,266
NJ,547
DC,30
OR,169
VA,426
RI,51
WY,28


**Wait** until the stream is up and running before executing the code below

In [16]:
import time

# Insert one row every 10 seconds, six times in total
for i in range(1, 7):
  # Execute Insert statement
  insert_sql = "INSERT INTO loan_by_state_delta VALUES ('IA', 450)"
  spark.sql(insert_sql)
  print('loan_by_state_delta: inserted new row of data, loop: [%s]' % i)
  time.sleep(10)

**Note**: Once the previous cell is finished and the state of Iowa is fully populated in the map (in cell 14), click *Cancel* in Cell 14 to stop the `readStream`.

Let's review our current set of loans using our map visualization.

In [19]:
%sql
-- Review current loans within the `loan_by_state_delta` Delta Lake table
select addr_state, sum(`count`) as loans from loan_by_state_delta group by addr_state

addr_state,loans
AZ,346
SC,188
LA,165
MN,266
NJ,547
DC,30
OR,169
VA,426
RI,51
WY,28


Observe that Iowa (the middle state) now has the largest number of loans because of the recent stream of inserts. Note that the original `loan_by_state_delta` table is updated while we are reading from `loan_by_state_readStream`.
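
To make the arithmetic behind the Iowa total concrete, here is a minimal plain-Python sketch (not Spark code) of a running per-state sum. The starting totals are a small subset copied from the output above, and `apply_insert` is a hypothetical helper introduced only for illustration.

```python
from collections import defaultdict

# Illustrative subset of the per-state totals shown above.
totals = defaultdict(int, {"AZ": 346, "SC": 188, "NJ": 547})


def apply_insert(totals, addr_state, count):
    """Fold one inserted row into the running per-state sum (hypothetical helper)."""
    totals[addr_state] += count
    return totals


# The insert loop issues six INSERTs of ('IA', 450).
for _ in range(6):
    apply_insert(totals, "IA", 450)

print(totals["IA"])  # 6 * 450 = 2700
```

Each insert adds a new row rather than overwriting the previous one, which is why the queries aggregate with `sum(count)` per state.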

## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Full DML Support

**Note**: Full DML Support is a feature that will be coming soon to Delta Lake; the preview is currently available in Databricks.

Delta Lake supports standard DML, including `UPDATE`, `DELETE`, and `MERGE INTO`, giving developers more control over managing large datasets.

Let's start by creating a traditional Parquet table

In [23]:
# Load new DataFrame based on current Delta table
lbs_df = spark.sql("select * from loan_by_state_delta")

# Save DataFrame to Parquet
lbs_df.write.mode("overwrite").parquet("loan_by_state.parquet")

# Reload Parquet Data
lbs_pq = spark.read.parquet("loan_by_state.parquet")

# Create new table on this parquet data
lbs_pq.createOrReplaceTempView("loan_by_state_pq")

# Review data
display(spark.sql("select * from loan_by_state_pq"))

addr_state,count
IN,248
MO,239
MN,266
RI,51
NH,66
WI,194
NC,382
NM,94
NJ,547
MS,80


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) DELETE Support

The inserted rows were supposed to be assigned to `WA` rather than `IA`, so let's `DELETE` the rows assigned to `IA`.

In [25]:
%sql
-- Attempting to run `DELETE` on the Parquet table
DELETE FROM loan_by_state_pq WHERE addr_state = 'IA'

**Note**: This command fails because `DELETE` statements are not supported on Parquet tables, but they are supported in Delta Lake.

In [27]:
%sql
-- Running `DELETE` on the Delta Lake table
DELETE FROM loan_by_state_delta WHERE addr_state = 'IA'

In [28]:
%sql
-- Review current loans within the `loan_by_state_delta` Delta Lake table
select addr_state, sum(`count`) as loans from loan_by_state_delta group by addr_state

addr_state,loans
AZ,346
SC,188
LA,165
MN,266
NJ,547
DC,30
OR,169
VA,426
RI,51
WY,28


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) UPDATE Support
The data was originally supposed to be assigned to `WA` state, so let's `UPDATE` those values

In [30]:
%sql
-- Attempting to run `UPDATE` on the Parquet table
UPDATE loan_by_state_pq SET `count` = 2700 WHERE addr_state = 'WA'

**Note**: This command fails because `UPDATE` statements are not supported on Parquet tables, but they are supported in Delta Lake.

In [32]:
%sql
-- Running `UPDATE` on the Delta Lake table
UPDATE loan_by_state_delta SET `count` = 2700 WHERE addr_state = 'WA'

In [33]:
%sql
-- Review current loans within the `loan_by_state_delta` Delta Lake table
select addr_state, sum(`count`) as loans from loan_by_state_delta group by addr_state

addr_state,loans
AZ,346
SC,188
LA,165
MN,266
NJ,547
DC,30
OR,169
VA,426
RI,51
WY,28


### ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) MERGE INTO Support

#### INSERT or UPDATE with Parquet: 7-step process

With a legacy data pipeline, to insert or update a table, you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not impacted by the insert or update
4. Create a new temp table based on all three sets of rows
5. Delete the original table (and all of those associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table

![](https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif)
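
The seven steps above can be sketched in plain Python, treating a table as a list of `(addr_state, count)` rows. This is only a toy model of the legacy workflow, not Spark or Parquet code; the three-row `original` table is an illustrative subset of values that appear in this notebook.

```python
# Toy "table": a list of (addr_state, count) rows (illustrative subset).
original = [("WA", 2700), ("OR", 169), ("NJ", 547)]
# Incoming changes: some keys are new, some already exist.
changes = [("IA", 0), ("CA", 2500), ("OR", 0)]

existing_keys = {state for state, _ in original}
change_keys = {state for state, _ in changes}

# Step 1: rows to insert (key not present in the original table).
new_rows = [row for row in changes if row[0] not in existing_keys]
# Step 2: rows that replace existing rows.
updated_rows = [row for row in changes if row[0] in existing_keys]
# Step 3: rows not affected by the change set.
unchanged_rows = [row for row in original if row[0] not in change_keys]

# Step 4: build the temporary table from all three sets.
temp = new_rows + updated_rows + unchanged_rows
# Steps 5-7: drop the original, "rename" the temp table, discard the temp name.
original = temp
del temp

print(sorted(original))
```

Every step here is a separate operation that can fail midway, which is the main weakness of the legacy approach.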


#### INSERT or UPDATE with Delta Lake

2-step process: 
1. Identify rows to insert or update
2. Use `MERGE`

In [35]:
# Let's create a simple table to merge
items = [('IA', 0), ('CA', 2500), ('OR', 0)]
cols = ['addr_state', 'count']
merge_table = spark.createDataFrame(items, cols)
merge_table.createOrReplaceTempView("merge_table")
display(merge_table)

addr_state,count
IA,0
CA,2500
OR,0


Instead of writing separate `INSERT` and `UPDATE` statements, we can use a `MERGE` statement.

In [37]:
%sql
MERGE INTO loan_by_state_delta as d
USING merge_table as m
on d.addr_state = m.addr_state
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *
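
As a rough mental model of the `MERGE` statement above, the following plain-Python sketch applies the same matched/not-matched logic to dictionaries keyed by `addr_state`. It is an illustration of the semantics only, not how Delta Lake executes a merge; `merge_into` and the small `target` table are hypothetical.

```python
def merge_into(target, source):
    """Upsert `source` into a copy of `target`, both keyed by addr_state."""
    merged = dict(target)
    stats = {"updated": 0, "inserted": 0}
    for addr_state, count in source.items():
        if addr_state in merged:
            stats["updated"] += 1   # WHEN MATCHED THEN UPDATE SET *
        else:
            stats["inserted"] += 1  # WHEN NOT MATCHED THEN INSERT *
        merged[addr_state] = count
    return merged, stats


# Illustrative subset of the Delta table, plus the rows from merge_table.
target = {"OR": 169, "NJ": 547, "WA": 2700}
source = {"IA": 0, "CA": 2500, "OR": 0}

merged, stats = merge_into(target, source)
print(merged)
print(stats)
```

In this subset only `OR` already exists, so it is updated, while `IA` and `CA` are inserted.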

In [38]:
%sql
-- Review current loans within the `loan_by_state_delta` Delta Lake table
select addr_state, sum(`count`) as loans from loan_by_state_delta group by addr_state

addr_state,loans
AZ,346
SC,188
LA,165
MN,266
NJ,547
DC,30
OR,0
VA,426
RI,51
WY,28


## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Schema Evolution
With the `mergeSchema` option, you can evolve your Delta Lake table schema

In [40]:
# Generate new loans with dollar amounts 
loans = spark.sql("select addr_state, cast(rand(10)*count as bigint) as count, cast(rand(10) * 10000 * count as double) as amount from loan_by_state_delta")
display(loans)

addr_state,count,amount
IA,0,0.0
OR,0,0.0
CA,2257,22579252.88779557
WA,254,2546155.3806338133
ND,8,80515.06108006819
DC,16,167077.38406570532
DE,17,179187.89063810013
LA,34,342649.9075232373
MA,335,3350171.792278135
ME,11,111440.9319180662


In [41]:
# Let's write this data out to our Delta table
loans.write.format("delta").mode("append").save(DELTALAKE_SILVER_PATH)

**Note**: This command fails because the schema of our new data does not match the schema of our original data

In [43]:
# Add the mergeSchema option
loans.write.option("mergeSchema","true").format("delta").mode("append").save(DELTALAKE_SILVER_PATH)

**Note**: With the `mergeSchema` option, we can merge these different schemas together.
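
The difference between the failing append and the `mergeSchema` append can be illustrated with a small plain-Python model. This is a sketch under simplifying assumptions (a table is a dict with a column list and row dicts); `append`, `read`, and `SchemaMismatchError` are hypothetical names and not part of any Delta Lake API. The `amount` value is rounded from the output above.

```python
class SchemaMismatchError(Exception):
    """Raised when appended rows carry columns the table does not have."""


def append(table, rows, merge_schema=False):
    """Append rows to a toy table of the form {"schema": [...], "rows": [...]}."""
    # Collect incoming columns that the table does not know about yet.
    new_cols = []
    for row in rows:
        for col in row:
            if col not in table["schema"] and col not in new_cols:
                new_cols.append(col)
    if new_cols and not merge_schema:
        raise SchemaMismatchError(f"unexpected columns: {new_cols}")
    table["schema"].extend(new_cols)  # the schema evolves only when allowed
    table["rows"].extend(rows)


def read(table):
    """Project every row onto the current schema; missing values become None."""
    return [{col: row.get(col) for col in table["schema"]} for row in table["rows"]]


table = {"schema": ["addr_state", "count"], "rows": [{"addr_state": "DC", "count": 30}]}
new_rows = [{"addr_state": "DC", "count": 16, "amount": 167077.38}]

try:
    append(table, new_rows)  # rejected: `amount` is not in the schema
except SchemaMismatchError as err:
    print("append failed:", err)

append(table, new_rows, merge_schema=True)  # accepted: schema gains `amount`
print(table["schema"])
print(read(table)[0])
```

Rows written before the schema change simply have no value for the new column, which is why they read back as null for `amount`.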

In [45]:
%sql
-- Review current loans within the `loan_by_state_delta` Delta Lake table
select addr_state, sum(`amount`) as amount from loan_by_state_delta group by addr_state order by sum(`amount`) desc limit 10

addr_state,amount
CA,22579252.88779557
NY,7854835.336431534
NJ,4949527.228699277
FL,4408661.055850041
OH,3999871.348691913
NC,3764625.986915014
MA,3350171.792278135
MI,2566760.5722063584
WA,2546155.3806338133
TX,2436079.621248425


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Let's Travel back in Time!
Databricks Delta’s time travel capabilities simplify building data pipelines for the following use cases. 

* Audit Data Changes
* Reproduce experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

You can query by:
1. Using a timestamp
1. Using a version number

using Python, Scala, and/or SQL syntax; for these examples we will use the SQL syntax.

For more information, refer to [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)
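
To illustrate the two lookup modes, here is a toy in-memory model in plain Python. It keeps a full snapshot per version, which is a simplification made for this sketch (Delta Lake records changes in a transaction log instead). The function names are hypothetical, the snapshots are reduced to per-state totals, and the timestamp of version 0 is an assumption; the other two timestamps are taken from the table history shown below in this notebook.

```python
import bisect
from datetime import datetime

# Each commit: (version, commit timestamp, table snapshot after the commit).
commits = [
    (0, datetime(2019, 6, 10, 13, 59, 33), {"DC": 30}),              # assumed timestamp
    (1, datetime(2019, 6, 10, 14, 6, 23), {"DC": 30, "IA": 450}),    # first insert
    (2, datetime(2019, 6, 10, 14, 6, 40), {"DC": 30, "IA": 900}),    # second insert
]


def version_as_of(commits, version):
    """Return the snapshot written by exactly this version."""
    for v, _, snapshot in commits:
        if v == version:
            return snapshot
    raise ValueError(f"version {version} not found")


def timestamp_as_of(commits, ts):
    """Return the snapshot of the latest commit at or before `ts`."""
    timestamps = [t for _, t, _ in commits]
    idx = bisect.bisect_right(timestamps, ts)
    if idx == 0:
        raise ValueError("timestamp is before the first commit")
    return commits[idx - 1][2]


print(version_as_of(commits, 0))
print(timestamp_as_of(commits, datetime(2019, 6, 10, 14, 6, 30)))
```

A timestamp that falls between two commits resolves to the earlier one, so the second call returns the state after the first insert.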

### ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Review Delta Lake Table History
All transactions for this table are recorded in its history, including the initial insertions, the update, delete, and merge operations, and the append with schema modification.

In [48]:
%sql
DESCRIBE HISTORY loan_by_state_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend
10,2019-06-10T14:21:52.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3148195),0610-133810-masts50,9.0,WriteSerializable,False
9,2019-06-10T14:21:28.000+0000,100599,denny.lee@databricks.com,MERGE,Map(predicate -> (d.`addr_state` = m.`addr_state`)),,List(3148195),0610-133810-masts50,8.0,WriteSerializable,False
8,2019-06-10T14:21:07.000+0000,100599,denny.lee@databricks.com,UPDATE,Map(predicate -> (addr_state#10851 = WA)),,List(3148195),0610-133810-masts50,7.0,WriteSerializable,False
7,2019-06-10T14:20:51.000+0000,100599,denny.lee@databricks.com,DELETE,"Map(predicate -> [""(default.loan_by_state_delta.`addr_state` = 'IA')""])",,List(3148195),0610-133810-masts50,6.0,WriteSerializable,False
6,2019-06-10T14:07:52.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3148195),0610-133810-masts50,5.0,WriteSerializable,True
5,2019-06-10T14:07:34.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3148195),0610-133810-masts50,4.0,WriteSerializable,True
4,2019-06-10T14:07:15.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3148195),0610-133810-masts50,3.0,WriteSerializable,True
3,2019-06-10T14:06:57.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3148195),0610-133810-masts50,2.0,WriteSerializable,True
2,2019-06-10T14:06:40.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3148195),0610-133810-masts50,1.0,WriteSerializable,True
1,2019-06-10T14:06:23.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3148195),0610-133810-masts50,0.0,WriteSerializable,True


### ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Time Travel via Version Number
Below are SQL syntax examples of Delta Time Travel by using a Version Number

In [50]:
%sql
SELECT * FROM loan_by_state_delta VERSION AS OF 0

addr_state,count
DC,30
DE,36
LA,165
MA,350
ME,15
NV,222
MI,383
ID,14
KS,124
AL,188


In [51]:
%sql
SELECT * FROM loan_by_state_delta VERSION AS OF 9

addr_state,count
IA,0
OR,0
CA,2500
WA,2700
ND,21
DC,30
DE,36
LA,165
MA,350
ME,15
