In [1]:
import socket
# import delta
from pyspark.sql import DataFrame, DataFrameWriter, SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType
from pyspark import SparkConf
# from delta.tables import *

In [2]:
sparkConf = SparkConf().setAppName("ingress_update_delta" + socket.gethostname()).setMaster("local[*]") \
            .set("spark.jars.packages",
                 "io.delta:delta-core_2.12:0.8.0")\
            .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .set("spark.databricks.delta.schema.autoMerge.enabled", True) \
            .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark-3.0.2-bin-hadoop-3.2.0/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7fe2924f-2e9e-45e5-921e-b57fb6007127;1.0
	confs: [default]
	found io.delta#delta-core_2.12;0.8.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 375ms :: artifacts dl 9ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;0.8.0 from central in [default]
	org.abego.

In [3]:
tableName = "deltaTable"
basePath = "Deltas/deltaTable"

In [4]:
data = [(1,"apple",10,"iphone"), (2,"samsung",20,"galaxy"), (3,"dell",30,"monitor")]
columns = ["pk_id","name","value", "type"]
df = spark.createDataFrame(data).toDF(*columns)

df.write.format("delta"). \
    mode("overwrite"). \
    option("mergeSchema", True). \
    save(basePath)

spark.read.format("delta").load(basePath).show(10, False)

                                                                                

+-----+-------+-----+-------+
|pk_id|name   |value|type   |
+-----+-------+-----+-------+
|1    |apple  |10   |iphone |
|2    |samsung|20   |galaxy |
|3    |dell   |30   |monitor|
+-----+-------+-----+-------+



In [5]:
!ls Deltas/deltaTable

_delta_log
part-00000-09267332-ef4c-4fc4-89ce-136e4da9e825-c000.snappy.parquet
part-00000-0def5718-2b18-4058-b342-9dc8004fdb29-c000.snappy.parquet
part-00000-241b308b-11a0-45e5-8def-a03e2b9ca9b4-c000.snappy.parquet
part-00000-d9ae659f-dd36-4ac9-9487-786d4f0ed755-c000.snappy.parquet
part-00001-11a7fb8d-0750-4768-8eb3-efb7bc46025c-c000.snappy.parquet
part-00001-3a30a77d-2908-4974-bcb7-59f7e69b5969-c000.snappy.parquet
part-00001-403a4cb0-e5b0-47de-90c9-f74af86cf25e-c000.snappy.parquet
part-00001-6c7bb7ed-0d11-4b79-b50d-6913115cd1e3-c000.snappy.parquet


In [6]:
!ls Deltas/deltaTable/_delta_log

00000000000000000000.json  00000000000000000002.json
00000000000000000001.json  00000000000000000003.json


In [10]:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "deltaTable")

                                                                                

### Update and Append

In [11]:
deltas = [(1,"apple",20,"iphone"), (4, "other", 50, "other")]
columns = ["pk_id","name","value", "type"]
df = spark.createDataFrame(deltas).toDF(*columns)

