#### Write and read Delta tables
1. Write data to Delta Lake (managed table)
2. Write data to Delta Lake (external table)
3. Read data from Delta Lake (table)
4. Read data from Delta Lake (files)


In [0]:
%sql
-- Demo database; managed tables created in it default to the mount path below.
CREATE DATABASE IF NOT EXISTS f1_demo
  LOCATION '/mnt/formula1ltedl/demo'

In [0]:
# Load the raw race results for 2021-03-28 from JSON.
result_df = (
    spark.read
    .option("inferSchema", True)
    .json("/mnt/formula1ltedl/raw/2021-03-28/results.json")
)

In [0]:
# saveAsTable without an explicit path: creates a managed Delta table in f1_demo.
(
    result_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_demo.results_managed")
)

In [0]:
%sql
-- Inspect the managed table.
SELECT *
FROM f1_demo.results_managed

In [0]:
# Write the same data as Delta files at an explicit path; the next cell registers
# that path as an external table.
(
    result_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/formula1ltedl/demo/results_externals")
)

In [0]:
%sql
-- Register the Delta files written by the previous cell as an external table.
-- IF NOT EXISTS keeps this cell re-runnable once the table is already registered.
CREATE TABLE IF NOT EXISTS f1_demo.results_external
USING DELTA
LOCATION '/mnt/formula1ltedl/demo/results_externals'

In [0]:
# Read the external table's data straight from its storage path (no metastore lookup).
results_external_df = (
    spark.read
    .format("delta")
    .load("/mnt/formula1ltedl/demo/results_externals")
)

In [0]:
# Managed Delta table partitioned by constructorId.
(
    result_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("constructorId")
    .saveAsTable("f1_demo.results_partitionned")
)

#### Update and delete
1. Update a Delta table
2. Delete from a Delta table

In [0]:
%sql
-- Inspect the managed table before the updates and deletes below.
SELECT *
FROM f1_demo.results_managed

In [0]:
%sql
-- Rescore the top-10 finishers: position 1 -> 10 points, ..., position 10 -> 1 point.
UPDATE f1_demo.results_managed
SET points = 11 - position
WHERE position <= 10

In [0]:
from delta.tables import DeltaTable

# Same kind of update through the Python API. forName resolves the table through the
# metastore, so this cell does not depend on where the managed table's files live.
deltaTable = DeltaTable.forName(spark, "f1_demo.results_managed")
deltaTable.update("position <= 10", {"points": "21 - position"})


In [0]:
from delta.tables import DeltaTable

# Delete every result outside the top 10. The table is resolved by name rather than
# by a hard-coded storage path.
deltaTable = DeltaTable.forName(spark, "f1_demo.results_managed")
deltaTable.delete("position > 10")

In [0]:
from delta.tables import DeltaTable

# Delete every result that scored no points. The table is resolved by name rather
# than by a hard-coded storage path.
deltaTable = DeltaTable.forName(spark, "f1_demo.results_managed")
deltaTable.delete("points = 0")

In [0]:
%sql
-- Inspect the managed table after the updates and deletes above.
SELECT *
FROM f1_demo.results_managed


##### Upsert using MERGE

In [0]:
from pyspark.sql.functions import upper

# Read the raw drivers file once and derive the three "daily" extracts from it,
# instead of reading the same file three separate times.
drivers_raw_df = (
    spark.read
    .option("inferSchema", True)
    .json("/mnt/formula1ltedl/raw/2021-03-28/drivers.json")
)

# Days 2 and 3 upper-case the names (presumably so rows touched by their merges
# are easy to spot in the target table).
uppercase_name_cols = [
    upper("name.forename").alias("forename"),
    upper("name.surname").alias("surname"),
]

# Drivers Day 1: drivers 1-10, names unchanged.
drivers_day1_df = (
    drivers_raw_df
    .filter("driverId <= 10")
    .select("driverId", "dob", "name.forename", "name.surname")
)

# Drivers Day 2: drivers 6-15 (overlaps day 1 on 6-10), names upper-cased.
drivers_day2_df = (
    drivers_raw_df
    .filter("driverId Between 6 and 15")
    .select("driverId", "dob", *uppercase_name_cols)
)

