In [27]:
import pyspark
from delta import *
# Assemble the session builder with the Delta SQL extension and Delta catalog.
# The retention-duration check is switched off here; the vacuum cell later in
# this notebook passes a 0-hour retention, which relies on that setting.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
)

# `configure_spark_with_delta_pip` (from the `delta` package) takes the builder
# and hands back one on which the session is created (or reused).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [33]:

# Location of the Delta table; later cells reuse `table_path`.
table_path = "./tmp/delta-table"

# Load the table's current contents and print a preview.
df = (
    spark.read
    .format("delta")
    .load(table_path)
)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [31]:
from delta.tables import *
from pyspark.sql.functions import *

# Handle on the Delta table stored at `table_path`.
deltaTable = DeltaTable.forPath(spark, table_path)

# Step 1 - UPDATE: add 100 to every even id.
deltaTable.update(condition=expr("id % 2 == 0"), set={"id": expr("id + 100")})

# Step 2 - DELETE: remove every row whose id is even
# (this includes the rows shifted by the update, since even + 100 stays even).
deltaTable.delete(condition=expr("id % 2 == 0"))

# Step 3 - MERGE (upsert): bring in ids 0..19.
newData = spark.range(0, 20)

# Match existing rows to incoming rows on id ...
merge_builder = deltaTable.alias("oldData").merge(
    newData.alias("newData"),
    "oldData.id = newData.id",
)
# ... overwrite ids that already exist, insert those that do not.
(
    merge_builder
    .whenMatchedUpdate(set={"id": col("newData.id")})
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)

# Print the table contents after all three operations.
deltaTable.toDF().show()

23/04/19 01:33:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/04/19 01:33:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/04/19 01:33:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [36]:
from delta.tables import *

# Open the Delta table at `table_path` and print its operation history.
deltaTable = DeltaTable.forPath(spark, table_path)

# history() without an argument asks for the full history; the result is a
# DataFrame (one row per table version in the output below). show() truncates
# long cell values, which is why the timestamps and maps end in "...".
fullHistoryDF = deltaTable.history()    # get the full history of the table
fullHistoryDF.show()
#lastOperationDF = deltaTable.history(1) # get the last operation

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     14|2023-04-19 01:33:...|  null|    null|       MERGE|{predicate -> (ol...|null|    null|     null|         13|     Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|     13|2023-04-19 01:33:...|  null|    null|      DELETE|{predicate -> ["(...|null|    null|     null|         12|     Serializable|        false|{numRemovedFiles ...|        null|Ap

In [56]:
from delta.tables import *

# Open the Delta table at `table_path` and print its table-level details.
deltaTable = DeltaTable.forPath(spark, table_path)

# detail() yields a DataFrame; the output below shows columns such as format,
# location, numFiles, sizeInBytes and the minimum reader/writer versions.
detailDF = deltaTable.detail()
detailDF.show()

+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
|format|                  id|name|description|            location|           createdAt|        lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|       tableFeatures|
+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
| delta|70c5aaf5-74e9-46e...|null|       null|file:/home/aleks/...|2023-04-19 01:45:...|2023-04-19 02:00:...|              []|       1|       4357|        {}|               1|               2|[appendOnly, inva...|
+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+---------

In [53]:
# Time travel: read the table as it was at version 0 instead of the latest
# snapshot. Note that this rebinds `df`.
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(table_path)
)
df.show()

+---+
| id|
+---+
+---+



In [55]:

from delta.tables import *
# Open the table through the shared `table_path` variable, as every other
# DeltaTable cell in this notebook does, rather than repeating the path literal
# (a second copy of the string can silently drift from the real location).
deltaTable = DeltaTable.forPath(spark, table_path)

# Run OPTIMIZE as a compaction. The call returns a DataFrame of metrics; leaving
# it as the last expression makes the notebook display its schema.
deltaTable.optimize().executeCompaction()

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>,de

In [57]:
from delta.tables import *

# Vacuum the Delta table at `table_path`, i.e. physically delete data files the
# table no longer needs.
deltaTable = DeltaTable.forPath(spark, table_path)  # path-based tables, or
#deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

# WARNING: the argument is the retention window in hours. Passing 0 keeps no
# history at all, so reading older versions (time travel) can no longer be relied
# on afterwards. Delta normally refuses a retention this short; it is accepted
# here because the session was built with
# spark.databricks.delta.retentionDurationCheck.enabled = "false".
deltaTable.vacuum(0)        # 0-hour retention: vacuum every file not required by the latest version

#deltaTable.vacuum()        # vacuum files not required by versions older than the default retention period
#deltaTable.vacuum(100)     # vacuum files not required by versions more than 100 hours old

                                                                                

Deleted 915 files and directories in a total of 1 directories.


DataFrame[]

                                                                                

In [23]:
# Stop the Spark session.
# NOTE(review): cells further down still call `spark.readStream` and `spark.read`.
# If the notebook is run top to bottom they execute after this stop; presumably
# this cell was meant to run last (its execution count is lower than its
# neighbours') - TODO confirm and move it to the end of the notebook.
spark.stop()

### STREAMING

In [44]:
# Source: a streaming DataFrame from Spark's "rate" format.
streamingDf = spark.readStream.format("rate").load()

# Sink: rename the `value` column to `id` and continuously append to the Delta
# table, keeping streaming progress under ./tmp/checkpoint. The query handle is
# kept in `stream` so a later cell can stop it.
# NOTE(review): the target path repeats the literal held in `table_path`.
stream = (
    streamingDf
    .selectExpr("value as id")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "./tmp/checkpoint")
    .start("./tmp/delta-table")
)


23/04/19 01:45:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

In [51]:
# Stop the streaming query held in `stream`.
# NOTE(review): the ERROR "Aborting job" / WARN "TaskKilled (Stage cancelled)" log
# lines captured below carry the timestamp of this call; presumably they come from
# the in-flight write being cancelled rather than from a real failure - confirm.
stream.stop()

23/04/19 02:00:20 ERROR FileFormatWriter: Aborting job d0ba25d7-a813-4631-bbc7-bf941d25ca9a.
java.lang.InterruptedException
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1048)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:943)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:255)
	at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:398)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala

23/04/19 02:00:20 WARN TaskSetManager: Lost task 0.0 in stage 9061.0 (TID 137668) (192.168.250.148 executor driver): TaskKilled (Stage cancelled)
23/04/19 02:00:20 WARN TaskSetManager: Lost task 4.0 in stage 9061.0 (TID 137672) (192.168.250.148 executor driver): TaskKilled (Stage cancelled)
23/04/19 02:00:20 WARN TaskSetManager: Lost task 3.0 in stage 9061.0 (TID 137671) (192.168.250.148 executor driver): TaskKilled (Stage cancelled)
23/04/19 02:00:20 WARN TaskSetManager: Lost task 6.0 in stage 9061.0 (TID 137674) (192.168.250.148 executor driver): TaskKilled (Stage cancelled)
23/04/19 02:00:20 WARN TaskSetManager: Lost task 5.0 in stage 9061.0 (TID 137673) (192.168.250.148 executor driver): TaskKilled (Stage cancelled)
23/04/19 02:00:20 WARN TaskSetManager: Lost task 1.0 in stage 9061.0 (TID 137669) (192.168.250.148 executor driver): TaskKilled (Stage cancelled)
23/04/19 02:00:20 WARN TaskSetManager: Lost task 2.0 in stage 9061.0 (TID 137670) (192.168.250.148 executor driver): TaskKil

In [45]:
# Re-read the Delta table at `table_path` and preview its rows
# (show() prints only the first 20).
df = (
    spark.read
    .format("delta")
    .load(table_path)
)
df.show()

+---+
| id|
+---+
| 46|
| 73|
| 52|
| 49|
| 71|
| 79|
| 12|
| 16|
|  4|
| 17|
| 19|
| 56|
| 27|
| 59|
| 53|
| 77|
| 13|
| 37|
| 39|
| 24|
+---+
only showing top 20 rows



                                                                                