deltaTable.alias("baseline").merge(
  df.alias('updates'),
    'baseline.pk_id = updates.pk_id'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

deltaTable.toDF().show()

                                                                                

+-----+-----+-----+-------+---------+
|pk_id| name|value|   type|new_field|
+-----+-----+-----+-------+---------+
|    1|apple|   20| iphone|     null|
|    3| dell|   30|monitor|     null|
|    4|other|   50|  other|     null|
+-----+-----+-----+-------+---------+



### Delete

In [12]:
deletes = [(2,"samsung",20,"galaxy")]
columns = ["pk_id","name","value", "type"]
df = spark.createDataFrame(deletes).toDF(*columns)

deltaTable.alias("baseline").merge(
  df.alias('updates'),
    'baseline.pk_id = updates.pk_id'
  ) \
  .whenMatchedDelete() \
  .execute()

deltaTable.toDF().show(10, False)

                                                                                

+-----+-----+-----+-------+---------+
|pk_id|name |value|type   |new_field|
+-----+-----+-----+-------+---------+
|1    |apple|20   |iphone |null     |
|3    |dell |30   |monitor|null     |
|4    |other|50   |other  |null     |
+-----+-----+-----+-------+---------+



In [13]:
deltaTable.delete("pk_id = 1 or pk_id = '101'")
deltaTable.toDF().show(10, False)

                                                                                

+-----+-----+-----+-------+---------+
|pk_id|name |value|type   |new_field|
+-----+-----+-----+-------+---------+
|3    |dell |30   |monitor|null     |
|4    |other|50   |other  |null     |
+-----+-----+-----+-------+---------+



In [14]:
deletes = [(4, "other", 50, "other"),(3, "dell", 30, "monitor")]
columns = ["pk_id","name","value", "type"]
df = spark.createDataFrame(deletes).toDF(*columns)

deltaTable.alias("baseline").merge(
  df.alias('updates'),
    'baseline.pk_id = updates.pk_id'
  ) \
  .whenMatchedDelete("updates.type='other'") \
  .execute()

deltaTable.toDF().show(10, False)

                                                                                

+-----+----+-----+-------+---------+
|pk_id|name|value|type   |new_field|
+-----+----+-----+-------+---------+
|3    |dell|30   |monitor|null     |
+-----+----+-----+-------+---------+



In [15]:
spark.read.format("delta").load(basePath).printSchema()

root
 |-- pk_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- value: long (nullable = true)
 |-- type: string (nullable = true)



In [16]:
schema_change = [(101,"phones","apple",220,"iphone"),(2,"phones","samsung",20,"galaxy"), (1,"other", "other", 50, "other")]
columns = ["pk_id","new_field","name","value", "type"]
df = spark.createDataFrame(schema_change).toDF(*columns)


deltaTable.alias("baseline").merge(
  df.alias('updates'),
    'baseline.pk_id = updates.pk_id'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

deltaTable.toDF().show()

                                                                                

+-----+-------+-----+-------+---------+
|pk_id|   name|value|   type|new_field|
+-----+-------+-----+-------+---------+
|    2|samsung|   20| galaxy|   phones|
|  101|  apple|  220| iphone|   phones|
|    1|  other|   50|  other|    other|
|    3|   dell|   30|monitor|     null|
+-----+-------+-----+-------+---------+



In [17]:
!cat Deltas/deltaTable/_delta_log/00000000000000000000.json

{"commitInfo":{"timestamp":1652337475328,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputBytes":"2294","numOutputRows":"3"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"5648fd3e-91fd-489d-a1f8-ab0c2d250947","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"pk_id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"type\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1652337470349}}
{"add":{"path":"part-00000-0def5718-2b18-4058-b342-9dc8004fdb29-c000.snappy.parquet","partitionValues":{},"size":1158,"modificationTime":1652337473000,"dataChange":true}}
{"add":{"path":"part-00001

In [18]:
deltaTable.toDF().write.format("delta").option("mergeSchema",True).mode("overwrite").save(basePath)

                                                                                

In [20]:
help(deltaTable.restoreToTimestamp)

AttributeError: 'DeltaTable' object has no attribute 'restoreToTimestamp'

In [21]:
!cat deltaTable/_delta_log/00000000000000000010.json

{"commitInfo":{"timestamp":1651852332217,"operation":"MERGE","operationParameters":{"predicate":"(baseline.`pk_id` = updates.`pk_id`)","matchedPredicates":"[{\"predicate\":\"(updates.`type` = 'other')\",\"actionType\":\"delete\"}]","notMatchedPredicates":"[]"},"readVersion":9,"isBlindAppend":false,"operationMetrics":{"numTargetRowsCopied":"1","numTargetRowsDeleted":"0","numTargetFilesAdded":"2","executionTimeMs":"0","numTargetRowsInserted":"0","scanTimeMs":"1094","numTargetRowsUpdated":"0","numOutputRows":"1","numSourceRows":"2","numTargetFilesRemoved":"1","rewriteTimeMs":"2519"}}}
{"remove":{"path":"part-00107-2588de64-999c-4487-ab70-12b8e742ed0f-c000.snappy.parquet","deletionTimestamp":1651852332216,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1322}}
{"add":{"path":"part-00000-2e9a3a5e-a888-45db-8062-48bd6467f344-c000.snappy.parquet","partitionValues":{},"size":623,"modificationTime":1651852329000,"dataChange":true}}
{"add":{"path":"part-00107-420d5550-ad

In [22]:
spark.read.format("delta").load(basePath).show()

+-----+-------+-----+-------+---------+
|pk_id|   name|value|   type|new_field|
+-----+-------+-----+-------+---------+
|    2|samsung|   20| galaxy|   phones|
|  101|  apple|  220| iphone|   phones|
|    1|  other|   50|  other|    other|
|    3|   dell|   30|monitor|     null|
+-----+-------+-----+-------+---------+



In [23]:
deltaTable.toDF().printSchema()

root
 |-- pk_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- value: long (nullable = true)
 |-- type: string (nullable = true)
 |-- new_field: string (nullable = true)



In [24]:
deltaTable.toDF().write.format("delta").option("mergeSchema",True).option("overwriteSchema",True).mode("overwrite").save(basePath)

                                                                                

In [25]:
spark.read.format("delta").load(basePath).show()

+-----+-------+-----+-------+---------+
|pk_id|   name|value|   type|new_field|
+-----+-------+-----+-------+---------+
|    2|samsung|   20| galaxy|   phones|
|  101|  apple|  220| iphone|   phones|
|    1|  other|   50|  other|    other|
|    3|   dell|   30|monitor|     null|
+-----+-------+-----+-------+---------+



In [26]:
spark.read.format("delta").load(basePath).printSchema()

root
 |-- pk_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- value: long (nullable = true)
 |-- type: string (nullable = true)
 |-- new_field: string (nullable = true)



In [27]:
deltaTable.history(1)

DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string]

In [28]:
deltaTable.vacuum()

                                                                                

Deleted 38 files and directories in a total of 1 directories.


DataFrame[]

In [None]:
spark.stop()

In [29]:
!ls deltaTable

_delta_log
part-00000-0dbe011d-eaad-4bd4-abdd-2f8011602659-c000.snappy.parquet
part-00000-1512ab50-13d4-4303-a8a2-189068563112-c000.snappy.parquet
part-00000-44206d9d-bbe6-4697-8a31-d28d0e6af4b6-c000.snappy.parquet
part-00000-4ed9cc54-889b-4a54-8db7-6c4f875951ae-c000.snappy.parquet
part-00000-5263f752-8e81-43eb-a184-1ee3545858a9-c000.snappy.parquet
part-00000-73479f5a-fe73-413b-9899-c694cc1cc1b6-c000.snappy.parquet
part-00000-80a4fd75-9c3b-4990-bdc5-2eb174f723af-c000.snappy.parquet
part-00000-a528aab2-aa4d-4e0c-93df-cea6f5fb9f9c-c000.snappy.parquet
part-00000-aac298f5-9c24-48d8-9415-195c3d3c838c-c000.snappy.parquet
part-00000-db885079-8812-4678-9b21-f7eea15c0434-c000.snappy.parquet
part-00069-061433de-3efe-4cf6-b52e-c203f68a7eef-c000.snappy.parquet
part-00069-ccbb5395-3d0c-47e5-a5e1-36a8e0fff942-c000.snappy.parquet
part-00107-0e1d9ab6-698b-4112-8d35-f23564e857f5-c000.snappy.parquet
part-00107-9d288664-6ea9-44fb-a5e1-ec07542fcbd5-c000.snappy.parquet
part-00109-cf65e10f-cb40-4dd6-95c2-88