# Drivers Day 3: drivers 1-5 and 16-20, names upper-cased.
drivers_day3_df = (
    drivers_raw_df
    .filter("driverId Between 1 and 5 OR driverId Between 16 and 20")
    .select("driverId", "dob", *uppercase_name_cols)
)



In [0]:
# Create a temp view for each of the 3 daily DataFrames so %sql cells can query them.
drivers_day1_df.createOrReplaceTempView("drivers_day1")
drivers_day2_df.createOrReplaceTempView("drivers_day2")
drivers_day3_df.createOrReplaceTempView("drivers_day3")

In [0]:
%sql
-- Target table for the MERGE demo; createdDate/updatedDate are set by the merge cells below.
CREATE TABLE IF NOT EXISTS f1_demo.drivers_merge (
  driverId    INT,
  dob         DATE,
  forename    STRING,
  surname     STRING,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

In [0]:
%sql
-- Day 1 upsert: refresh existing drivers, insert new ones.
-- current_timestamp is written into DATE columns, so only the date part is kept.
MERGE INTO f1_demo.drivers_merge AS a
USING drivers_day1 AS b
  ON a.driverId = b.driverId
WHEN MATCHED THEN
  UPDATE SET
    a.dob         = b.dob,
    a.forename    = b.forename,
    a.surname     = b.surname,
    a.updatedDate = current_timestamp
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createdDate)
  VALUES (b.driverId, b.dob, b.forename, b.surname, current_timestamp)

In [0]:
%sql
-- Day 2 upsert: drivers 6-10 already exist (updated), 11-15 are new (inserted).
-- current_timestamp is written into DATE columns, so only the date part is kept.
MERGE INTO f1_demo.drivers_merge AS a
USING drivers_day2 AS b
  ON a.driverId = b.driverId
WHEN MATCHED THEN
  UPDATE SET
    a.dob         = b.dob,
    a.forename    = b.forename,
    a.surname     = b.surname,
    a.updatedDate = current_timestamp
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createdDate)
  VALUES (b.driverId, b.dob, b.forename, b.surname, current_timestamp)

In [0]:
# Optional reset, disabled by default: uncommenting this runs a condition-less
# delete() on drivers_merge (presumably to clear it before re-running the merges).
#from delta.tables import DeltaTable

#deltaTable = DeltaTable.forPath(spark, "/mnt/formula1ltedl/demo/drivers_merge")
#deltaTable.delete()

In [0]:
# day 3 merge with pyspark

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

# Build the handle from the DeltaTable class, not from a variable left over by an
# earlier cell, so this cell also works on a fresh kernel.
deltaTable = DeltaTable.forName(spark, "f1_demo.drivers_merge")

# Upsert day 3: update drivers that already exist, insert the rest.
# current_timestamp() is passed as a SQL expression string evaluated by Delta.
(
    deltaTable.alias("tgt")
    .merge(drivers_day3_df.alias("upd"), "tgt.driverId = upd.driverId")
    .whenMatchedUpdate(set={
        "dob": "upd.dob",
        "forename": "upd.forename",
        "surname": "upd.surname",
        "updatedDate": "current_timestamp()",
    })
    .whenNotMatchedInsert(values={
        "driverId": "upd.driverId",
        "dob": "upd.dob",
        "forename": "upd.forename",
        "surname": "upd.surname",
        "createdDate": "current_timestamp()",
    })
    .execute()
)

#### History, time travel and vacuum
1. History & versioning
2. Time travel
3. Vacuum

In [0]:
%sql
-- List the commits (versions) recorded for drivers_merge.
DESCRIBE HISTORY f1_demo.drivers_merge

In [0]:
%sql
-- Time travel by timestamp. NOTE(review): this timestamp comes from one specific run's
-- history; pick a value from DESCRIBE HISTORY when re-running.
SELECT *
FROM f1_demo.drivers_merge TIMESTAMP AS OF '2024-06-01T13:49:23.000+00:00';

