In [0]:
%sql
-- Create a path-based Delta table holding five ids (200..600).
-- CREATE OR REPLACE keeps the cell re-runnable: a plain CREATE TABLE ... AS SELECT
-- fails once a table already exists at this path (e.g. on Restart & Run All).
CREATE OR REPLACE TABLE delta.`/tmp/deltain-table` USING DELTA AS SELECT col1 AS id FROM VALUES 200, 300, 400, 500, 600;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Read back the path-based Delta table; its only column is `id`.
SELECT id
FROM delta.`/tmp/deltain-table`;

id
200
300
400
500
600


In [0]:
# Same table as the SQL cell above, read through the DataFrame API.
df = spark.read.load("/tmp/deltain-table", format="delta")
df.show()

+---+
| id|
+---+
|200|
|300|
|400|
|500|
|600|
+---+



In [0]:
# 2nd example: a path-based Delta table written from the DataFrame API.

# Five ids, 0..4.
data = spark.range(0, 5)
# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") fails once /tmp/delta-table exists, which it does after any
# earlier run of this notebook (later cells overwrite, update and stream into it).
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

In [0]:
# Read the table back. Row order is not guaranteed, so ids may appear shuffled.
df = spark.read.load("/tmp/delta-table", format="delta")
df.show()

+---+
| id|
+---+
|  3|
|  4|
|  0|
|  1|
|  2|
+---+



In [0]:
# Replace the table's contents with ids 5..9 (overwrite rather than append).
data = spark.range(5, 10)
data.write.save("/tmp/delta-table", format="delta", mode="overwrite")

In [0]:
# Verify the overwrite by reading the Delta table itself. `data.show()` would only
# display the in-memory source DataFrame, not what was actually written.
spark.read.format("delta").load("/tmp/delta-table").show()


+---+
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [0]:
# Update without overwrite: edit the Delta table in place through the DeltaTable API.

# Explicit imports instead of `import *`: pyspark.sql.functions exports names such
# as sum, min, max, abs and round that would otherwise shadow Python builtins.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

In [0]:
# Handle on the existing path-based table; used below for update/delete/merge.
deltaTable = DeltaTable.forPath(sparkSession=spark, path="/tmp/delta-table")

In [0]:
# A DeltaTable handle is not a DataFrame; toDF() gives a DataFrame view for display.
snapshot = deltaTable.toDF()
snapshot.show()

+---+
| id|
+---+
|  8|
|  9|
|  5|
|  6|
|  7|
+---+



In [0]:
# Update in place: add 100 to every even id. The condition and the new value
# are given as SQL expression strings, which DeltaTable.update accepts directly.
deltaTable.update(
    condition="id % 2 == 0",
    set={"id": "id + 100"},
)
deltaTable.toDF().show()

+---+
| id|
+---+
|  5|
|  7|
|  9|
+---+



+---+
| id|
+---+
|  5|
|106|
|  7|
|108|
|  9|
+---+



In [0]:
# Delete in place every row whose id is even.
deltaTable.delete("id % 2 == 0")
deltaTable.toDF().show()

+---+
| id|
+---+
|  5|
|  7|
|  9|
+---+



In [0]:
# Merge source: ids 0..19 (a single-argument range starts at 0).
newData = spark.range(20)
newData.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [0]:
# Upsert newData into the table, matching rows on id: matched rows are updated
# from the source, and source ids missing from the table are inserted. Because
# the match is on id itself, the update rewrites matched rows with the same value.
(
    deltaTable.alias("oldData")
    .merge(source=newData.alias("newData"), condition="oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": "newData.id"})
    .whenNotMatchedInsert(values={"id": "newData.id"})
    .execute()
)

In [0]:
# Table contents after the merge.
merged = deltaTable.toDF()
merged.show()

+---+
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
# Streaming write: the built-in "rate" source emits a `value` column, which is
# renamed to `id` and streamed into the same Delta table. Progress is tracked
# in the checkpoint directory.
streamingDf = spark.readStream.format("rate").load()
stream = (
    streamingDf.selectExpr("value as id")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start("/tmp/delta-table")
)

In [0]:
# Let the stream run briefly before stopping it. Under "Run All" this cell would
# otherwise execute right after start(), and the query may be stopped before any
# micro-batch is committed. awaitTermination(timeout) returns after `timeout`
# seconds, or re-raises the query's exception if it failed, so errors surface.
STREAM_RUN_SECONDS = 10
stream.awaitTermination(STREAM_RUN_SECONDS)
stream.stop()