In [0]:
%sql
-- Create a Delta table at /tmp/deltain-table holding five ids (200 through 600).
CREATE TABLE delta.`/tmp/deltain-table`
USING DELTA
AS
SELECT col1 AS id
FROM VALUES 200, 300, 400, 500, 600;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Show the current contents of the path-based Delta table.
SELECT *
FROM delta.`/tmp/deltain-table`;

id
200
300
400
500
600


In [0]:
%sql
-- Overwrite the table's rows with a new set of five ids (5 through 9).
INSERT OVERWRITE delta.`/tmp/deltain-table`
SELECT col1 AS id
FROM VALUES 5, 6, 7, 8, 9;

num_affected_rows,num_inserted_rows
5,5


In [0]:
%sql
-- Time travel: query the table as of version 0, i.e. before the overwrite above.
SELECT *
FROM delta.`/tmp/deltain-table` VERSION AS OF 0;

id
200
300
400
500
600


In [0]:
# python
# Build a DataFrame of ids 0..4 and save it in Delta format at /tmp/delta-table.
data = spark.range(0, 5)
(
    data.write
    .format("delta")
    .save("/tmp/delta-table")
)

In [0]:
# Load the Delta table back from its path and print its rows.
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-table")
)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
# Replace the table contents with ids 5..9 by writing in "overwrite" mode.
data = spark.range(5, 10)
(
    data.write
    .format("delta")
    .mode("overwrite")
    .save("/tmp/delta-table")
)

In [0]:
# Time travel: read the table with versionAsOf=0 (the state before the
# overwrite) and print its rows.
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("/tmp/delta-table")
)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
# Read from the "rate" streaming source and write its `value` column, renamed
# to `id`, as a stream into the Delta table at /tmp/delta-table.
# NOTE(review): `stream` is never stopped in the cells visible here, while the
# later cells update/delete/merge the same table -- confirm whether a
# `stream.stop()` is intended before those steps.
streamingDf = spark.readStream.format("rate").load()
stream = (
    streamingDf.selectExpr("value as id")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start("/tmp/delta-table")
)

In [0]:
# Import only the names this notebook uses. A wildcard import of
# pyspark.sql.functions shadows Python builtins (sum, min, max, abs, round, ...)
# and hides where `col` / `expr` come from.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

# Handle to the Delta table at /tmp/delta-table, used for update/delete/merge.
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

In [0]:
# Add 100 to every row whose id is even, then print the table.
deltaTable.update(
    condition=expr("id % 2 == 0"),
    set={"id": expr("id + 100")},
)
deltaTable.toDF().show()

+---+
| id|
+---+
|137|
| 11|
| 29|
| 61|
| 99|
| 89|
| 93|
|  7|
| 53|
|131|
|143|
| 71|
|123|
| 87|
|119|
|101|
| 77|
|111|
| 43|
| 59|
+---+
only showing top 20 rows



In [0]:
# Remove every row whose id is even, then print what remains.
deltaTable.delete(
    condition=expr("id % 2 == 0"),
)
deltaTable.toDF().show()

+---+
| id|
+---+
|137|
| 11|
| 29|
| 61|
| 99|
| 89|
| 93|
|  7|
| 53|
|131|
|143|
| 71|
|123|
| 87|
|119|
|101|
| 77|
|111|
| 43|
| 59|
+---+
only showing top 20 rows



In [0]:
# Upsert (merge) ids 0..19 into the table: rows whose id matches an incoming
# id are updated from the incoming row; incoming ids with no match are inserted.
newData = spark.range(0, 20)

(
    deltaTable.alias("oldData")
    .merge(newData.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": col("newData.id")})
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)

deltaTable.toDF().show()

+---+
| id|
+---+
| 21|
| 23|
| 25|
| 27|
| 29|
| 31|
| 33|
| 35|
| 37|
| 39|
| 41|
| 43|
| 45|
| 47|
| 49|
| 51|
| 53|
| 55|
| 57|
| 59|
+---+
only showing top 20 rows