In [0]:
# Time travel through the DataFrame reader. NOTE(review): the timestamp is tied to
# one specific run's history.
df = (
    spark.read
    .format("delta")
    .option("timestampAsOf", '2024-06-01T14:08:07.000+00:00')
    .load("/mnt/formula1ltedl/demo/drivers_merge")
)

In [0]:
%sql
-- The retention-duration safety check is turned off only so RETAIN 0 HOURS is accepted.
-- Removing unreferenced files this way can break time travel to older versions.
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM f1_demo.drivers_merge RETAIN 0 HOURS;
-- Re-enable the check so later VACUUMs in this session are protected again.
SET spark.databricks.delta.retentionDurationCheck.enabled = true;

In [0]:
%sql
-- Delete driver 1; a later cell re-inserts rows missing relative to VERSION AS OF 10.
DELETE FROM f1_demo.drivers_merge
WHERE driverId = 1;

In [0]:
%sql
-- The DELETE above appears as a new version in the history.
DESCRIBE HISTORY f1_demo.drivers_merge

In [0]:
%sql
-- Time travel by version number. NOTE(review): version 10 is specific to this table's
-- history; check DESCRIBE HISTORY for the version preceding the delete.
SELECT *
FROM f1_demo.drivers_merge VERSION AS OF 10;

In [0]:
%sql
-- Re-insert every driver that exists in version 10 but is missing from the current version.
MERGE INTO f1_demo.drivers_merge AS a
USING f1_demo.drivers_merge VERSION AS OF 10 AS b
  ON (a.driverId = b.driverId)
WHEN NOT MATCHED THEN
  INSERT *

#### Delta Lake Transaction Log

In [0]:
%sql
-- Same schema as drivers_merge; used to watch the transaction log grow commit by commit.
CREATE TABLE IF NOT EXISTS f1_demo.drivers_txn (
  driverId    INT,
  dob         DATE,
  forename    STRING,
  surname     STRING,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

In [0]:
%sql
-- Commits recorded so far for drivers_txn.
DESCRIBE HISTORY f1_demo.drivers_txn

In [0]:
%sql
-- Copy a single driver (id 2) from drivers_merge into drivers_txn.
INSERT INTO f1_demo.drivers_txn
SELECT *
FROM f1_demo.drivers_merge
WHERE driverId = 2;

In [0]:
%sql
-- Delete driver 1 from drivers_txn.
DELETE FROM f1_demo.drivers_txn
WHERE driverId = 1;

In [0]:
# Insert drivers 3..19 one statement at a time, so each driver is written by its own
# INSERT (and therefore its own commit) rather than by one bulk insert.
for driver_id in range(3, 20):
    insert_stmt = f"""insert into f1_demo.drivers_txn
                  select * from f1_demo.drivers_merge where driverId = {driver_id}"""
    spark.sql(insert_stmt)

### Convert Parquet to Delta

In [0]:
%sql
-- A plain Parquet (non-Delta) table with the drivers schema, to demo CONVERT TO DELTA.
CREATE TABLE IF NOT EXISTS f1_demo.drivers_convert_to_delta (
  driverId    INT,
  dob         DATE,
  forename    STRING,
  surname     STRING,
  createdDate DATE,
  updatedDate DATE
)
USING PARQUET

In [0]:
%sql
-- Populate the Parquet table from drivers_merge.
INSERT INTO f1_demo.drivers_convert_to_delta
SELECT *
FROM f1_demo.drivers_merge;

In [0]:
%sql
-- Convert the metastore Parquet table to Delta.
CONVERT TO DELTA f1_demo.drivers_convert_to_delta

In [0]:
# Write a plain Parquet copy of the table to a bare path (no metastore entry), so the
# next cell can demo converting a path-based Parquet dataset.
df = spark.table("f1_demo.drivers_convert_to_delta")
(
    df.write
    .format("parquet")
    .save("/mnt/formula1ltedl/demo/drivers_convert_to_delta_new")
)

In [0]:
%sql
-- Convert the path-based Parquet dataset written by the previous cell to Delta.
CONVERT TO DELTA parquet.`/mnt/formula1ltedl/demo/drivers_convert_to_delta_new`