### Delta Lake

1. Write data to delta lake (managed table)
2. Write data to delta lake (external table)
3. Read data from delta lake (Table)
4. Read data from delta lake (File)

https://docs.delta.io/latest/delta-batch.html#-deltadataframewrites

Documentation:
* [Delta.io](https://docs.delta.io/latest/index.html)
* [Databricks](https://docs.databricks.com/en/delta/index.html)
* [Microsoft website](https://learn.microsoft.com/en-us/azure/databricks/delta/)


### Write to & Read from Delta Lake


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS f1_demo
LOCATION '/mnt/formula1dl/demo'

In [0]:
results_df = spark.read \
.option("inferSchema", True) \
.json("/mnt/formula1dl/raw/2021-03-28/results.json")

##### Managed Table
* Parquet vs Delta

In [0]:
results_df.write.format("delta").mode("overwrite").saveAsTable("f1_demo.results_managed")

In [0]:
%sql
SELECT * FROM f1_demo.results_managed;

##### PySpark: External Delta Lake Table
* only the format changes: `delta` instead of `parquet`
* PySpark

In [0]:
results_df.write.format("delta").mode("overwrite").save("/mnt/formula1dl/demo/results_external")

##### SQL: External Delta Table
* only the format changes: `delta` instead of `parquet`


In [0]:
%sql
CREATE TABLE f1_demo.results_external
USING DELTA
LOCATION '/mnt/formula1dl/demo/results_external'

In [0]:
%sql
SELECT * FROM f1_demo.results_external

In [0]:
results_external_df = spark.read.format("delta").load("/mnt/formula1dl/demo/results_external")

In [0]:
display(results_external_df)

In [0]:
results_df.write.format("delta").mode("overwrite").partitionBy("constructorId").saveAsTable("f1_demo.results_partitioned")

In [0]:
%sql
SHOW PARTITIONS f1_demo.results_partitioned

### Update / Delete from Delta Table
Documentation
1. [Update Delta Table](https://docs.delta.io/latest/delta-update.html#update-a-table)
2. [Delete From Delta Table](https://docs.delta.io/latest/delta-update.html#delete-from-a-table)

In [0]:
%sql
SELECT * FROM f1_demo.results_managed;

##### SQL: Update Delta Table

In [0]:
%sql
UPDATE f1_demo.results_managed
  SET points = 11 - position
WHERE position <= 10
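
To make the arithmetic of this update concrete, here is a small plain-Python sketch that applies the same rule to a few in-memory rows. The sample rows are made up for illustration; only the rule (`points = 11 - position` where `position <= 10`) comes from the statement above.

```python
# Hypothetical sample rows; only the update rule mirrors the SQL statement.
rows = [
    {"driverId": 1, "position": 1, "points": 0},
    {"driverId": 2, "position": 10, "points": 0},
    {"driverId": 3, "position": 11, "points": 0},
]

def apply_update(rows):
    """Return new rows with points = 11 - position where position <= 10."""
    updated = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows stay unchanged
        if new_row["position"] <= 10:
            new_row["points"] = 11 - new_row["position"]
        updated.append(new_row)
    return updated

result = apply_update(rows)
print([(r["position"], r["points"]) for r in result])
# [(1, 10), (10, 1), (11, 0)]
```

Rows outside the `WHERE` condition (position 11 here) keep their old points.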

In [0]:
%sql
SELECT * FROM f1_demo.results_managed;

##### PySpark: Update Delta Table

In [0]:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/mnt/formula1dl/demo/results_managed")

deltaTable.update("position <= 10", { "points": "21 - position" } ) 

In [0]:
%sql
SELECT * FROM f1_demo.results_managed;

##### SQL: Delete from Delta Table

In [0]:
%sql
DELETE FROM f1_demo.results_managed
WHERE position > 10;

In [0]:
%sql
SELECT * FROM f1_demo.results_managed;

##### PySpark: Delete from Delta Table

In [0]:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/mnt/formula1dl/demo/results_managed")

deltaTable.delete("points = 0") 

In [0]:
%sql
SELECT * FROM f1_demo.results_managed;

### Upsert in Delta Table using merge

Documentation:
* [upsert](https://docs.delta.io/latest/delta-update.html#-delta-merge)

Merge does all 3 of these in one statement:
* insert
* update
* delete

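An upsert is driven by conditions: the `ON` clause decides whether a row is matched, and each `WHEN` clause can add a condition of its own.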

```
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    ...
WHEN NOT MATCHED
  THEN INSERT (
    id,
    ...
  )
  VALUES (
    people10mupdates.id,
    ...
  )
```
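
The matched / not-matched logic of the statement above can be sketched in plain Python. This is only a model of the semantics on dictionaries keyed by id, not how Delta executes a merge; the function name `merge`, the `delete_when` parameter and the sample rows are all made up for illustration.

```python
def merge(target, source, delete_when=None):
    """Apply source rows to target (dicts keyed by id) and return a new dict.

    Matched rows are updated, or deleted when delete_when(row) is true;
    unmatched source rows are inserted.
    """
    result = dict(target)
    for key, row in source.items():
        if key in result:
            if delete_when is not None and delete_when(row):
                del result[key]      # WHEN MATCHED AND <condition> THEN DELETE
            else:
                result[key] = row    # WHEN MATCHED THEN UPDATE
        else:
            result[key] = row        # WHEN NOT MATCHED THEN INSERT
    return result

# Hypothetical data: id 2 is matched (updated), id 3 is new (inserted).
people = {1: {"name": "Ann"}, 2: {"name": "Bob"}}
updates = {2: {"name": "Robert"}, 3: {"name": "Cy"}}
merged = merge(people, updates)
print(merged)
# {1: {'name': 'Ann'}, 2: {'name': 'Robert'}, 3: {'name': 'Cy'}}
```

Rows that exist only in the target (id 1) are left alone, which is why one statement can cover insert, update and delete.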

##### Day 1
* simulates day one of data
* 10 records (1-10)

In [0]:
drivers_day1_df = spark.read \
.option("inferSchema", True) \
.json("/mnt/formula1dl/raw/2021-03-28/drivers.json") \
.filter("driverId <= 10") \
.select("driverId", "dob", "name.forename", "name.surname")

In [0]:
display(drivers_day1_df)

##### Day 2
* simulates day two
* 10 records (6-15)
* 6-10 updated
* 11-15 are inserted
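
The split between updated and inserted ids is plain set arithmetic on the two id ranges, which can be checked without Spark:

```python
day1_ids = set(range(1, 11))   # driverId 1-10, loaded on day 1
day2_ids = set(range(6, 16))   # driverId 6-15, arriving on day 2

updated = sorted(day1_ids & day2_ids)    # already in the table -> update
inserted = sorted(day2_ids - day1_ids)   # not yet in the table -> insert
untouched = sorted(day1_ids - day2_ids)  # in the table, absent from day 2

print(updated)    # [6, 7, 8, 9, 10]
print(inserted)   # [11, 12, 13, 14, 15]
print(untouched)  # [1, 2, 3, 4, 5]
```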

In [0]:
from pyspark.sql.functions import upper

drivers_day2_df = spark.read \
.option("inferSchema", True) \
.json("/mnt/formula1dl/raw/2021-03-28/drivers.json") \
.filter("driverId BETWEEN 6 AND 15") \
.select("driverId", "dob", upper("name.forename").alias("forename"), upper("name.surname").alias("surname"))

In [0]:
drivers_day1_df.createOrReplaceTempView("drivers_day1")

In [0]:
drivers_day2_df.createOrReplaceTempView("drivers_day2")

In [0]:
display(drivers_day2_df)

##### Day 3
* simulates day three of data
* 10 records (1-5 & 16-20)
* 1-5 updated
* 16-20 are inserted

In [0]:
from pyspark.sql.functions import upper

drivers_day3_df = spark.read \
.option("inferSchema", True) \
.json("/mnt/formula1dl/raw/2021-03-28/drivers.json") \
.filter("driverId BETWEEN 1 AND 5 OR driverId BETWEEN 16 AND 20") \
.select("driverId", "dob", upper("name.forename").alias("forename"), upper("name.surname").alias("surname"))

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-14570012583986>:3
      1 from pyspark.sql.functions import upper
----> 3 drivers_day3_df = spark.read \
      4 .option("inferSchema", True) \
      5 .json("/mnt/formula1dl/raw/2021-03-28/drivers.json") \
      6 .filter("driverId BETWEEN 1 AND 5 OR driverId BETWEEN 16 AND 20") \
      7 .select("

##### NOTE
* Delta is the default table format on Databricks Runtime 8.0 and above
* `createdDate DATE`: set only for newly inserted records
* `updatedDate DATE`: set only for records that are updated

In [0]:
%sql
CREATE TABLE IF NOT EXISTS f1_demo.drivers_merge (
driverId INT,
dob DATE,
forename STRING, 
surname STRING,
createdDate DATE, 
updatedDate DATE
)
USING DELTA


### Day 1
* `tgt` is the alias for `drivers_merge`
* `upd` is the alias for `drivers_day1`

The key functionality is the audit timestamps: a matched row gets `tgt.updatedDate`, a newly inserted row gets `createdDate`.
```
WHEN MATCHED THEN
  UPDATE SET ...,
             tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (...,createdDate ) VALUES (..., current_timestamp)

```
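
As a rough check of what the audit columns should look like after each day, here is a plain-Python model of the same merge. It tracks only `createdDate` and `updatedDate` per `driverId`; the function name `merge_with_audit` is made up for this sketch, and it is not the Delta implementation.

```python
from datetime import datetime, timezone

def merge_with_audit(target, source_ids, now):
    """Upsert driver ids into target, stamping createdDate / updatedDate."""
    for driver_id in source_ids:
        if driver_id in target:
            target[driver_id]["updatedDate"] = now        # WHEN MATCHED
        else:
            target[driver_id] = {"createdDate": now,       # WHEN NOT MATCHED
                                 "updatedDate": None}
    return target

table = {}
now = datetime.now(timezone.utc)
merge_with_audit(table, range(1, 11), now)   # day 1: ids 1-10
day1_count = len(table)
merge_with_audit(table, range(6, 16), now)   # day 2: ids 6-15
day2_count = len(table)
updated_ids = sorted(k for k, v in table.items() if v["updatedDate"] is not None)
print(day1_count, day2_count, updated_ids)
# 10 15 [6, 7, 8, 9, 10]
```

After day 1 every `updatedDate` is null; after day 2 only ids 6-10 carry one.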

In [0]:
%sql
MERGE INTO f1_demo.drivers_merge tgt
USING drivers_day1 upd
ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET tgt.dob = upd.dob,
             tgt.forename = upd.forename,
             tgt.surname = upd.surname,
             tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (driverId, dob, forename,surname,createdDate ) VALUES (driverId, dob, forename,surname, current_timestamp)

### Notes
* should get 10 records
* updatedDate should be null

In [0]:
%sql SELECT * FROM f1_demo.drivers_merge;

### Day 2

### Notes
* should get 15 records
* 1-5, 11-15 updatedDate should be null
* 6-10 should have values in updatedDate and createdDate

In [0]:
%sql
MERGE INTO f1_demo.drivers_merge tgt
USING drivers_day2 upd
ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET tgt.dob = upd.dob,
             tgt.forename = upd.forename,
             tgt.surname = upd.surname,
             tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (driverId, dob, forename,surname,createdDate ) VALUES (driverId, dob, forename,surname, current_timestamp)

In [0]:
%sql SELECT * FROM f1_demo.drivers_merge;

### Day 3
* uses pyspark
* uses delta table api `deltaTable.alias("tgt")`
* alias tgt
* uses day 3 dataframe: `drivers_day3_df.alias("upd")`
* must import `current_timestamp`
* updates 1-5 (dates in both updatedDate and createdDate)
* inserts 16-20 (null in updatedDate)

key functionality:

```
  .whenMatchedUpdate(set = ...., "updatedDate": "current_timestamp()" } ) \
  .whenNotMatchedInsert(values =
    {..., 
      "createdDate": "current_timestamp()"
```

In [0]:
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/mnt/formula1dl/demo/drivers_merge")

deltaTable.alias("tgt").merge(
    drivers_day3_df.alias("upd"),
    "tgt.driverId = upd.driverId") \
  .whenMatchedUpdate(set = { "dob" : "upd.dob", "forename" : "upd.forename", "surname" : "upd.surname", "updatedDate": "current_timestamp()" } ) \
  .whenNotMatchedInsert(values =
    {
      "driverId": "upd.driverId",
      "dob": "upd.dob",
      "forename" : "upd.forename", 
      "surname" : "upd.surname", 
      "createdDate": "current_timestamp()"
    }
  ) \
  .execute()

In [0]:
%sql SELECT * FROM f1_demo.drivers_merge;

### Advanced Features: History & Versions
1. History & Versioning
2. Time Travel - Query based on time or versions
3. Vacuum

In [0]:
%sql
DESC HISTORY f1_demo.drivers_merge

##### Version information
* 1: shows 10 records
* 2: shows 15 records 
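
One way to picture a version is as the result of replaying the transaction log up to that commit. The sketch below models each commit as a set of `add` and `remove` file actions (these action names exist in the Delta log; the commit contents and file names here are made up), and rebuilds the list of live data files for a given version.

```python
def snapshot(commits, version):
    """Return the set of live data files after replaying commits 0..version."""
    if not 0 <= version < len(commits):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for commit in commits[: version + 1]:
        live -= set(commit.get("remove", []))
        live |= set(commit.get("add", []))
    return live

# Hypothetical commit log; file names are made up for illustration.
commits = [
    {"add": []},                                   # version 0: CREATE TABLE
    {"add": ["part-a.parquet"]},                   # version 1: first merge
    {"add": ["part-b.parquet", "part-c.parquet"],  # version 2: second merge
     "remove": ["part-a.parquet"]},                #   rewrites part-a
]

print(sorted(snapshot(commits, 1)))  # ['part-a.parquet']
print(sorted(snapshot(commits, 2)))  # ['part-b.parquet', 'part-c.parquet']
```

Reading `VERSION AS OF 1` then means reading the files that were live at version 1, which only works while those files still exist.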

In [0]:
%sql
SELECT * FROM f1_demo.drivers_merge VERSION AS OF 2;

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-14570012583999>:7
      5     display(df)
      6     return df
----> 7   _sqldf = ____databricks_percent_sql()
      8 finally:
      9   del ____databricks_percent_sql

File <command-14570012583999>:4, in ____databricks_percent_sql()
      2 def ____databricks_percent_sql():
      3   import base64
----> 4   df = spark.sql(base64.standard_b64decode("

##### SQL: Select data from a specific version / timestamp

In [0]:
%sql
SELECT * FROM f1_demo.drivers_merge TIMESTAMP AS OF '2021-06-23T15:40:33.000+0000';

##### PySpark: Select data from a specific version / timestamp

In [0]:
df = spark.read.format("delta").option("timestampAsOf", '2021-06-23T15:40:33.000+0000').load("/mnt/formula1dl/demo/drivers_merge")

In [0]:
display(df)
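
A timestamp query has to be mapped to a version first. As far as I understand it, the version chosen is the latest one committed at or before the given timestamp. The sketch below models that lookup in plain Python; the function name `version_as_of` and the commit times are made up, and the real lookup reads the commit timestamps from the table history.

```python
from bisect import bisect_right
from datetime import datetime, timezone

def version_as_of(commit_times, ts):
    """Return the latest version committed at or before ts.

    commit_times[i] is the commit timestamp of version i, in ascending order.
    """
    index = bisect_right(commit_times, ts)
    if index == 0:
        raise ValueError("timestamp is before the first commit")
    return index - 1

# Hypothetical commit timestamps for versions 0, 1 and 2.
commit_times = [
    datetime(2021, 6, 23, 15, 30, tzinfo=timezone.utc),
    datetime(2021, 6, 23, 15, 40, tzinfo=timezone.utc),
    datetime(2021, 6, 23, 15, 50, tzinfo=timezone.utc),
]
query_ts = datetime(2021, 6, 23, 15, 40, 33, tzinfo=timezone.utc)
print(version_as_of(commit_times, query_ts))  # 1
```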

### GDPR / Legal Requirement: Removing Old Data
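Sometimes there are legal requirements to remove old data, for example requests from the state, HR or external customers. Deleting rows only removes them from the current version of the table; the old data files stay on storage so that time travel keeps working. The command `VACUUM` physically removes data files that the current version no longer references and that are older than the retention period (7 days by default).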

In [0]:
%sql
VACUUM f1_demo.drivers_merge

In [0]:
%sql
SELECT * FROM f1_demo.drivers_merge TIMESTAMP AS OF '2021-06-23T15:40:33.000+0000';

##### Retention Period - Removes History
* set to 0 hours here, which requires disabling a safety check first

`SET spark.databricks.delta.retentionDurationCheck.enabled = false;` - This line sets a configuration in the Spark session that disables the retention duration check for Delta tables. By default, Delta rejects a `VACUUM` whose retention period is shorter than the default (7 days), because files still needed by older versions of the table could be removed. Disabling the check allows you to run `VACUUM` with a shorter retention period.

`VACUUM f1_demo.drivers_merge RETAIN 0 HOURS;` - The VACUUM command is used to clean up files in a Delta table that are no longer needed by the table. The RETAIN 0 HOURS part specifies that you want to remove all data files not needed for the current version of the table, regardless of how old they are. This can be risky because it means you won't be able to revert to older versions of the table or query the historical data.
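
The effect of the retention period can be sketched in plain Python. This is a simplified model under my own assumptions: each file is tagged with the time it stopped being referenced by the table (or `None` if it is still live), and the function name `files_to_vacuum` and the file names are made up. Real `VACUUM` works from the transaction log and the files on storage.

```python
from datetime import datetime, timedelta, timezone

def files_to_vacuum(files, now, retain_hours=168):
    """Return unreferenced files removed longer ago than the retention window.

    files maps a file name to the time it was removed from the table,
    or None if the current version still references it.
    """
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(name for name, removed_at in files.items()
                  if removed_at is not None and removed_at <= cutoff)

now = datetime(2021, 6, 30, tzinfo=timezone.utc)
# Hypothetical files: one live, one removed 2 days ago, one removed 10 days ago.
files = {
    "live.parquet": None,
    "old-2d.parquet": now - timedelta(days=2),
    "old-10d.parquet": now - timedelta(days=10),
}
print(files_to_vacuum(files, now))                  # ['old-10d.parquet']
print(files_to_vacuum(files, now, retain_hours=0))  # ['old-10d.parquet', 'old-2d.parquet']
```

With the default of 168 hours (7 days) the file removed 2 days ago survives; with 0 hours everything except the live file goes, which is what breaks time travel.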

In [0]:
%sql
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM f1_demo.drivers_merge RETAIN 0 HOURS

In [0]:
%sql
SELECT * FROM f1_demo.drivers_merge TIMESTAMP AS OF '2021-06-23T15:40:33.000+0000';

##### Null History
* after `RETAIN 0 HOURS`, time travel to earlier versions should no longer return data

In [0]:
%sql
SELECT * FROM f1_demo.drivers_merge

In [0]:
%sql
DESC HISTORY f1_demo.drivers_merge;

##### Restoring Records after Accidental Deletion

* Delete `driverId = 1` (Lewis Hamilton)
* Show that the record still exists in a previous version of the table

In [0]:
%sql
DELETE FROM f1_demo.drivers_merge WHERE driverId = 1;

In [0]:
%sql 
SELECT * FROM f1_demo.drivers_merge VERSION AS OF 3;

#### Restoring Records
* `f1_demo.drivers_merge VERSION AS OF 3` is the source (`src`): rows missing from the current table are inserted back from version 3
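
The restore is a merge with only a `WHEN NOT MATCHED` clause, so rows that still exist are left untouched. A plain-Python model of that logic, with a made-up function name `restore_missing` and made-up rows apart from `driverId = 1`:

```python
def restore_missing(current, old_snapshot):
    """Insert rows from old_snapshot whose key is absent from current."""
    restored = dict(current)
    for key, row in old_snapshot.items():
        if key not in restored:      # WHEN NOT MATCHED THEN INSERT *
            restored[key] = row
    return restored

# Hypothetical rows keyed by driverId; driverId 1 was deleted after version 3.
version_3 = {1: "Lewis Hamilton", 2: "driver 2", 3: "driver 3"}
current = {2: "driver 2 (edited)", 3: "driver 3"}

restored = restore_missing(current, version_3)
print(restored[1])  # Lewis Hamilton
print(restored[2])  # driver 2 (edited)
```

Note that the edit to driver 2 is kept: the old version only fills in what is missing.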

In [0]:
%sql
MERGE INTO f1_demo.drivers_merge tgt
USING f1_demo.drivers_merge VERSION AS OF 3 src
   ON (tgt.driverId = src.driverId)
WHEN NOT MATCHED THEN
   INSERT *

In [0]:
%sql DESC HISTORY f1_demo.drivers_merge

##### Lewis Hamilton's records should be restored

In [0]:
%sql
SELECT * FROM f1_demo.drivers_merge

### Transaction Logs / Delta Logs
* Hive metastore: stores the table name and attributes
* Transaction logs: not stored in the Hive metastore, for efficiency reasons
* A JSON file is created for every change (with a checkpoint every 10 commits)
* Kept for 30 days

In [0]:
%sql
CREATE TABLE IF NOT EXISTS f1_demo.drivers_txn (
driverId INT,
dob DATE,
forename STRING, 
surname STRING,
createdDate DATE, 
updatedDate DATE
)
USING DELTA

In [0]:
%sql
DESC HISTORY f1_demo.drivers_txn

##### NOTES
* you should see a new folder in the data lake called: `demo/drivers_txn/_delta_log`

The folder `demo/drivers_txn/_delta_log` contains:
* crc
* json: contains the history, one JSON file per commit (table version)

The folder `demo/drivers_txn/` contains:
* snappy.parquet

In [0]:
%fs ls /mnt/formula1dl/demo/drivers_txn/_delta_log

In [0]:
%sql
INSERT INTO f1_demo.drivers_txn
SELECT * FROM f1_demo.drivers_merge
WHERE driverId = 1;

In [0]:
%sql
DESC HISTORY f1_demo.drivers_txn

In [0]:
%sql
INSERT INTO f1_demo.drivers_txn
SELECT * FROM f1_demo.drivers_merge
WHERE driverId = 2;

In [0]:
##### This action adds another JSON & Parquet

In [0]:
%sql
DELETE FROM  f1_demo.drivers_txn
WHERE driverId = 1;

##### Checkpoint_parquet
* A new JSON file is created for each transaction
* After every 10 transactions a checkpoint Parquet file is written
* A reader then needs at most 1 checkpoint and 9 JSON files
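
The "1 checkpoint plus at most 9 JSON files" rule can be sketched as a small function that lists which log files a reader needs for a given version. This assumes, for illustration, that commits are numbered from 0 and that a checkpoint exists at every positive multiple of 10; the function name `files_to_read` is made up. The 20-digit zero-padded file names follow the naming used in `_delta_log`.

```python
def files_to_read(version, checkpoint_interval=10):
    """List the log files needed to reconstruct the table at `version`.

    Assumes a checkpoint exists at every positive multiple of
    checkpoint_interval and that commits are numbered from 0.
    """
    checkpoint = (version // checkpoint_interval) * checkpoint_interval
    files = []
    if checkpoint > 0:
        files.append(f"{checkpoint:020d}.checkpoint.parquet")
        first_json = checkpoint + 1
    else:
        first_json = 0  # no checkpoint yet: replay every JSON from the start
    files += [f"{v:020d}.json" for v in range(first_json, version + 1)]
    return files

needed = files_to_read(19)
print(needed[0])    # 00000000000000000010.checkpoint.parquet
print(len(needed))  # 10
```

For version 19 that is the checkpoint at version 10 plus the JSON files for versions 11-19. Before the first checkpoint exists, the reader simply replays all JSON files.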


In [0]:
for driver_id in range(3, 20):
  spark.sql(f"""INSERT INTO f1_demo.drivers_txn
                SELECT * FROM f1_demo.drivers_merge
                WHERE driverId = {driver_id}""")

In [0]:
%sql
INSERT INTO f1_demo.drivers_txn
SELECT * FROM f1_demo.drivers_merge;

### Convert Parquet to Delta

##### Creates Parquet
* FOLDER: `demo/drivers_convert_to_delta`
* Empty
* No logs


In [0]:
%sql
CREATE TABLE IF NOT EXISTS f1_demo.drivers_convert_to_delta (
driverId INT,
dob DATE,
forename STRING, 
surname STRING,
createdDate DATE, 
updatedDate DATE
)
USING PARQUET

###### Insert data into Table
* FOLDER: `demo/drivers_convert_to_delta`

Files created
* started
* Success
* parquet


In [0]:
%sql
INSERT INTO f1_demo.drivers_convert_to_delta
SELECT * FROM f1_demo.drivers_merge

###### Convert to Delta Table
* FOLDER: `demo/drivers_convert_to_delta/_delta_log`

Files created
* json
* checkpoint_parquet
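
Conceptually, the conversion leaves the Parquet files where they are and adds a `_delta_log` folder that registers them. The sketch below imitates that on a temporary local folder with empty stand-in files. It is a heavily simplified model under my own assumptions: the function name `convert_to_delta_sketch` is made up, and a real conversion also records schema and protocol information and more fields per file.

```python
import json
import tempfile
from pathlib import Path

def convert_to_delta_sketch(table_dir):
    """Write a first commit file that registers every existing Parquet file.

    Simplified model: schema and protocol information are omitted.
    """
    table_dir = Path(table_dir)
    log_dir = table_dir / "_delta_log"
    log_dir.mkdir()
    actions = [{"add": {"path": p.name}}
               for p in sorted(table_dir.glob("*.parquet"))]
    commit = log_dir / f"{0:020d}.json"
    commit.write_text("\n".join(json.dumps(a) for a in actions))
    return commit

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical, empty stand-ins for real Parquet data files.
    for name in ("part-0.parquet", "part-1.parquet"):
        (Path(tmp) / name).touch()
    commit = convert_to_delta_sketch(tmp)
    registered = [json.loads(line)["add"]["path"]
                  for line in commit.read_text().splitlines()]
    commit_name = commit.name

print(commit_name, registered)
# 00000000000000000000.json ['part-0.parquet', 'part-1.parquet']
```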


In [0]:
%sql
CONVERT TO DELTA f1_demo.drivers_convert_to_delta

###### PySpark: df into Parquet
* FOLDER: `demo/drivers_convert_to_delta_new`

Files created
* started
* Success
* parquet


In [0]:
df = spark.table("f1_demo.drivers_convert_to_delta")

In [0]:
df.write.format("parquet").save("/mnt/formula1dl/demo/drivers_convert_to_delta_new")

###### Spark SQL: Convert to Delta Table
* FOLDER: `demo/drivers_convert_to_delta_new/_delta_log`

Files created
* json
* checkpoint_parquet


In [0]:
%sql
CONVERT TO DELTA parquet.`/mnt/formula1dl/demo/drivers_convert_to_delta_new`