# Save Type - append vs overwrite

In [None]:
from pyspark.sql import SparkSession, DataFrame
from common.session import get_spark_session
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = get_spark_session(session_name="HelloWorld")
table_path="/usr/local/work/data/delta_crud1"

columns = ["first_name", "age"]
data = [("bob", 47), ("li", 23), ("leonard", 51)]
df = spark.createDataFrame(data, columns)
df.write.format("delta").option('delta.enableChangeDataFeed', 'true').save(table_path)

In [None]:
columns = ["first_name", "age"]
data = [("anna", 47), ("maria", 23), ("teresa", 51)]
df = spark.createDataFrame(data, columns)

In [None]:
df.write.format("delta").mode('append').save(table_path)

In [5]:
spark.read.format("delta").load(table_path).show()

+----------+---+
|first_name|age|
+----------+---+
|   leonard| 51|
|    teresa| 51|
|     maria| 23|
|      anna| 47|
|       bob| 47|
|        li| 23|
+----------+---+



In [6]:
df.write.format("delta").mode('overwrite').save(table_path)

In [7]:
spark.read.format("delta").load(table_path).show()

+----------+---+
|first_name|age|
+----------+---+
|    teresa| 51|
|     maria| 23|
|      anna| 47|
+----------+---+



# Change Data Feed

In [18]:
spark.read.format("delta").option("versionAsOf", 0).load(table_path).show()

+----------+---+
|first_name|age|
+----------+---+
|   leonard| 51|
|       bob| 47|
|        li| 23|
+----------+---+



In [12]:
spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", 0).load(table_path).show()

AnalysisException: Error getting change data for range [0 , 3] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.

# Delta Lake feature - Merge

In [9]:
from delta.tables import *

targetDF = DeltaTable.forPath(spark, table_path)

columns = ["first_name", "age"]
data = [("anna", 22), ("maria", 23), ("tomas", 11)]
df = spark.createDataFrame(data, columns)


(targetDF.alias('target')
  .merge(df.alias('source'), "source.first_name = target.first_name")
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .whenMatchedUpdate(condition='target.age <> source.age', set={'age':'source.age'})
  .execute()
)

                                                                                

In [10]:
spark.read.format("delta").load(table_path).show()

+----------+---+
|first_name|age|
+----------+---+
|      anna| 22|
|     maria| 23|
|     tomas| 11|
+----------+---+

