# Ensuring Consistency with ACID Transactions with Delta Lake (Loan Risk Data)

<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo-whitebackground.png" width=200/>

This is a companion notebook to provide a Delta Lake example against the Lending Club data.
* This notebook has been tested with *DBR 9 ML, Python 3*
* For the bottom sections of this notebook, you will need `mlflow` and `yellowbrick` installed on your cluster as well

## The Data

The data used is public data from Lending Club. It includes all funded loans from 2012 to 2017. Each loan record includes information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and the latest payment information. For a full description of the fields, see the data dictionary available [here](https://resources.lendingclub.com/LCDataDictionary.xlsx).


![Loan_Data](https://preview.ibb.co/d3tQ4R/Screen_Shot_2018_02_02_at_11_21_51_PM.png)

https://www.kaggle.com/wendykan/lending-club-loan-data

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Delta Lake

An optimization layer atop blob storage that provides reliability (i.e. ACID compliance) and low latency for streaming and batch data pipelines.

## Import Data and create pre-Delta Lake Table
* This will create a lot of small Parquet files emulating the typical small file problem that occurs with streaming or highly transactional data

In [0]:
# Configure location of loanstats_2012_2017.parquet
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"

# Read loanstats_2012_2017.parquet
data = spark.read.parquet(lspq_path)

# Reduce the amount of data (to run on DBCE)
(loan_stats, loan_stats_rest) = data.randomSplit([0.01, 0.99], seed=123)

# Select only the columns needed
loan_stats = loan_stats.select("addr_state", "loan_status")

# Create loan by state
loan_by_state = loan_stats.groupBy("addr_state").count()

# Create table
loan_by_state.createOrReplaceTempView("loan_by_state")

# Display loans by state
display(loan_by_state)

addr_state,count
AZ,355
SC,172
LA,208
MN,254
NJ,553
DC,47
OR,178
VA,414
RI,56
KY,133



## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Easily Convert Parquet to Delta Lake format
With Delta Lake, you can easily transform your Parquet data into Delta Lake format.
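
The cells in this section read a copy of the data that is already stored in Delta Lake format. For a Parquet directory of your own, an in-place conversion could look like the following minimal sketch. The helper name `convert_parquet_to_delta` and the example path are hypothetical; `spark` is assumed to be the active SparkSession, and the directory is assumed to be writable and unpartitioned (a partitioned table would also need a `PARTITIONED BY` clause).

```python
def convert_parquet_to_delta(spark, parquet_dir):
    """Convert an existing Parquet directory to Delta Lake format in place.

    `spark` is the active SparkSession. `parquet_dir` is a hypothetical,
    writable directory of Parquet files (not a read-only sample dataset).
    """
    # CONVERT TO DELTA adds a _delta_log directory next to the existing
    # Parquet files; the data files themselves are not rewritten.
    return spark.sql(f"CONVERT TO DELTA parquet.`{parquet_dir}`")


# Example call in a notebook cell (hypothetical path):
# convert_parquet_to_delta(spark, "/tmp/my_parquet_table")
```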

In [0]:
lc_path = "/databricks-datasets/delta-sharing/samples/lending_club/"

In [0]:
df = spark.read.format("delta").load(lc_path)
display(df)

id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_title,emp_length,home_ownership,annual_inc,verification_status,loan_status,pymnt_plan,url,desc,purpose,title,zip_code,addr_state,dti,delinq_2yrs,earliest_cr_line,inq_last_6mths,mths_since_last_delinq,mths_since_last_record,open_acc,pub_rec,revol_bal,revol_util,total_acc,initial_list_status,out_prncp,out_prncp_inv,total_pymnt,total_pymnt_inv,total_rec_prncp,total_rec_int,total_rec_late_fee,recoveries,collection_recovery_fee,last_pymnt_d,last_pymnt_amnt,next_pymnt_d,last_credit_pull_d,collections_12_mths_ex_med,mths_since_last_major_derog,policy_code,application_type,annual_inc_joint,dti_joint,verification_status_joint,acc_now_delinq,tot_coll_amt,tot_cur_bal,open_acc_6m,open_il_6m,open_il_12m,open_il_24m,mths_since_rcnt_il,total_bal_il,il_util,open_rv_12m,open_rv_24m,max_bal_bc,all_util,total_rev_hi_lim,inq_fi,total_cu_tl,inq_last_12m,acc_open_past_24mths,avg_cur_bal,bc_open_to_buy,bc_util,chargeoff_within_12_mths,delinq_amnt,mo_sin_old_il_acct,mo_sin_old_rev_tl_op,mo_sin_rcnt_rev_tl_op,mo_sin_rcnt_tl,mort_acc,mths_since_recent_bc,mths_since_recent_bc_dlq,mths_since_recent_inq,mths_since_recent_revol_delinq,num_accts_ever_120_pd,num_actv_bc_tl,num_actv_rev_tl,num_bc_sats,num_bc_tl,num_il_tl,num_op_rev_tl,num_rev_accts,num_rev_tl_bal_gt_0,num_sats,num_tl_120dpd_2m,num_tl_30dpd,num_tl_90g_dpd_24m,num_tl_op_past_12m,pct_tl_nvr_dlq,percent_bc_gt_75,pub_rec_bankruptcies,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,revol_bal_joint,sec_app_earliest_cr_line,sec_app_inq_last_6mths,sec_app_mort_acc,sec_app_open_acc,sec_app_revol_util,sec_app_open_il_6m,sec_app_num_rev_accts,sec_app_chargeoff_within_12_mths,sec_app_collections_12_mths_ex_med,sec_app_mths_since_last_major_derog,hardship_flag,hardship_type,hardship_reason,hardship_status,deferral_term,hardship_amount,hardship_start_date,hardship_end_date,payment_plan_start_date,hardship_length,hardship_dpd,hardship_loan_status,orig_projected_additional_accrued_interest,hardship_payoff_balance_amount,hardship_last_payment_amount,issue_d
,,35000.0,35000,35000.0,36 months,17.27%,1252.56,D,D2,Owner,10+ years,RENT,107000.0,Verified,Current,n,,,debt_consolidation,Debt consolidation,900xx,CA,21.88,0.0,Mar-1996,0,,47.0,9,2,65692,92.7%,20.0,w,21634.04,21634.04,19973.8,19973.8,13365.96,6607.84,0.0,0.0,0.0,Aug-2017,1252.56,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,86057,0,2,2,2,9.0,20365,88.0,0,0,24914,92,70900,0,12,0,2,10757,2844.0,94.1,0.0,0,157.0,240,27,9,0,41.0,,18.0,,0,2,3,4,6,9,7,11,3,9,0.0,0,0,2,100.0,50.0,0,2,93936,86057,48400,23036,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,8000.0,8000,8000.0,36 months,18.25%,290.23,D,D3,Manager,9 years,RENT,50000.0,Source Verified,Late (31-120 days),n,,,debt_consolidation,Debt consolidation,907xx,CA,22.0,0.0,Oct-2009,0,,,15,0,10227,43%,25.0,w,5392.03,5392.03,4047.0,4047.0,2607.97,1439.03,0.0,0.0,0.0,Jun-2017,290.23,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,33913,1,2,1,2,12.0,23686,78.0,1,5,4010,63,23800,2,2,5,7,2422,0.0,101.1,0.0,0,30.0,77,6,6,0,6.0,,2.0,,0,3,7,4,5,4,13,19,7,15,0.0,0,0,2,100.0,100.0,0,0,54092,33913,6500,30292,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,5000.0,5000,5000.0,36 months,6.97%,154.32,A,A3,,,OWN,25000.0,Source Verified,Current,n,,,major_purchase,Major purchase,179xx,PA,3.31,0.0,Aug-1988,0,,,3,0,2246,34.6%,12.0,w,2905.82,2905.82,2465.25,2465.25,2094.18,371.07,0.0,0.0,0.0,Aug-2017,154.32,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,2246,0,0,0,0,61.0,0,,0,0,1199,35,6500,0,0,1,0,749,3301.0,26.6,0.0,0,128.0,331,108,61,2,108.0,,9.0,,0,1,2,2,4,2,3,8,2,3,0.0,0,0,0,100.0,50.0,0,0,6500,2246,4500,0,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,10000.0,10000,10000.0,36 months,9.75%,321.5,B,B3,ELECTRICIAN,10+ years,MORTGAGE,105000.0,Not Verified,Current,n,,,debt_consolidation,Debt consolidation,183xx,PA,16.63,3.0,Jun-1995,1,17.0,,14,0,19512,65.5%,32.0,w,5912.64,5912.64,5133.17,5133.17,4087.36,1045.81,0.0,0.0,0.0,Aug-2017,321.5,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,303803,3,2,1,1,10.0,23021,58.0,3,5,3345,61,29800,3,0,1,6,21700,5067.0,67.3,0.0,0,145.0,249,3,3,3,3.0,17.0,4.0,17.0,0,6,11,6,12,11,11,18,11,14,0.0,0,0,4,78.1,66.7,0,0,340397,42533,15500,39561,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,24000.0,24000,24000.0,36 months,9.75%,771.6,B,B3,C&C MACHINE OPERATOR,5 years,MORTGAGE,50000.0,Verified,Current,n,,,debt_consolidation,Debt consolidation,377xx,TN,24.46,0.0,Aug-1993,0,,,8,0,15702,54.1%,14.0,w,14190.34,14190.34,12351.31,12351.31,9809.66,2541.65,0.0,0.0,0.0,Aug-2017,771.6,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,93535,0,1,0,1,18.0,30356,,0,1,6995,54,29000,0,0,0,4,13362,10125.0,49.4,0.0,0,18.0,271,24,18,2,24.0,,18.0,,0,4,5,4,8,1,6,11,5,8,0.0,0,0,0,100.0,25.0,0,0,113715,46058,20000,36715,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,9600.0,9600,9600.0,36 months,9.75%,308.64,B,B3,unit leader,10+ years,MORTGAGE,120000.0,Not Verified,In Grace Period,n,,,home_improvement,Home improvement,835xx,ID,13.11,1.0,Sep-1999,1,15.0,,16,0,5375,18.2%,35.0,w,5939.59,5939.59,4656.31,4656.31,3660.41,995.9,0.0,0.0,0.0,Aug-2017,308.64,Sep-2017,Aug-2017,0,59.0,1,INDIVIDUAL,,,,0,453,165800,2,3,2,3,7.0,26846,62.0,1,2,2437,44,29500,2,14,4,6,10363,14649.0,25.3,0.0,0,138.0,198,0,0,4,41.0,,0.0,,1,3,4,6,6,17,11,14,4,16,0.0,0,0,4,91.2,33.3,0,0,207042,32221,19600,43252,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,13000.0,13000,13000.0,60 months,8.39%,266.03,B,B1,Registered Nurse,2 years,RENT,66000.0,Source Verified,Current,n,,,credit_card,Credit card refinancing,606xx,IL,10.62,0.0,Sep-2003,0,34.0,,12,0,11424,55.7%,19.0,w,10045.95,10045.95,4289.8,4289.8,2954.05,1335.75,0.0,0.0,0.0,Aug-2017,266.03,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,77071,1,6,0,2,14.0,65596,113.0,1,1,5314,86,20500,0,0,0,3,7006,9076.0,55.7,0.0,0,150.0,115,2,2,0,2.0,,15.0,,0,4,4,4,7,9,4,7,4,11,0.0,0,0,1,94.4,25.0,0,0,71768,77071,20500,50941,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,9000.0,9000,9000.0,36 months,9.16%,286.87,B,B2,Specialty scheduler,9 years,RENT,60000.0,Source Verified,Current,n,,,debt_consolidation,Debt consolidation,914xx,CA,21.12,0.0,Jul-2003,0,26.0,53.0,10,1,7397,53.2%,22.0,w,5055.74,5055.74,4853.89,4853.89,3944.26,909.63,0.0,0.0,0.0,Aug-2017,286.87,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,49144,1,2,1,2,4.0,41747,75.0,0,1,3036,60,13900,0,2,1,3,4914,3612.0,61.2,0.0,0,128.0,152,24,4,0,24.0,,4.0,26.0,0,2,5,2,2,5,8,17,5,10,0.0,0,0,1,94.7,50.0,1,0,57373,49144,9300,43473,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,18000.0,18000,18000.0,36 months,12.99%,606.41,C,C2,Lock and Dam Operator,< 1 year,RENT,90000.0,Source Verified,Current,n,,,debt_consolidation,Debt consolidation,721xx,AR,5.92,0.0,Feb-2008,1,,,10,0,13895,53.9%,17.0,w,8407.1,8407.1,11782.85,11782.85,9592.9,2189.95,0.0,0.0,0.0,Aug-2017,606.41,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,19315,2,1,0,0,27.0,5420,64.0,5,6,4765,56,25800,0,0,2,6,1932,4302.0,66.9,0.0,0,97.0,54,4,4,0,4.0,,5.0,,0,4,6,6,7,6,9,11,6,10,0.0,0,0,5,100.0,50.0,0,0,34300,19315,13000,8500,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,16000.0,16000,16000.0,36 months,5.32%,481.84,A,A1,Security specialist,8 years,RENT,105000.0,Not Verified,Fully Paid,n,,,debt_consolidation,Debt consolidation,206xx,MD,15.02,1.0,Nov-2000,0,9.0,,9,0,6219,27.9%,21.0,w,0.0,0.0,16098.34,16098.34,16000.0,98.34,0.0,0.0,0.0,May-2016,16107.8,,Nov-2016,0,,1,INDIVIDUAL,,,,0,0,23525,0,3,2,2,8.0,17306,69.0,0,1,3770,50,22300,1,3,1,3,3361,13632.0,29.7,0.0,0,124.0,184,22,8,0,52.0,,8.0,9.0,0,2,3,2,5,7,6,14,3,9,0.0,0,0,2,95.2,0.0,0,0,47543,23525,19400,25243,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016



In [0]:
# Read the Lending Club loan data from the Delta Lake copy at lc_path
data = spark.read.format("delta").load(lc_path)

# Optionally reduce the amount of data (to run on DBCE)
#(loan_stats, loan_stats_rest) = data.randomSplit([0.01, 0.99], seed=123)

# Select only the columns needed
loan_stats = data.select("addr_state", "loan_status")

# Create loan by state
loan_by_state = loan_stats.groupBy("addr_state").count()

# Create table
loan_by_state.createOrReplaceTempView("loan_by_state")

# Display loans by state
display(loan_by_state)

addr_state,count
AZ,34706
SC,18102
LA,17385
MN,26664
NJ,54569
DC,3616
OR,17545
VA,42115
RI,6507
KY,14360



In [0]:
%sql
select * from loan_by_state


addr_state,count
SC,18102
AZ,34706
LA,17385
MN,26664
NJ,54569
DC,3616
OR,17545
VA,42115
,18
RI,6507


In [0]:
%sql 
-- This example creates a new Delta Lake table from the temp view instead of converting the Parquet files in place
--DROP TABLE IF EXISTS loan_by_state;

CREATE TABLE loan_by_state
USING delta
LOCATION '/tmp/loan_by_state'
AS SELECT * FROM loan_by_state;

-- View Delta Lake table
--SELECT * FROM loan_by_state

num_affected_rows,num_inserted_rows


In [0]:
%sql 
DESCRIBE DETAIL delta.`/tmp/loan_by_state`

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,eccaf333-023a-4391-a91d-d9a66d17dba2,,,dbfs:/tmp/loan_by_state,2022-03-22T02:25:33.780+0000,2022-03-22T02:25:39.000+0000,List(),1,1374,Map(),1,2


In [0]:
%sql
describe detail loan_by_state

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,eccaf333-023a-4391-a91d-d9a66d17dba2,default.loan_by_state,,dbfs:/tmp/loan_by_state,2022-03-22T02:25:33.780+0000,2022-03-22T02:25:39.000+0000,List(),1,1374,Map(),1,2



In [0]:
%sql
SELECT * FROM loan_by_state

addr_state,count
AZ,34706
SC,18102
LA,17385
MN,26664
NJ,54569
DC,3616
OR,17545
VA,42115
RI,6507
KY,14360



## Stop the notebook before the streaming cell, in case of a "run all"

In [0]:
# dbutils.notebook.exit("stop") 

stop

In [0]:
%fs ls /tmp/loan_by_state/_delta_log/

path,name,size,modificationTime
dbfs:/tmp/loan_by_state/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1647915949000
dbfs:/tmp/loan_by_state/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1647915949000
dbfs:/tmp/loan_by_state/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1647915949000
dbfs:/tmp/loan_by_state/_delta_log/00000000000000000000.crc,00000000000000000000.crc,520,1647915949000
dbfs:/tmp/loan_by_state/_delta_log/00000000000000000000.json,00000000000000000000.json,1391,1647915939000
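
Each commit to the table appears in `_delta_log` as a JSON file whose name is the table version, zero-padded to 20 digits, as the listing above shows. The following sketch illustrates that naming scheme in plain Python. The helper names `commit_file_name` and `latest_version` are made up for this illustration and are not part of any Delta Lake API.

```python
import re

# A commit file name is a 20-digit, zero-padded version number plus ".json".
_COMMIT_RE = re.compile(r"^(\d{20})\.json$")


def commit_file_name(version):
    """Return the _delta_log file name for a given table version."""
    return f"{version:020d}.json"


def latest_version(file_names):
    """Return the highest committed version among the names, or None."""
    versions = []
    for name in file_names:
        match = _COMMIT_RE.match(name)
        if match:  # skips .crc files and other helper files
            versions.append(int(match.group(1)))
    return max(versions) if versions else None
```

Passing the `name` column of a `%fs ls` listing to `latest_version` gives the version number that `DESCRIBE HISTORY` would report as the most recent.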


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Unified Batch and Streaming Source and Sink

These cells showcase streaming and batch concurrent queries (inserts and reads)
* This notebook will run an `INSERT` every 10s against our `loan_by_state` table
* We will run two streaming queries concurrently against this data
* Note, you can also use `writeStream` but this version is easier to run in DBCE

In [0]:
# Configure Delta Lake Silver Path
DELTALAKE_SILVER_PATH = "/tmp/loan_by_state"

# Remove folder if it exists
#dbutils.fs.rm(DELTALAKE_SILVER_PATH, recurse=True)

In [0]:
# Read the insertion of data
loan_by_state_readStream = spark.readStream.format("delta").load(DELTALAKE_SILVER_PATH)
loan_by_state_readStream.createOrReplaceTempView("loan_by_state_readStream")

In [0]:
%sql
select addr_state, sum(`count`) as loans from loan_by_state_readStream group by addr_state

addr_state,loans
SC,18102
AZ,34706
LA,17385
MN,26664
NJ,54569
DC,3616
OR,17545
,18
VA,42115
RI,6507


**Wait** until the stream is up and running before executing the code below
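
The periodic `INSERT` described at the top of this section could be sketched as follows. This is an illustration under stated assumptions, not the original cell: the helper name `insert_loans`, the row values, and the number of iterations are hypothetical, and `spark` is assumed to be the active SparkSession.

```python
import time


def insert_loans(spark, table_name, state, count, iterations=6, pause_seconds=10):
    """Insert one (state, count) row per iteration, pausing between inserts."""
    for i in range(1, iterations + 1):
        # Each INSERT is its own transaction, so the running readStream
        # picks up every new row as a separate table version.
        spark.sql(f"INSERT INTO {table_name} VALUES ('{state}', {count})")
        print(f"{table_name}: inserted row {i} of {iterations}")
        if i < iterations:
            time.sleep(pause_seconds)


# Example call in a notebook cell (hypothetical values):
# insert_loans(spark, "loan_by_state", "IA", 450)
```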

In [0]:
%fs ls /ml/loan_by_state_delta/_delta_log/

path,name,size,modificationTime
dbfs:/ml/loan_by_state_delta/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1646624744000
dbfs:/ml/loan_by_state_delta/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1646624744000
dbfs:/ml/loan_by_state_delta/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1646624744000
dbfs:/ml/loan_by_state_delta/_delta_log/00000000000000000000.crc,00000000000000000000.crc,520,1646624744000
dbfs:/ml/loan_by_state_delta/_delta_log/00000000000000000000.json,00000000000000000000.json,1371,1646624740000
dbfs:/ml/loan_by_state_delta/_delta_log/00000000000000000001.crc,00000000000000000001.crc,520,1646624887000
dbfs:/ml/loan_by_state_delta/_delta_log/00000000000000000001.json,00000000000000000001.json,890,1646624886000
dbfs:/ml/loan_by_state_delta/_delta_log/00000000000000000002.crc,00000000000000000002.crc,520,1646624893000
dbfs:/ml/loan_by_state_delta/_delta_log/00000000000000000002.json,00000000000000000002.json,890,1646624892000
dbfs:/ml/loan_by_state_delta/_delta_log/00000000000000000003.crc,00000000000000000003.crc,520,1646624899000


**Note**: Once the previous cell is finished and the state of Iowa is fully populated in the map (in cell 14), click *Cancel* in Cell 14 to stop the `readStream`.

Let's review our current set of loans using our map visualization.

In [0]:
%sql
-- Review current loans within the `loan_by_state` Delta Lake table
select addr_state, sum(`count`) as loans from loan_by_state group by addr_state

addr_state,loans
AZ,34706
SC,18102
LA,17385
MN,26664
NJ,54569
DC,3616
OR,17545
VA,42115
RI,6507
KY,14360


###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) UPDATE Support
The data was originally supposed to be assigned to `WA` state, so let's `UPDATE` those values

In [0]:
%sql
-- Running `UPDATE` on `loan_by_state`, which is stored as a Delta Lake table in this notebook

UPDATE loan_by_state SET `count` = 2700 WHERE addr_state = 'WA'

num_affected_rows
1


**Note**: The same `UPDATE` statement fails against a plain Parquet table, because Parquet tables do not support `UPDATE`. It succeeds here because `loan_by_state` is a Delta Lake table.
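
An update like this can also be issued from Python through the `DeltaTable` class. The sketch below wraps the call in a hypothetical helper, `set_state_count`, that takes an already-loaded `DeltaTable` object; note that the values in the `set` map are SQL expressions passed as strings.

```python
def set_state_count(delta_table, state, new_count):
    """Set `count` to `new_count` for the rows of one state.

    `delta_table` is a delta.tables.DeltaTable, for example
    DeltaTable.forPath(spark, "/tmp/loan_by_state").
    """
    delta_table.update(
        condition=f"addr_state = '{state}'",  # SQL predicate selecting rows
        set={"count": str(new_count)},        # column -> SQL expression
    )


# Example call in a notebook cell:
# from delta.tables import DeltaTable
# set_state_count(DeltaTable.forPath(spark, "/tmp/loan_by_state"), "WA", 2700)
```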

In [0]:
%sql
-- Running `UPDATE` on the Delta Lake table
UPDATE loan_by_state_delta SET `count` = 2700 WHERE addr_state = 'WA'

In [0]:
%sql
-- Review current loans within the `loan_by_state` Delta Lake table
select addr_state, sum(`count`) as loans from loan_by_state group by addr_state

addr_state,loans
AZ,34706
SC,18102
LA,17385
MN,26664
NJ,54569
DC,3616
OR,17545
VA,42115
,18
RI,6507


###![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) MERGE INTO Support

#### INSERT or UPDATE parquet: 7-step process

With a legacy data pipeline, to insert or update a table, you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not impacted by the insert or update
4. Create a new temp table based on all three sets of rows
5. Delete the original table (and all of those associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table

![](https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif)
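
To make the legacy steps concrete, here is a toy version in plain Python that treats a table as a list of dict rows. It is only an illustration of the bookkeeping involved; the function name `legacy_upsert` is hypothetical and no Spark or Parquet APIs are used.

```python
def legacy_upsert(table, changes, key="addr_state"):
    """Rebuild `table` (a list of dict rows) with `changes` applied by key."""
    existing_keys = {row[key] for row in table}
    change_keys = {row[key] for row in changes}

    # Steps 1-3: classify the rows
    new_rows = [r for r in changes if r[key] not in existing_keys]
    replacement_rows = [r for r in changes if r[key] in existing_keys]
    untouched_rows = [r for r in table if r[key] not in change_keys]

    # Step 4: build the temporary table from the three row sets
    temp_table = untouched_rows + replacement_rows + new_rows

    # Steps 5-7: the caller discards the original table and keeps the
    # rebuilt one under the original name.
    return temp_table
```

Every run rewrites the whole table, and a failure between deleting the original and renaming the temp table leaves readers with no consistent table to query.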


#### INSERT or UPDATE with Delta Lake

2-step process: 
1. Identify rows to insert or update
2. Use `MERGE`

In [0]:
# Let's create a simple table to merge
items = [('IA', 10), ('CA', 2500), ('OR', None)]
cols = ['addr_state', 'count']
merge_table = spark.createDataFrame(items, cols)
merge_table.createOrReplaceTempView("merge_table")
display(merge_table)

addr_state,count
IA,10.0
CA,2500.0
OR,


Instead of writing separate `INSERT` and `UPDATE` statements, we can use a `MERGE` statement.

In [0]:
%sql
MERGE INTO loan_by_state as d
USING merge_table as m
on d.addr_state = m.addr_state
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,3,0,0
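
The `MERGE` statement above can also be expressed through Delta Lake's Python API. The following sketch assumes an already-loaded `DeltaTable` and a source DataFrame with the same columns; the helper name `upsert_loans` is hypothetical.

```python
def upsert_loans(delta_table, updates_df):
    """Upsert `updates_df` into `delta_table`, matching rows on addr_state.

    `delta_table` is a delta.tables.DeltaTable and `updates_df` is a Spark
    DataFrame such as `merge_table` from the cell above.
    """
    (
        delta_table.alias("d")
        .merge(updates_df.alias("m"), "d.addr_state = m.addr_state")
        .whenMatchedUpdateAll()     # same as: WHEN MATCHED THEN UPDATE SET *
        .whenNotMatchedInsertAll()  # same as: WHEN NOT MATCHED THEN INSERT *
        .execute()
    )


# Example call in a notebook cell:
# from delta.tables import DeltaTable
# upsert_loans(DeltaTable.forPath(spark, "/tmp/loan_by_state"), merge_table)
```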


In [0]:
%sql
-- Review current loans within the `loan_by_state` Delta Lake table
select addr_state, sum(`count`) as loans from loan_by_state group by addr_state

addr_state,loans
AZ,34706.0
SC,18102.0
LA,17385.0
MN,26664.0
NJ,54569.0
DC,3616.0
OR,
VA,42115.0
,18.0
RI,6507.0


##![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Schema Evolution
With the `mergeSchema` option, you can evolve your Delta Lake table schema

In [0]:
# Generate new loans with dollar amounts 
loans = sql("select addr_state, cast(rand(10)*count as bigint) as count, cast(rand(10) * 10000 * count as double) as amount from loan_by_state")
display(loans)

addr_state,count,amount
,3.0,30770.948483200224
531xx,0.0,8051.143958005459
AK,2087.0,20874195.03379271
AL,17418.0,174188711.94415113
AR,2344.0,23449495.7488624
AZ,12724.0,127246851.0178497
CA,2019.0,20196720.445929702
CO,22220.0,222209771.94801
CT,16506.0,165060768.49106088
DC,1133.0,11330841.699721044


In [0]:
DELTALAKE_SILVER_PATH = "/tmp/loan_by_state"

In [0]:
# Let's write this data out to our Delta table
loans.write.format("delta").mode("append").save(DELTALAKE_SILVER_PATH)

**Note**: This command fails because the schema of our new data does not match the schema of our original data

In [0]:
# Add the mergeSchema option
loans.write.option("mergeSchema","true").format("delta").mode("append").save(DELTALAKE_SILVER_PATH)

**Note**: With the `mergeSchema` option, we can merge these different schemas together.
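
Conceptually, `mergeSchema` appends any columns that exist in the incoming data but not in the table, and existing rows read as null for the new columns. The toy function below models that rule on plain dictionaries of column name to type name. It is a simplification written for this notebook (the name `merge_schemas` is hypothetical, and it rejects every type mismatch, whereas Delta Lake's actual rules are more detailed).

```python
def merge_schemas(table_schema, new_schema):
    """Return the table schema extended with columns found only in new data."""
    merged = dict(table_schema)  # keeps the existing column order
    for column, dtype in new_schema.items():
        if column not in merged:
            merged[column] = dtype  # new column is appended at the end
        elif merged[column] != dtype:
            raise ValueError(
                f"column {column!r}: {merged[column]} conflicts with {dtype}"
            )
    return merged


table_schema = {"addr_state": "string", "count": "bigint"}
new_schema = {"addr_state": "string", "count": "bigint", "amount": "double"}
merged = merge_schemas(table_schema, new_schema)  # adds "amount" last
```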

In [0]:
%sql
-- Review current loans within the `loan_by_state` Delta Lake table
select addr_state, sum(`amount`) as amount from loan_by_state group by addr_state order by sum(`amount`) desc limit 10

addr_state,amount
TX,847941523.4848284
NY,698358273.6874249
IL,579232740.3374095
NJ,381633992.0641493
OH,302165265.1672292
NC,300479541.96542174
PA,287234973.72512925
MI,266547209.8092273
CO,222209771.94801
MA,211616117.0529003


## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Let's Travel back in Time!
Databricks Delta’s time travel capabilities simplify building data pipelines for the following use cases. 

* Audit Data Changes
* Reproduce experiments & reports
* Rollbacks

As you write into a Delta table or directory, every operation is automatically versioned.

You can query by:
1. Using a timestamp
1. Using a version number

using Python, Scala, and/or SQL syntax; for these examples we will use the SQL syntax.  

For more information, refer to [Introducing Delta Time Travel for Large Scale Data Lakes](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)
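
For reference, the Python equivalent uses the `versionAsOf` and `timestampAsOf` reader options. The wrapper below is a minimal sketch; the function name `read_delta_version` is hypothetical, and `spark` is assumed to be the active SparkSession.

```python
def read_delta_version(spark, path, version=None, timestamp=None):
    """Load a Delta table at `path` as of a version number or a timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    else:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)


# Example calls in a notebook cell:
# df_v0 = read_delta_version(spark, "/tmp/loan_by_state", version=0)
# df_ts = read_delta_version(spark, "/tmp/loan_by_state",
#                            timestamp="2022-03-22 04:24:03")
```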

### ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Review Delta Lake Table History
All the transactions for this table are recorded in the table history, including the initial table creation, the update, the merge, and the append with schema modification.

In [0]:
%sql
DESCRIBE HISTORY loan_by_state

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
3,2022-03-22T04:27:18.000+0000,100708,vini.jaiswal@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(12411746),0804-214524-boded127,2.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 1973, numOutputRows -> 54)",
2,2022-03-22T04:24:48.000+0000,100708,vini.jaiswal@databricks.com,MERGE,"Map(predicate -> (d.`addr_state` = m.`addr_state`), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(12411746),0804-214524-boded127,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 51, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, executionTimeMs -> 1914, numTargetRowsInserted -> 0, scanTimeMs -> 991, numTargetRowsUpdated -> 3, numOutputRows -> 54, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 1, rewriteTimeMs -> 764)",
1,2022-03-22T04:24:03.000+0000,100708,vini.jaiswal@databricks.com,UPDATE,Map(predicate -> (addr_state#17416 = WA)),,List(12411746),0804-214524-boded127,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 53, numAddedChangeFiles -> 0, executionTimeMs -> 1363, scanTimeMs -> 952, numAddedFiles -> 1, numUpdatedRows -> 1, rewriteTimeMs -> 411)",
0,2022-03-22T02:25:39.000+0000,100708,vini.jaiswal@databricks.com,CREATE TABLE AS SELECT,"Map(isManaged -> false, description -> null, partitionBy -> [], properties -> {})",,List(12411746),0318-151752-abed99,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 54, numOutputBytes -> 1374)",


### ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Time Travel via Version Number
Below are SQL syntax examples of Delta Time Travel by using a Version Number

In [0]:
%sql
SELECT * FROM loan_by_state VERSION AS OF 0

addr_state,count
AZ,34706
SC,18102
LA,17385
MN,26664
NJ,54569
DC,3616
OR,17545
VA,42115
RI,6507
KY,14360


In [0]:
%sql
SELECT * FROM loan_by_state VERSION AS OF 2

addr_state,count
,18.0
531xx,1.0
AK,3614.0
AL,18382.0
AR,11200.0
AZ,34706.0
CA,2500.0
CO,31143.0
CT,22940.0
DC,3616.0
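
For the rollback use case, a table can be returned to an earlier version with the `RESTORE` command, assuming your runtime supports it. The helper below is a hypothetical sketch that builds the statement; `spark` is assumed to be the active SparkSession. A restore is recorded as a new commit, so earlier versions stay available for time travel.

```python
def restore_table(spark, table_name, version):
    """Roll `table_name` back to the state it had at `version`."""
    # int() guards against accidentally interpolating a non-numeric value.
    return spark.sql(
        f"RESTORE TABLE {table_name} TO VERSION AS OF {int(version)}"
    )


# Example call in a notebook cell (not needed for the rest of this notebook):
# restore_table(spark, "loan_by_state", 0)
```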


## Run Our Model
Let's run a simple linear regression model to predict the number of loans based on the population of the state.
* The following shell statement downloads the `us_census_2010` data that we will join with the `loan_by_state` table

In [0]:
%sh mkdir -p /dbfs/tmp/data-council/census/ && wget -O /dbfs/tmp/data-council/census/us_census_2010.csv https://pages.databricks.com/rs/094-YMS-629/images/us_census_2010.csv && ls -al /dbfs/tmp/data-council/census/

In [0]:
dbutils.fs.cp("file:/dbfs/tmp/data-council/census", "dbfs:/tmp/sais_eu_19_demo/census/", recurse=True)

### Notes
If you forgot to install `mlflow` and `yellowbrick` on your cluster, you do not need to re-run the whole notebook:
* Note that the Delta Lake table is stored in `DELTALAKE_SILVER_PATH`, i.e. `/tmp/loan_by_state`
* You can add the libraries, restart the cluster, and then resume from the following cells
* Run the cells below to install the libraries and reconnect to your Delta table

In [0]:
%pip install mlflow yellowbrick

In [0]:
# Recreate loan_by_state view
spark.read.format("delta").load("/tmp/loan_by_state").createOrReplaceTempView("loan_by_state")
# Check data
display(spark.sql("select count(1) from loan_by_state"))

count(1)
108


In [0]:
# Include census data
census = spark.read.csv('/tmp/data-council/census/us_census_2010.csv', sep=',', inferSchema=True, header=True)
census.createOrReplaceTempView("census")

# Data versions (0, 2, 3)
dfv0 = spark.sql("select c.Population, l.count as label from (select addr_state as State, count from loan_by_state  version as of 0) l left outer join census c on c.State = l.State")
dfv2 = spark.sql("select c.Population, l.count as label from (select addr_state as State, count from loan_by_state  version as of 2) l left outer join census c on c.State = l.State")
dfv3 = spark.sql("select c.Population, l.count as label from (select addr_state as State, count from loan_by_state  version as of 3 where count is not null) l left outer join census c on c.State = l.State")

In [0]:
# Import MLflow
import mlflow
import mlflow.spark
#print("MLflow Version: %s" % mlflow.__version__)

# Display Residuals (yellowbrick)
def displayResiduals(train, test):
  from sklearn.linear_model import Ridge
  from yellowbrick.regressor import ResidualsPlot

  # define feature columns
  featureColumns = ['Population']

  # Create pandas DataFrames
  pdf_train = train.toPandas()
  pdf_test = test.toPandas()

  # Convert to X, y train and test values
  X_train = pdf_train[['Population']].to_numpy()
  y_train = pdf_train[['label']].to_numpy().flatten()
  X_test = pdf_test[['Population']].to_numpy()
  y_test = pdf_test[['label']].to_numpy().flatten()  
  
  # Instantiate the linear model and visualizer
  ridge = Ridge()
  visualizer = ResidualsPlot(ridge)

  # Visualize
  visualizer.fit(X_train, y_train)  # Fit the training data to the model
  visualizer.score(X_test, y_test)  # Evaluate the model on the test data
  visualizer.show(outpath="ridge-model-residuals.png")
  #fig=visualizer.poof()             # Draw/show/poof the data
  
# Predict Loan Count (based on state population)
def predictLoanCount(df, data_version):
  from pyspark.ml.linalg import Vectors
  from pyspark.ml.feature import VectorAssembler
  from pyspark.ml.regression import LinearRegression
  from pyspark.ml.evaluation import RegressionEvaluator
  
  # assemble vector
  assembler = VectorAssembler(
    inputCols=["Population"],
    outputCol="features")
  output = assembler.transform(df)

  # Log mlflow
  with mlflow.start_run() as run:  
    # Define LinearRegression algorithm
    lr = LinearRegression(maxIter=10, regParam=0.0, elasticNetParam=0.8)
    model = lr.fit(output)  

    # Calculate predictions
    predictions = model.transform(output)

    # calculate RMSE
    evaluator = RegressionEvaluator(metricName="rmse")
    RMSE = evaluator.evaluate(predictions)
    #print("Model: Root Mean Squared Error = " + str(RMSE))

    # Log Parameters
    mlflow.log_param("data version", data_version)
    mlflow.log_metric("RMSE", RMSE)

    # Log Model
    mlflow.spark.log_model(model, "model") 
    
    # Log the residuals plot for every data version except v0
    if (data_version != 'v0'):
      # Log artifacts (output files)
      mlflow.log_artifact("ridge-model-residuals.png")
  
  # return predictions DataFrame
  #return predictions

In [0]:
# Calculate predictions
# Initial version of data (v0)
predictLoanCount(dfv0, 'v0')

# Version 2 (after the UPDATE and MERGE)
displayResiduals(dfv0, dfv2)
predictLoanCount(dfv2, 'v2')

# Version 3 (after the append with schema evolution)
displayResiduals(dfv0, dfv3)
predictLoanCount(dfv3, 'v3')

#### Review RMSE and Residuals
* Review the RMSE values via the MLflow Sidebar
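
As a reminder of what the logged metric measures, RMSE is the square root of the mean squared difference between labels and predictions. The plain-Python sketch below (the function name `rmse` is chosen for this illustration) computes it for two equal-length lists.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and equal in length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))
```

A lower RMSE means the predicted loan counts sit closer to the actual counts for that version of the data.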