In [1]:
import os
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import explode
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType


In [2]:
# Event logging is enabled below, and Spark refuses to start when the event
# log directory does not exist, so create it up front (no-op if present).
SPARK_EVENT_LOG_DIR = "/tmp/spark-events/"
os.makedirs(SPARK_EVENT_LOG_DIR, exist_ok=True)

# Build (or reuse) a SparkSession wired up for Delta Lake: the delta-core
# package is pulled via spark.jars.packages, and the SQL extension + catalog
# settings register Delta's SQL support on the session.
# NOTE(review): config like spark.jars.packages only takes effect when no
# SparkContext exists yet; getOrCreate() returns an existing session as-is.
# NOTE(review): delta-core 1.0.x targets Spark 3.1 / Scala 2.12 per the Delta
# compatibility matrix — confirm against the installed PySpark version.
spark = (
    SparkSession.builder
    .appName("delta-example")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", SPARK_EVENT_LOG_DIR)
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)


In [3]:
# Import only the name the notebook uses instead of `import *`.
# NOTE(review): the `delta` Python package appears to be supplied by the
# delta-core jar requested via spark.jars.packages, which is presumably why
# this import sits after the session is created — confirm before moving it.
from delta.tables import DeltaTable

In [4]:
# Inspect the SparkSession created above; as the last expression, it is
# shown as this cell's output.
spark

In [5]:
# Small demo DataFrame: a single `id` column holding 0 through 4.
data = spark.range(start=0, end=5)
data.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [9]:
# Filesystem location of the demo Delta table (Parquet part files plus the
# _delta_log/ transaction log directory).
path: str = "/tmp/delta-table"

In [21]:
# Persist `data` as a Delta table at `path`. If a table already exists there,
# the overwrite is recorded as a new commit rather than erasing history.
data.write.save(path, format="delta", mode="overwrite")

In [13]:
# Load the current (latest) snapshot of the Delta table and print it.
df = spark.read.load(path, format="delta")
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [6]:
!ls /tmp/delta-table

_delta_log
part-00000-c1401f89-2c33-4f97-aa47-0bfee59db96b-c000.snappy.parquet
part-00001-71aa7ab8-5f04-4376-82df-5072abee01da-c000.snappy.parquet
part-00003-fa5a319b-98eb-425c-b3c1-d7ce08d61577-c000.snappy.parquet
part-00004-838a7822-d5bd-4e8f-88b7-47f72cd6fa49-c000.snappy.parquet
part-00006-6fab2b44-c734-4e38-a828-19fa97ce88c2-c000.snappy.parquet
part-00007-60447444-2241-4c4c-a854-2be7adac144f-c000.snappy.parquet


In [7]:
!ls /tmp/delta-table/_delta_log

00000000000000000000.json


In [10]:
# Overwrite the table with ids 5..9. Delta records this as a new commit and
# keeps the previous data files around (see the listings below).
# Bind the batch to its own name instead of rebinding `data`, so re-running
# the earlier write cell still writes ids 0..4 rather than this batch.
second_batch = spark.range(5, 10)
second_batch.write.format("delta").mode("overwrite").save(path)

In [16]:
!ls /tmp/delta-table

_delta_log
part-00000-2d5e0830-2b2f-45d8-a325-7007012ddd6a-c000.snappy.parquet
part-00000-58d61cee-ae8d-42e9-a4bd-a8f6f24139bb-c000.snappy.parquet
part-00001-5c5f39a5-3159-4936-8cde-1fd2b90b9289-c000.snappy.parquet
part-00001-8d651d82-eb4c-445d-a888-df3f72dc2c62-c000.snappy.parquet
part-00003-3cb127d7-5caf-4bde-9c83-ebed0fdf2291-c000.snappy.parquet
part-00003-6817bcfb-e769-4fb9-b645-54fd71595058-c000.snappy.parquet
part-00004-10c60768-bf0e-4810-af73-7c90fe913549-c000.snappy.parquet
part-00004-8bc7ea50-b64a-4204-9ee8-39af45edee05-c000.snappy.parquet
part-00006-11141901-a731-4f22-941e-9e0b0610b603-c000.snappy.parquet
part-00006-eb124203-2f72-49b0-a541-80349b2f28c2-c000.snappy.parquet
part-00007-291b8633-507b-4688-8f59-da4e4c005fe9-c000.snappy.parquet
part-00007-f5d36757-af1c-420b-9efd-f327d326dc56-c000.snappy.parquet


In [15]:
!ls /tmp/delta-table/_delta_log

00000000000000000000.json  00000000000000000001.json


In [22]:
# Re-read the table: a plain load resolves to the newest commit, so only the
# ids written by the overwrite show up.
df = spark.read.load(path=path, format="delta")
df.show()

+---+
| id|
+---+
|  7|
|  9|
|  5|
|  6|
|  8|
+---+



In [33]:
from delta.tables import DeltaTable
from pyspark.sql.functions import expr, col


# DeltaTable is not a DataFrame: it is a handle for running ACID operations
# (update / delete / merge) against the table stored at `path`.
# Use .toDF() whenever a DataFrame view of the current contents is needed.
deltaTable = DeltaTable.forPath(spark, path)
deltaTable.toDF().show()

# Predicate selecting rows with an even id; shared by the update and delete.
is_even = expr("id % 2 == 0")

# Add 100 to every even id (the result is still even).
deltaTable.update(condition=is_even, set={"id": expr("id + 100")})

# Remove every even id, which includes the rows bumped just above.
deltaTable.delete(condition=is_even)

# Upsert ids 0..19: ids already in the table are matched and rewritten,
# ids not yet present are inserted.
newData = spark.range(0, 20)
(
    deltaTable.alias("oldData")
    .merge(newData.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": col("newData.id")})
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)

deltaTable.toDF().show()

+---+
| id|
+---+
|  7|
|  9|
|  5|
+---+

+---+
| id|
+---+
| 15|
|  2|
| 19|
| 10|
| 11|
|  1|
|  6|
| 18|
|  0|
|  4|
| 14|
| 12|
|  8|
| 17|
| 13|
| 16|
|  7|
|  5|
|  9|
|  3|
+---+



In [40]:
# Time travel: read the table as of an explicit version.
# Version numbers depend on how many commits this path has accumulated.
# Every re-run of the writes above adds more, and a fresh run ends at a lower
# version than a long-lived kernel does. So look the version up from the
# table history instead of hard-coding it. history(1) returns only the most
# recent commit, i.e. the state right after the merge above.
latest_version = DeltaTable.forPath(spark, path).history(1).first()["version"]
df = spark.read.format("delta").option("versionAsOf", latest_version).load(path)
df.show()

+---+
| id|
+---+
| 15|
|  2|
| 19|
| 10|
| 11|
|  1|
|  6|
| 18|
|  0|
|  4|
| 14|
| 12|
|  8|
| 17|
| 13|
| 16|
|  7|
|  5|
|  9|
|  3|
+---+



In [19]:
# Stop the session once the demo is done. With spark.eventLog.enabled set in
# the builder, stopping also finalizes the event log so the run is recorded
# as completed (e.g. for the Spark history server) rather than in progress.
spark.stop()