In [None]:
import pyspark
from pyspark.sql.session import SparkSession

# Extra JVM packages that Spark resolves from Maven when the session starts.
#
# spark-avro is released together with Spark itself, so its version has to
# match the Spark runtime. It is therefore taken from the installed PySpark
# instead of being hard-coded; the previous literal (3.2.1) belonged to a
# different Spark release line than the one delta-core 2.3.0 is built for.
#
# NOTE(review): per the Delta Lake compatibility table, delta-core 2.3.0
# targets Spark 3.3.x — confirm the installed PySpark is a 3.3.x release.
maven_coords = [
    f"org.apache.spark:spark-avro_2.12:{pyspark.__version__}",
    "io.delta:delta-core_2.12:2.3.0",
]

# Build (or reuse) a session with the Delta SQL extension and Delta catalog
# registered, so that `format("delta")`, `saveAsTable` and `MERGE INTO` work.
spark = (
    SparkSession.builder.appName("MyDelta_App")
    .config("spark.jars.packages", ",".join(maven_coords))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.memory", "512m")
    .config("spark.driver.memory", "1g")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)

# Bare expression so the notebook renders the session summary.
spark

In [None]:
from pyspark.sql.types import *
from datetime import datetime

# Two non-nullable columns: a string `name` and a date `dob`.
schema = StructType([
    StructField("name", StringType(), False),
    StructField("dob", DateType(), False),
])

# One seed row; the dob value is parsed from a "%Y-%m-%d" string.
df = spark.createDataFrame(
    data=[["ash", datetime.strptime("2020-01-01", "%Y-%m-%d")]],
    schema=schema,
)

# Write the frame in Delta format as table `delta_table`, overwriting any
# existing contents so the cell can be re-run.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("delta_table")
)

In [None]:
# Read `delta_table` back through Spark SQL and print its rows.
spark.sql(
    "select * from delta_table"
).show()

In [None]:
import delta

In [None]:
from delta.tables import *

# Source-side schema. Its column names (`new_name`, `new_dob`) differ from the
# target table's (`name`, `dob`), which lets the merge condition below refer
# to both sides without table qualifiers.
schema = StructType([
    StructField("new_name", StringType(), False),
    StructField("new_dob", DateType(), False),
])
new_df = spark.createDataFrame(
    data=[["ash", datetime.strptime("2025-01-01", "%Y-%m-%d")]],
    schema=schema,
)

print("Original dataframe...")
spark.sql("select * from delta_table").show()

# Merge through the DeltaTable Python API: for every target row whose `name`
# equals a source `new_name`, set `dob` to the source's `new_dob`.
# Only a when-matched clause is given, so there is no insert branch.
delta_table_ref = DeltaTable.forName(spark, "delta_table")
(
    delta_table_ref
    .merge(new_df, "name=new_name")
    .whenMatchedUpdate(set={"dob": "new_dob"})
    .execute()
)

print("after merge...")
spark.sql("select * from delta_table").show()

In [None]:
# Target and updates tables share the same two-column, non-nullable schema.
schema = StructType([
    StructField("name", StringType(), False),
    StructField("dob", DateType(), False),
])

# Target table content: a single row for "ash".
df = spark.createDataFrame(
    data=[["ash", datetime.strptime("2010-01-01", "%Y-%m-%d")]],
    schema=schema,
)

# Updates: "ash" is also present in the target (matched branch of the merge),
# "mat" is not (not-matched branch).
# NOTE(review): the year 9926 looks like a typo — confirm the intended value.
new_df = spark.createDataFrame(
    data=[
        ["ash", datetime.strptime("2026-01-01", "%Y-%m-%d")],
        ["mat", datetime.strptime("9926-01-01", "%Y-%m-%d")],
    ],
    schema=schema,
)

# Write both frames as Delta tables, overwriting earlier runs.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("my_delta")
)
(
    new_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("updates_to_delta")
)

print("Original dataframe...")
spark.sql("select * from my_delta").show()

# Same kind of merge as the DeltaTable API offers, expressed in SQL:
# matched names get their dob updated, unmatched names are inserted.
spark.sql(
    """Merge into my_delta
              using updates_to_delta
              on my_delta.name = updates_to_delta.name
              when matched then
                update set
                  dob = updates_to_delta.dob
              when not matched then
                insert (name, dob)  values (updates_to_delta.name, updates_to_delta.dob)

            """
)

print("after merging using sql api ...")
spark.sql("select * from my_delta").show()