In [20]:
import shutil

from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
import delta as delta

In [21]:
# Local Spark session with the Delta Lake SQL extension and Delta catalog enabled.
# configure_spark_with_delta_pip() sets `spark.jars.packages` itself, to the Delta
# artifact matching the installed delta-spark pip package; a hand-pinned package
# here would be overwritten (and an old pin such as 0.8.0 would not match anyway).
builder = (
    SparkSession.builder.appName("test001")
    .master("local")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
# Take the context from the session we just built instead of a global lookup.
sc = spark.sparkContext

In [22]:
# Seed record: one JSON document -> one-row DataFrame (Spark infers the schema;
# the displayed columns come out in alphabetical order).
newJson = '{"Name":"something1","Url":"https://stackoverflow.com","Author":"jangcy","BlogEntries":100,"Caller":"jangcy"}'
json_lines = sc.parallelize([newJson])
df = spark.read.json(json_lines)
df.show(truncate=False)

+------+-----------+------+----------+-------------------------+
|Author|BlogEntries|Caller|Name      |Url                      |
+------+-----------+------+----------+-------------------------+
|jangcy|100        |jangcy|something1|https://stackoverflow.com|
+------+-----------+------+----------+-------------------------+



In [23]:
# Despite the ".parquet" suffix, this path is a Delta table *directory*.
delta_path = "delta_merge_test01.parquet"

# Remove any table left by a previous run so the Delta log starts at version 0.
# Without this, each re-run's overwrite appends new versions to the old log and
# the later versionAsOf 0/1/2 reads show the first run's history, not this run's.
shutil.rmtree(delta_path, ignore_errors=True)

df.write.format("delta").mode("overwrite").save(delta_path)

# Read it back to confirm the initial version.
df1 = spark.read.format("delta").load(delta_path)
df1.show()

                                                                                

+------+-----------+------+----------+--------------------+
|Author|BlogEntries|Caller|      Name|                 Url|
+------+-----------+------+----------+--------------------+
|jangcy|        100|jangcy|something1|https://stackover...|
+------+-----------+------+----------+--------------------+



In [24]:
# Incoming record whose Name ("something") is not yet in the table.
newJson = '{"Name":"something","Url":"https://stackoverflow.com","Author":"jangcy","BlogEntries":100,"Caller":"jangcy"}'
json_lines = sc.parallelize([newJson])
df2 = spark.read.json(json_lines)
df2.show(truncate=False)

+------+-----------+------+---------+-------------------------+
|Author|BlogEntries|Caller|Name     |Url                      |
+------+-----------+------+---------+-------------------------+
|jangcy|100        |jangcy|something|https://stackoverflow.com|
+------+-----------+------+---------+-------------------------+



In [25]:
deltaTable = delta.tables.DeltaTable.forPath(spark, "delta_merge_test01.parquet")

# Insert-if-absent merge keyed on Name: rows of df2 whose Name is not already in
# the table are inserted with all of their columns; existing rows stay untouched.
# There is deliberately no whenMatched clause: assigning each target column to its
# own value changes nothing but still makes the merge rewrite the matched rows.
# Insert-only is Delta's documented pattern for this kind of deduplicating write.
(
    deltaTable.alias("deltaTable")
    .merge(df2.alias("df2"), "deltaTable.Name = df2.Name")
    .whenNotMatchedInsertAll()
    .execute()
)


                                                                                

In [26]:
# Latest snapshot of the table after the insert-if-absent merge.
snapshot = deltaTable.toDF()
snapshot.show()

+------+-----------+------+----------+--------------------+
|Author|BlogEntries|Caller|      Name|                 Url|
+------+-----------+------+----------+--------------------+
|jangcy|        100|jangcy|something1|https://stackover...|
|jangcy|        100|jangcy| something|https://stackover...|
+------+-----------+------+----------+--------------------+



In [27]:
# Same Name as an existing row, but with a new Url and BlogEntries value.
newJson = '{"Name":"something","Url":"https://stackoverflow.com/something","Author":"jangcy","BlogEntries":111,"Caller":"jangcy"}'
json_lines = sc.parallelize([newJson])
df2 = spark.read.json(json_lines)
df2.show(truncate=False)

+------+-----------+------+---------+-----------------------------------+
|Author|BlogEntries|Caller|Name     |Url                                |
+------+-----------+------+---------+-----------------------------------+
|jangcy|111        |jangcy|something|https://stackoverflow.com/something|
+------+-----------+------+---------+-----------------------------------+



In [28]:
# Handle on the same Delta table path used throughout.
deltaTable = delta.tables.DeltaTable.forPath(spark, "delta_merge_test01.parquet")

# Upsert keyed on Name: a matching row is overwritten with every column of df2,
# and any df2 row without a match is inserted.
(
    deltaTable.alias("deltaTable")
    .merge(df2.alias("df2"), "deltaTable.Name = df2.Name")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


                                                                                

In [29]:
# Latest snapshot of the table after the upsert.
snapshot = deltaTable.toDF()
snapshot.show()

+------+-----------+------+----------+--------------------+
|Author|BlogEntries|Caller|      Name|                 Url|
+------+-----------+------+----------+--------------------+
|jangcy|        111|jangcy| something|https://stackover...|
|jangcy|        100|jangcy|something1|https://stackover...|
+------+-----------+------+----------+--------------------+



In [30]:
# Open a fresh handle from the path and show its latest snapshot
# (presumably to confirm the merge result is persisted, not just held by the old handle).
deltaTable = delta.tables.DeltaTable.forPath(spark, "delta_merge_test01.parquet")
latest = deltaTable.toDF()
latest.show()

+------+-----------+------+----------+--------------------+
|Author|BlogEntries|Caller|      Name|                 Url|
+------+-----------+------+----------+--------------------+
|jangcy|        111|jangcy| something|https://stackover...|
|jangcy|        100|jangcy|something1|https://stackover...|
+------+-----------+------+----------+--------------------+



In [31]:
# Time travel to commit version 0. This is the initial write only when the table
# directory did not exist before this notebook run.
df = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("delta_merge_test01.parquet")
)
df.show()

                                                                                

+------+-----------+------+----------+--------------------+
|Author|BlogEntries|Caller|      Name|                 Url|
+------+-----------+------+----------+--------------------+
|jangcy|        100|jangcy|something1|https://stackover...|
+------+-----------+------+----------+--------------------+





In [32]:
# Time travel to commit version 1 (on a fresh table: the state after the first merge).
df = (
    spark.read.format("delta")
    .option("versionAsOf", 1)
    .load("delta_merge_test01.parquet")
)
df.show()

                                                                                

+------+-----------+------+----------+--------------------+
|Author|BlogEntries|Caller|      Name|                 Url|
+------+-----------+------+----------+--------------------+
|jangcy|        100|jangcy|something1|https://stackover...|
|jangcy|        100|jangcy| something|https://stackover...|
+------+-----------+------+----------+--------------------+



In [33]:
# Time travel to commit version 2 (on a fresh table: the state after the upsert).
df = (
    spark.read.format("delta")
    .option("versionAsOf", 2)
    .load("delta_merge_test01.parquet")
)
df.show()

+------+-----------+------+----------+--------------------+
|Author|BlogEntries|Caller|      Name|                 Url|
+------+-----------+------+----------+--------------------+
|jangcy|        111|jangcy| something|https://stackover...|
|jangcy|        100|jangcy|something1|https://stackover...|
+------+-----------+------+----------+--------------------+

