## How Delta Lake and Apache Spark can be used to build lakehouses
- Reading and writing Delta tables with Apache Spark
- How Delta Lake allows concurrent writes while guaranteeing ACID properties
- Building pipelines with *update*, *delete*, and *merge* while preserving ACID guarantees
- Travelling back in time to query earlier versions of a table

## Loading Data into a Delta Lake Table
Switching from writing Parquet files to writing Delta is straightforward: just specify *.format('delta')* instead of *.format('parquet')*.

In [0]:
# path of the source file
sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"

# path of the destination table
deltaPath = "/tmp/loans_delta"

# create the Delta table from the loan data
(spark.read.format("parquet").load(sourcePath)
.write.format("delta").save(deltaPath))

In [0]:
%fs

ls dbfs:/tmp/loans_delta/

path,name,size,modificationTime
dbfs:/tmp/loans_delta/_delta_log/,_delta_log/,0,0
dbfs:/tmp/loans_delta/part-00000-13d5d315-7165-44a8-b643-ab8adfa12ce4-c000.snappy.parquet,part-00000-13d5d315-7165-44a8-b643-ab8adfa12ce4-c000.snappy.parquet,164916,1666384858000


In [0]:
# create a temporary view over the Delta loan data so that it can be queried with SQL
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")

In [0]:
%sql

SELECT count(*) as num_rows
FROM loans_delta

num_rows
14705


In [0]:
%sql

SELECT *
FROM loans_delta
LIMIT 5

loan_id,funded_amnt,paid_amnt,addr_state
0,1000,182.22,CA
1,1000,361.19,WA
2,1000,176.26,TX
3,1000,1000.0,OK
4,1000,249.98,PA


## Enforcing Schema on Write to Prevent Data Corruption
A common problem when handling data in widely used formats (JSON, Parquet, etc.) is writing records whose format does not match the data already stored. Delta Lake records the table schema as table-level metadata, so it can check whether the data being written is compatible with the schema of the target Delta table. If it is not, Spark raises an error **before** any record is written and committed, which prevents *data corruption*.
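The check described above can be pictured with a toy model in plain Python. This is only a sketch of the idea, not Delta Lake's implementation: `check_schema` and `append` are hypothetical helpers, and a schema is reduced to a dict of column name to type name. The column names mirror the loans table used in this notebook, with `closed` as the extra column.

```python
# Toy model of schema enforcement in plain Python (not Delta Lake's code).
# A "schema" here is just a dict of column name -> type name.

table_schema = {
    "loan_id": "long",
    "funded_amnt": "integer",
    "paid_amnt": "double",
    "addr_state": "string",
}


def check_schema(table_schema, incoming_schema):
    """Return a list of problems; an empty list means the write may proceed."""
    problems = []
    for name, dtype in incoming_schema.items():
        if name not in table_schema:
            problems.append(f"unexpected column: {name}")
        elif table_schema[name] != dtype:
            problems.append(
                f"type mismatch for {name}: {dtype} vs {table_schema[name]}"
            )
    return problems


def append(table_rows, table_schema, incoming_rows, incoming_schema):
    """Validate first, write second: a rejected batch leaves the table untouched."""
    problems = check_schema(table_schema, incoming_schema)
    if problems:
        raise ValueError("schema mismatch: " + "; ".join(problems))
    table_rows.extend(incoming_rows)


rows = []
incoming_schema = dict(table_schema, closed="boolean")
try:
    append(rows, table_schema, [(1111111, 1000, 1000.0, "TX", True)], incoming_schema)
except ValueError as err:
    print(err)  # schema mismatch: unexpected column: closed
```

The point of the sketch is the ordering: validation happens before any row is added, so a rejected batch never leaves partial data behind.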

In [0]:
# current state of the Delta table that holds the loan data
df_delta = spark.read.format('delta').load(deltaPath)
display(df_delta.limit(5))
df_delta.printSchema()

# 4 fields: long; integer; double; string

loan_id,funded_amnt,paid_amnt,addr_state
0,1000,182.22,CA
1,1000,361.19,WA
2,1000,176.26,TX
3,1000,1000.0,OK
4,1000,249.98,PA


root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)



In [0]:
from pyspark.sql.functions import col

In [0]:
# create a DataFrame to append to the Delta table already stored at deltaPath
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [(1111111, 1000, 1000.0, 'TX', True),
         (2222222, 2000, 0.0, 'CA', False)]

loanUpdates = (spark.createDataFrame(items, cols).withColumn("funded_amnt", col("funded_amnt").cast('int')))

# writing it fails, as it should: the extra column does not match the table schema
loanUpdates.write.format("delta").mode("append").save(deltaPath)

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-3057555664942535> in <cell line: 9>()
      7 
      8 # writing it fails, as it should: the extra column does not match the table schema
----> 9 loanUpdates.write.format("delta").mode("append").save(deltaPath)

/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
     47             

In [0]:
# compare the existing DataFrame with the new one:
display(df_delta.limit(5))
df_delta.printSchema()
display(loanUpdates)
loanUpdates.printSchema()

loan_id,funded_amnt,paid_amnt,addr_state
0,1000,182.22,CA
1,1000,361.19,WA
2,1000,176.26,TX
3,1000,1000.0,OK
4,1000,249.98,PA


root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)



loan_id,funded_amnt,paid_amnt,addr_state,closed
1111111,1000,1000.0,TX,True
2222222,2000,0.0,CA,False


root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- closed: boolean (nullable = true)



The error message (*To enable schema migration using DataFrameWriter or DataStreamWriter, please set:'.option("mergeSchema", "true")'.*) also hints at what we need to do to evolve the table schema.

## Evolving Schemas to Accommodate Changing Data
Let's add the column that was missing earlier (*closed*) by enabling schema merging on write.
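As a rough mental model of what schema merging means, here is a plain-Python sketch. It is not Delta Lake's implementation: `merge_schemas` and `read_row` are hypothetical helpers, and schemas are simplified to dicts. New columns are appended to the schema, and rows written before the change read back with an empty value for them.

```python
# Toy model of schema evolution in plain Python (not Delta Lake's code).

def merge_schemas(table_schema, incoming_schema):
    """Keep existing columns in order and append columns that are new."""
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name in merged and merged[name] != dtype:
            raise ValueError(f"incompatible type for {name}")
        merged.setdefault(name, dtype)
    return merged


def read_row(row, schema):
    """Rows written before the evolution get None for columns they lack."""
    return {name: row.get(name) for name in schema}


old_schema = {"loan_id": "long", "addr_state": "string"}
new_schema = {"loan_id": "long", "addr_state": "string", "closed": "boolean"}

merged = merge_schemas(old_schema, new_schema)
old_row = {"loan_id": 0, "addr_state": "CA"}
print(read_row(old_row, merged))
# {'loan_id': 0, 'addr_state': 'CA', 'closed': None}
```

This matches what the notebook output shows after the append: the original loans have an empty *closed* value, and only the two new rows carry one.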

In [0]:
(loanUpdates.write
 .format("delta")
 .mode("append")
 .option("mergeSchema", "true")
 .save(deltaPath))

In [0]:
%fs

ls dbfs:/tmp/loans_delta/

path,name,size,modificationTime
dbfs:/tmp/loans_delta/_delta_log/,_delta_log/,0,0
dbfs:/tmp/loans_delta/part-00000-13d5d315-7165-44a8-b643-ab8adfa12ce4-c000.snappy.parquet,part-00000-13d5d315-7165-44a8-b643-ab8adfa12ce4-c000.snappy.parquet,164916,1666384858000
dbfs:/tmp/loans_delta/part-00003-e725ddf2-0d20-4527-b8f4-b3585c3fd321-c000.snappy.parquet,part-00003-e725ddf2-0d20-4527-b8f4-b3585c3fd321-c000.snappy.parquet,1573,1666384885000
dbfs:/tmp/loans_delta/part-00007-cb2f397d-d39d-4680-8d09-081ef6aaffd5-c000.snappy.parquet,part-00007-cb2f397d-d39d-4680-8d09-081ef6aaffd5-c000.snappy.parquet,1573,1666384885000


In [0]:
df_delta_merged = spark.read.format('delta').load(deltaPath)
display(df_delta_merged)
df_delta_merged.printSchema()

loan_id,funded_amnt,paid_amnt,addr_state,closed
0,1000,182.22,CA,
1,1000,361.19,WA,
2,1000,176.26,TX,
3,1000,1000.0,OK,
4,1000,249.98,PA,
5,1000,408.6,CA,
6,1000,1000.0,MD,
7,1000,168.81,OH,
8,1000,193.64,TX,
9,1000,218.83,CT,


root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- closed: boolean (nullable = true)



In [0]:
df_delta_merged.createOrReplaceTempView("loans_delta_merged")

In [0]:
%sql

select *
from loans_delta_merged
where closed is not null

loan_id,funded_amnt,paid_amnt,addr_state,closed
1111111,1000,1000.0,TX,True
2222222,2000,0.0,CA,False


In [0]:
display(df_delta_merged.where(col('closed').isNotNull()))

loan_id,funded_amnt,paid_amnt,addr_state,closed
1111111,1000,1000.0,TX,True
2222222,2000,0.0,CA,False


## Transforming Existing Data
Delta Lake supports the **DML** commands UPDATE, DELETE, and MERGE, which make it possible to build complex pipelines. Each of these data-modification commands **guarantees ACID transactions**.

#### Fixing errors: UPDATE
Suppose that all loans with *addr_state = 'OR'* should instead have *'WA'*.
If the table were stored as plain Parquet, we would have to:
- copy all the "correct" rows into a new table
- copy all the "wrong" rows into a DataFrame and apply the fix
- insert the corrected DataFrame into the new table
- drop the old table and rename the new one to the name of the table just dropped
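The manual steps above can be sketched in plain Python, with lists of dicts standing in for tables. The rows are made-up illustration data, not taken from the loans dataset.

```python
# Plain-Python sketch of the manual rewrite; lists of dicts stand in for tables.

old_table = [
    {"loan_id": 1, "addr_state": "OR"},
    {"loan_id": 2, "addr_state": "CA"},
    {"loan_id": 3, "addr_state": "OR"},
]

# 1. copy the rows that are already correct into a new table
new_table = [row for row in old_table if row["addr_state"] != "OR"]

# 2. copy the rows that need fixing and apply the change
fixed = [dict(row, addr_state="WA") for row in old_table if row["addr_state"] == "OR"]

# 3. insert the fixed rows into the new table
new_table.extend(fixed)

# 4. drop the old table and give its name to the new one
old_table = new_table

print(sorted(row["loan_id"] for row in old_table if row["addr_state"] == "WA"))
# [1, 3]
```

With Delta Lake, the same outcome takes a single `update` call, as the next cell shows.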

In [0]:
# run the update command on the Delta table

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.update("addr_state = 'OR'", {"addr_state": "'WA'"})

In [0]:
deltaTable_df = deltaTable.toDF()
display(deltaTable_df.where(col('addr_state') == 'OR'))

# no records are left with that addr_state (OR)

loan_id,funded_amnt,paid_amnt,addr_state,closed


#### Fixing errors: DELETE
Suppose we have been asked to delete all data about loans that have been fully paid off.

In [0]:
# run the delete command on the Delta table
# note: on this dataset the predicate matches every row, so the table ends up empty

deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete("funded_amnt >= paid_amnt")

In [0]:
deltaTable_df = deltaTable.toDF()
display(deltaTable_df.limit(10))

loan_id,funded_amnt,paid_amnt,addr_state,closed


#### Fixing errors: MERGE
Suppose we have a table with new information about the loans (*loanUpdates*). We want its records to update (**UPDATE**) the records already present in the target table (*deltaTable*), matching on a key column (*loan_id*). When no match is found between *loan_id* in *deltaTable* and *loan_id* in *loanUpdates*, the new record is inserted (**INSERT**) instead.
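The update-or-insert logic can be illustrated with a small plain-Python sketch. The `merge` function below is a hypothetical helper that only mimics the semantics on lists of dicts, and the existing target row (with `paid_amnt` 500.0) is invented for the example.

```python
def merge(target, source, key):
    """Upsert: update target rows whose key matches, insert the rest."""
    by_key = {row[key]: dict(row) for row in target}
    stats = {"updated": 0, "inserted": 0}
    for row in source:
        if row[key] in by_key:
            by_key[row[key]].update(row)  # matched: overwrite with source values
            stats["updated"] += 1
        else:
            by_key[row[key]] = dict(row)  # not matched: insert as a new row
            stats["inserted"] += 1
    return list(by_key.values()), stats


target = [{"loan_id": 1111111, "paid_amnt": 500.0, "closed": False}]
source = [
    {"loan_id": 1111111, "paid_amnt": 1000.0, "closed": True},
    {"loan_id": 2222222, "paid_amnt": 0.0, "closed": False},
]

result, stats = merge(target, source, "loan_id")
print(stats)  # {'updated': 1, 'inserted': 1}
```

In the notebook the target table is empty when the merge runs, so both source rows take the insert branch.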

In [0]:
(deltaTable # it currently has 0 records, because the earlier delete removed everything
.alias("target") 
.merge(loanUpdates.alias("source"), "target.loan_id = source.loan_id")
.whenMatchedUpdateAll() # if the id matches, update the row
.whenNotMatchedInsertAll() # if there is no match, insert the row
.execute())

In [0]:
deltaTable.toDF().show() 

# as expected there are 2 rows: the ones inserted by the merge from loanUpdates

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|1111111|       1000|   1000.0|        TX|  true|
|2222222|       2000|      0.0|        CA| false|
+-------+-----------+---------+----------+------+



#### Time Travel
Earlier versions of a Delta table can be read with the "versionAsOf" and "timestampAsOf" options. Time travel is possible because of the transaction log, where every change to the table is recorded.
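To see why a log of changes is enough to read old versions, consider this toy replay in plain Python. It is a simplified model, not the real log format: `files_as_of` is a hypothetical helper, each commit only lists the data files it adds or removes, and the file names are placeholders. The sequence of commits loosely follows the operations run in this notebook.

```python
def files_as_of(log, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for commit in log[: version + 1]:
        live -= set(commit.get("remove", []))
        live |= set(commit.get("add", []))
    return live


# One entry per table version; file names are placeholders.
log = [
    {"op": "WRITE", "add": ["part-a"]},
    {"op": "WRITE", "add": ["part-b", "part-c"]},
    {"op": "UPDATE", "remove": ["part-a"], "add": ["part-d"]},
    {"op": "DELETE", "remove": ["part-b", "part-c", "part-d"]},
    {"op": "MERGE", "add": ["part-e", "part-f"]},
]

print(sorted(files_as_of(log, 1)))  # ['part-a', 'part-b', 'part-c']
print(sorted(files_as_of(log, 3)))  # []
print(sorted(files_as_of(log, 4)))  # ['part-e', 'part-f']
```

In this model, reading version *n* means replaying the log up to commit *n* and scanning only the files that are still live at that point. Removed files stay on disk, which is what makes older versions readable.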

In [0]:
%fs

ls dbfs:/tmp/loans_delta/_delta_log 

path,name,size,modificationTime
dbfs:/tmp/loans_delta/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2096,1666384861000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000000.json,00000000000000000000.json,1762,1666384859000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000001.crc,00000000000000000001.crc,2176,1666384887000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000001.json,00000000000000000001.json,2443,1666384885000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000002.crc,00000000000000000002.crc,2176,1666384896000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000002.json,00000000000000000002.json,1677,1666384894000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000003.crc,00000000000000000003.crc,2163,1666384902000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000003.json,00000000000000000003.json,1764,1666384900000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000004.crc,00000000000000000004.crc,2169,1666384906000
dbfs:/tmp/loans_delta/_delta_log/00000000000000000004.json,00000000000000000004.json,2188,1666384905000


In [0]:
# list the versions of the Delta table
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2022-10-21T20:41:45.000+0000,4677173902999840,alessio.tugnoli.8@gmail.com,MERGE,"Map(predicate -> (target.loan_id = source.loan_id), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(2331967270375503),1021-163537-4mtca75w,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, executionTimeMs -> 2260, numTargetRowsInserted -> 2, scanTimeMs -> 947, numTargetRowsUpdated -> 0, numOutputRows -> 2, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 0, rewriteTimeMs -> 1164)",,Databricks-Runtime/11.2.x-scala2.12
3,2022-10-21T20:41:40.000+0000,4677173902999840,alessio.tugnoli.8@gmail.com,DELETE,"Map(predicate -> [""(CAST(funded_amnt AS DOUBLE) >= paid_amnt)""])",,List(2331967270375503),1021-163537-4mtca75w,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 3, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1777, numDeletedRows -> 14707, scanTimeMs -> 975, numAddedFiles -> 0, rewriteTimeMs -> 802)",,Databricks-Runtime/11.2.x-scala2.12
2,2022-10-21T20:41:34.000+0000,4677173902999840,alessio.tugnoli.8@gmail.com,UPDATE,Map(predicate -> (addr_state#7809 = OR)),,List(2331967270375503),1021-163537-4mtca75w,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numCopiedRows -> 14527, numAddedChangeFiles -> 0, executionTimeMs -> 2373, scanTimeMs -> 1038, numAddedFiles -> 1, numUpdatedRows -> 178, rewriteTimeMs -> 1335)",,Databricks-Runtime/11.2.x-scala2.12
1,2022-10-21T20:41:25.000+0000,4677173902999840,alessio.tugnoli.8@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2331967270375503),1021-163537-4mtca75w,0.0,WriteSerializable,True,"Map(numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 3146)",,Databricks-Runtime/11.2.x-scala2.12
0,2022-10-21T20:40:59.000+0000,4677173902999840,alessio.tugnoli.8@gmail.com,WRITE,"Map(mode -> ErrorIfExists, partitionBy -> [])",,List(2331967270375503),1021-163537-4mtca75w,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 14705, numOutputBytes -> 164916)",,Databricks-Runtime/11.2.x-scala2.12


In [0]:
(deltaTable
.history(3)
.select("version", "timestamp", "operation", "operationParameters")
.show(truncate=False))

+-------+-------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp          |operation|operationParameters                                                                                                                                |
+-------+-------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------+
|4      |2022-10-21 20:41:45|MERGE    |{predicate -> (target.loan_id = source.loan_id), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}]}|
|3      |2022-10-21 20:41:40|DELETE   |{predicate -> ["(CAST(funded_amnt AS DOUBLE) >= paid_amnt)"]}                                                                                      |
|2      |2022-10-21 20:41:34|UPDATE   |{predicate -> (addr_s

In [0]:
display(spark.read
.format("delta")
.option("timestampAsOf", "2022-10-21 20:41:40") # timestamp after table creation
.load(deltaPath))

# as expected this is empty: the timestamp corresponds to the version written by the DELETE

loan_id,funded_amnt,paid_amnt,addr_state,closed


In [0]:
display(spark.read.format("delta")
.option("versionAsOf", "4")
.load(deltaPath))

# this is the version produced by the MERGE with loanUpdates

loan_id,funded_amnt,paid_amnt,addr_state,closed
1111111,1000,1000.0,TX,True
2222222,2000,0.0,CA,False


#### Why is this useful?
- Rerunning machine learning experiments against different versions of the same table.
- Comparing the data between two versions of a table.
- Rolling back: reading an earlier version of the table (as a DataFrame) and overwriting the current table with it.
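The rollback in the last point could look roughly like the sketch below. `rollback_to_version` is a hypothetical helper, not part of the Delta Lake API. It combines the `versionAsOf` read shown earlier with an overwrite of the same path, and it assumes that `spark` is an active SparkSession with Delta Lake available and that overwriting the table from one of its own snapshots is acceptable in your setup.

```python
def rollback_to_version(spark, path, version):
    """Overwrite the Delta table at `path` with its contents as of `version`.

    `spark` is expected to be an active SparkSession with Delta Lake available.
    """
    # read the old snapshot of the table as a DataFrame
    snapshot = (spark.read
                .format("delta")
                .option("versionAsOf", version)
                .load(path))
    # write it back over the current table; this creates a new version
    (snapshot.write
     .format("delta")
     .mode("overwrite")
     .save(path))


# Example call in this notebook (not executed here):
# rollback_to_version(spark, deltaPath, 2)
```

The overwrite is recorded as a new commit in the log, so the versions in between remain available for time travel.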