In [None]:
#%pip install pyspark==3.4.0 delta-spark==2.4.0

"""
Author: Matt Martin
Date: 2023-09-03
Desc: Scratch pad testing delta lake
"""

In [None]:
#spark.sparkContext.addPyFile("https://repo1.maven.org/maven2/io/delta/delta-core_2.12/0.8.0/delta-core_2.12-0.8.0.jar")


In [None]:
from datetime import datetime
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
def drop_table_if_exists(tbl_path) -> None:
    
    from delta import DeltaTable

    try:
        # Load the Delta table
        delta_table = DeltaTable.forPath(spark, tbl_path)

        # Delete the Delta table
        delta_table.delete()
    except Exception as e:
        pass

In [2]:
def create_or_replace_delta_table(df, tbl_path) -> None:
    try:
        df.write.format("delta").mode("overwrite").save(tbl_path)
    except Exception as e:
        df.write.format("delta").save(tbl_path)
        

In [12]:
def build_src_table() -> None:
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("hire_date", DateType(), True)
    ])

    data = [
        ("Matt", 20, datetime(2022,8,19)),
        ("Bill", 35, datetime(2023,4,15)),
        ("Nancy", 57, datetime(2022,4,23)),
        ("Rachel", 19, datetime(2021,6,7)),
    ]

    df = spark.createDataFrame(data, schema=schema)
    create_or_replace_delta_table(df, "./test/src_ppl")


In [13]:
def build_tgt_table() -> None:
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("hire_date", DateType(), True)
    ])

    data = [
        ("Matt", 18, datetime(2022,8,19)),
        ("Bill", 22, datetime(2022,9,4)),
    ]

    df = spark.createDataFrame(data, schema=schema)
    create_or_replace_delta_table(df, "./test/tgt_ppl")

In [14]:
## build source and target
build_src_table()
build_tgt_table()

## sample

## merge target

## sample target again

In [15]:
src_df = spark.read.format("delta").load("./test/src_ppl")
tgt_df = spark.read.format("delta").load("./test/tgt_ppl")

src_df.createTempView("src_tbl")
tgt_df.createTempView("tgt_tbl")

print('source dataset')
src_df.show()
print('target_dataset')
tgt_df.show()

source dataset
+------+---+----------+
|  name|age| hire_date|
+------+---+----------+
|Rachel| 19|2021-06-07|
| Nancy| 57|2022-04-23|
|  Bill| 35|2023-04-15|
|  Matt| 20|2022-08-19|
+------+---+----------+

target_dataset
+----+---+----------+
|name|age| hire_date|
+----+---+----------+
|Bill| 22|2022-09-04|
|Matt| 18|2022-08-19|
+----+---+----------+



In [16]:
### now do a merge and then reevaluate the target file
sql = """
    MERGE into tgt_tbl as tgt
        using src_tbl as src
            on tgt.name = src.name
        when matched then update
            set tgt.age = src.age, tgt.hire_date = src.hire_date
        when not matched then
            insert (name, age, hire_date)
            values (src.name, src.age, src.hire_date)
"""
spark.sql(sql)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [17]:
tgt_df.show()

+------+---+----------+
|  name|age| hire_date|
+------+---+----------+
|  Bill| 35|2023-04-15|
|  Matt| 20|2022-08-19|
| Nancy| 57|2022-04-23|
|Rachel| 19|2021-06-07|
+------+---+----------+



In [18]:
spark.sql("Update tgt_tbl set age = 20 where name = 'Rachel'")
tgt_df.show()

+------+---+----------+
|  name|age| hire_date|
+------+---+----------+
|  Bill| 35|2023-04-15|
|  Matt| 20|2022-08-19|
| Nancy| 57|2022-04-23|
|Rachel| 20|2021-06-07|
+------+---+----------+



In [30]:
df = spark.read.format("delta").load("./test/tgt_ppl")
df.show()

+------+---+----------+
|  name|age| hire_date|
+------+---+----------+
|  Bill| 35|2023-04-15|
|  Matt| 20|2022-08-19|
| Nancy| 57|2022-04-23|
|Rachel| 20|2021-06-07|
+------+---+----------+



In [20]:
#write perm results back out 
create_or_replace_delta_table(df, "./test/tgt_ppl")

In [31]:
df.createTempView("persons")
spark.sql("SELECT * FROM persons").show()

+------+---+----------+
|  name|age| hire_date|
+------+---+----------+
|  Bill| 35|2023-04-15|
|  Matt| 20|2022-08-19|
| Nancy| 57|2022-04-23|
|Rachel| 20|2021-06-07|
+------+---+----------+



In [None]:
### you can do direct writes to delta tables without having to put in a df first

sql = """
    MERGE INTO delta.`./test/tgt_ppl` AS TGT
        USING delta.`./test/src_ppl` as SRC
            ON TGT.id = SRC.id
        WHEN NOT MATCHED THEN 
            INSERT (id)
            VALUES (SRC.id)
"""
spark.sql(sql)

spark.sql("CREATE OR REPLACE TABLE persons_copy USING delta location './test/persons_copy' AS SELECT * FROM persons")

spark.sql("CREATE TABLE test2_ppl USING delta LOCATION './test/tgt_ppl' AS SELECT * FROM t1")

spark.sql("UPDATE delta.`./test/tgt_ppl` set id = 12 where id = 7")

spark.sql("delete from delta.`./test/tgt_ppl` where id in (6,8)")