In [2]:
import os
from pyspark.sql import SparkSession
from datetime import date

today = date.today().strftime("%b-%d-%Y")

AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_S3_ENDPOINT = os.getenv("AWS_S3_ENDPOINT")
AWS_BUCKET_NAME = os.getenv("AWS_BUCKET_NAME")
HIVE_METASTORE_URI = os.getenv("HIVE_METASTORE_URI")


POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_ENDPOINT = os.getenv("POSTGRES_ENDPOINT")
POSTGRES_DB = os.getenv("POSTGRES_DB")

spark = SparkSession.builder \
    .master("local[4]") \
    .appName('Clean data') \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")\
    .config("spark.sql.warehouse.dir","s3a://datalake/warehouse")\
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY) \
    .config("fs.s3a.endpoint", AWS_S3_ENDPOINT)\
    .config("spark.hadoop.fs.s3a.path.style.access", "true")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("fs.s3a.connection.ssl.enabled", "false")\
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
    .config('spark.jars.packages', 'io.delta:delta-core_2.12:1.0.1') \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .enableHiveSupport()\
    .config('spark.jars','/opt/spark/jars/aws-java-sdk-bundle-1.11.375.jar')\
    .config('spark.jars','/opt/spark/jars/hadoop-aws-3.2.0.jar')\
    .config('spark.jars','/opt/spark/jars/delta-core_2.12-1.0.1.jar')\
    .config("spark.executor.extraClassPath", "/opt/spark/jars/aws-java-sdk-bundle-1.11.375.jar:/opt/spark/jars/hadoop-aws-3.2.0.jar:/opt/spark/jars/postgresql-42.3.5.jar") \
    .config("spark.driver.extraClassPath","/opt/spark/jars/aws-java-sdk-bundle-1.11.375.jar:/opt/spark/jars/hadoop-aws-3.2.0.jar:/opt/spark/jars/postgresql-42.3.5.jar") \
    .getOrCreate()






:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-86ca6e80-d50b-47ef-be39-254c55520d90;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.1 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 619ms :: artifacts dl 16ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.1 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runtime;3.5.2 fro

23/06/13 07:12:48 INFO Executor: Starting executor ID driver on host a208eb66d409
23/06/13 07:12:48 INFO Executor: Fetching file:///root/.ivy2/jars/org.antlr_ST4-4.0.8.jar with timestamp 1686640366851
23/06/13 07:12:48 INFO Utils: /root/.ivy2/jars/org.antlr_ST4-4.0.8.jar has been previously copied to /tmp/spark-3549339a-b4fd-48a2-a60d-d687548627ab/userFiles-d0a99444-5bd8-4a11-9115-e35145af188e/org.antlr_ST4-4.0.8.jar
23/06/13 07:12:48 INFO Executor: Fetching file:///root/.ivy2/jars/org.antlr_antlr4-4.7.jar with timestamp 1686640366851
23/06/13 07:12:48 INFO Utils: /root/.ivy2/jars/org.antlr_antlr4-4.7.jar has been previously copied to /tmp/spark-3549339a-b4fd-48a2-a60d-d687548627ab/userFiles-d0a99444-5bd8-4a11-9115-e35145af188e/org.antlr_antlr4-4.7.jar
23/06/13 07:12:48 INFO Executor: Fetching file:///root/.ivy2/jars/io.delta_delta-core_2.12-1.0.1.jar with timestamp 1686640366851
23/06/13 07:12:48 INFO Utils: /root/.ivy2/jars/io.delta_delta-core_2.12-1.0.1.jar has been previously copie

## ACID Demo


In [None]:
from delta.tables import *

In [None]:
cars = [
    {"make": "Toyota", "model": "Corolla", "year": 2021, "color": "black", "price": 20000},
    {"make": "Honda", "model": "Civic", "year": 2022, "color": "white", "price": 25000},
    {"make": "Ford", "model": "F-150", "year": 2021, "color": "red", "price": 35000},
    {"make": "Tesla", "model": "Model S", "year": 2020, "color": "blue", "price": 80000},
    {"make": "Chevrolet", "model": "Impala", "year": 2019, "color": "silver", "price": 15000}
]

# create a Spark DataFrame from the car data
df = spark.createDataFrame(cars)

In [None]:
deltaPath = "s3a://datalake/Demo1"
df.write.format("delta").mode("append").save(deltaPath)

In [None]:
from delta import *
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete("year < 2020")

## Schema Enforcement

In [4]:
deltaPath = f's3a://datalake/bronze/CarPartsDB/Jun-06-2023/Part_in_Order//'
# Reading tables from landing area
print('\nReading ...')
Part_in_Oder = spark.read.format("delta").load(deltaPath)
Part_in_Oder.show()
Part_in_Oder.printSchema()


Reading ...


23/06/13 07:14:22 INFO DelegatingLogStore: LogStore org.apache.spark.sql.delta.storage.S3SingleDriverLogStore is used for scheme s3a
23/06/13 07:14:22 INFO DeltaLog: Loading version 4.
23/06/13 07:14:22 INFO Snapshot: [tableId=39ba93e8-1789-4fe9-8554-6dcb23ed824a] DELTA: Compute snapshot for version: 4
23/06/13 07:14:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 180.4 KiB, free 434.2 MiB)
23/06/13 07:14:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 29.1 KiB, free 434.2 MiB)
23/06/13 07:14:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on a208eb66d409:38921 (size: 29.1 KiB, free: 434.4 MiB)
23/06/13 07:14:22 INFO SparkContext: Created broadcast 0 from toString at <unknown>:0
23/06/13 07:14:22 INFO DeltaLogFileIndex: Created DeltaLogFileIndex(JSON, numFilesInSegment: 5, totalFileSize: 3836)
23/06/13 07:14:26 INFO FileSourceStrategy: Pushed Filters: 
23/06/13 07:14:26 INFO FileSourceStrategy: Post-

23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Getting 1 (368.0 B) non-empty blocks including 1 (368.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Getting 1 (276.0 B) non-empty blocks including 1 (276.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Getting 1 (276.0 B) non-empty blocks including 1 (276.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Getting 1 (276.0 B) non-empty blocks including 1 (276.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
23/06/13 07:14:31 INFO ShuffleBlockFetcherIterator: Started 0 rem

23/06/13 07:14:32 INFO ShuffleBlockFetcherIterator: Getting 3 (1812.0 B) non-empty blocks including 3 (1812.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
23/06/13 07:14:32 INFO BlockManagerInfo: Removed broadcast_2_piece0 on a208eb66d409:38921 in memory (size: 66.7 KiB, free: 434.2 MiB)
23/06/13 07:14:32 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
23/06/13 07:14:32 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
23/06/13 07:14:32 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) 

23/06/13 07:14:33 INFO TaskSetManager: Starting task 12.0 in stage 1.0 (TID 23) (a208eb66d409, executor driver, partition 12, PROCESS_LOCAL, 4442 bytes) taskResourceAssignments Map()
23/06/13 07:14:33 INFO Executor: Running task 12.0 in stage 1.0 (TID 23)
23/06/13 07:14:33 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 19) in 210 ms on a208eb66d409 (executor driver) (17/50)
23/06/13 07:14:33 INFO Executor: Finished task 8.0 in stage 1.0 (TID 20). 3700 bytes result sent to driver
23/06/13 07:14:33 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:33 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
23/06/13 07:14:33 INFO TaskSetManager: Starting task 13.0 in stage 1.0 (TID 24) (a208eb66d409, executor driver, partition 13, PROCESS_LOCAL, 4442 bytes) taskResourceAssignments Map()
23/06/13 07:14:33 INFO TaskSetManager: Finished task 8.0 in stage 1.0 (T

23/06/13 07:14:33 INFO Executor: Finished task 26.0 in stage 1.0 (TID 32). 3700 bytes result sent to driver
23/06/13 07:14:33 INFO TaskSetManager: Starting task 33.0 in stage 1.0 (TID 38) (a208eb66d409, executor driver, partition 33, PROCESS_LOCAL, 4442 bytes) taskResourceAssignments Map()
23/06/13 07:14:33 INFO Executor: Running task 33.0 in stage 1.0 (TID 38)
23/06/13 07:14:33 INFO TaskSetManager: Finished task 26.0 in stage 1.0 (TID 32) in 184 ms on a208eb66d409 (executor driver) (32/50)
23/06/13 07:14:33 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:33 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
23/06/13 07:14:33 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:33 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
23

23/06/13 07:14:34 INFO Executor: Finished task 40.0 in stage 1.0 (TID 44). 3700 bytes result sent to driver
23/06/13 07:14:34 INFO Executor: Finished task 36.0 in stage 1.0 (TID 41). 3700 bytes result sent to driver
23/06/13 07:14:34 INFO Executor: Finished task 37.0 in stage 1.0 (TID 42). 3700 bytes result sent to driver
23/06/13 07:14:34 INFO Executor: Finished task 38.0 in stage 1.0 (TID 43). 3700 bytes result sent to driver
23/06/13 07:14:34 INFO TaskSetManager: Starting task 41.0 in stage 1.0 (TID 45) (a208eb66d409, executor driver, partition 41, PROCESS_LOCAL, 4442 bytes) taskResourceAssignments Map()
23/06/13 07:14:34 INFO TaskSetManager: Finished task 40.0 in stage 1.0 (TID 44) in 857 ms on a208eb66d409 (executor driver) (39/50)
23/06/13 07:14:35 INFO Executor: Running task 41.0 in stage 1.0 (TID 45)
23/06/13 07:14:35 INFO TaskSetManager: Finished task 36.0 in stage 1.0 (TID 41) in 1008 ms on a208eb66d409 (executor driver) (40/50)
23/06/13 07:14:35 INFO TaskSetManager: Starting

23/06/13 07:14:35 INFO ShuffleBlockFetcherIterator: Getting 50 (4.7 KiB) non-empty blocks including 50 (4.7 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
23/06/13 07:14:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
23/06/13 07:14:35 INFO CodeGenerator: Code generated in 11.431349 ms
23/06/13 07:14:35 INFO CodeGenerator: Code generated in 20.99505 ms
23/06/13 07:14:35 INFO Executor: Finished task 0.0 in stage 2.0 (TID 53). 4527 bytes result sent to driver
23/06/13 07:14:35 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 53) in 123 ms on a208eb66d409 (executor driver) (1/1)
23/06/13 07:14:35 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
23/06/13 07:14:35 INFO DAGScheduler: ResultStage 2 (toString at <unknown>:0) finished in 0.137 s
23/06/13 07:14:35 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/06/13 07:14:35 INFO TaskSchedulerImpl: Killing

23/06/13 07:14:36 INFO BlockManager: Found block rdd_12_8 locally
23/06/13 07:14:36 INFO BlockManager: Found block rdd_12_9 locally
23/06/13 07:14:36 INFO Executor: Finished task 8.0 in stage 4.0 (TID 62). 3039 bytes result sent to driver
23/06/13 07:14:36 INFO TaskSetManager: Starting task 12.0 in stage 4.0 (TID 66) (a208eb66d409, executor driver, partition 12, PROCESS_LOCAL, 4453 bytes) taskResourceAssignments Map()
23/06/13 07:14:36 INFO TaskSetManager: Finished task 8.0 in stage 4.0 (TID 62) in 31 ms on a208eb66d409 (executor driver) (9/50)
23/06/13 07:14:36 INFO BlockManager: Found block rdd_12_10 locally
23/06/13 07:14:36 INFO Executor: Finished task 9.0 in stage 4.0 (TID 63). 3039 bytes result sent to driver
23/06/13 07:14:36 INFO Executor: Running task 12.0 in stage 4.0 (TID 66)
23/06/13 07:14:36 INFO TaskSetManager: Starting task 13.0 in stage 4.0 (TID 67) (a208eb66d409, executor driver, partition 13, PROCESS_LOCAL, 4453 bytes) taskResourceAssignments Map()
23/06/13 07:14:36 I

23/06/13 07:14:36 INFO BlockManager: Found block rdd_12_24 locally
23/06/13 07:14:36 INFO BlockManager: Found block rdd_12_26 locally
23/06/13 07:14:36 INFO Executor: Finished task 24.0 in stage 4.0 (TID 78). 3082 bytes result sent to driver
23/06/13 07:14:36 INFO TaskSetManager: Starting task 28.0 in stage 4.0 (TID 82) (a208eb66d409, executor driver, partition 28, PROCESS_LOCAL, 4453 bytes) taskResourceAssignments Map()
23/06/13 07:14:36 INFO Executor: Finished task 26.0 in stage 4.0 (TID 80). 3082 bytes result sent to driver
23/06/13 07:14:36 INFO Executor: Running task 28.0 in stage 4.0 (TID 82)
23/06/13 07:14:36 INFO TaskSetManager: Starting task 29.0 in stage 4.0 (TID 83) (a208eb66d409, executor driver, partition 29, PROCESS_LOCAL, 4453 bytes) taskResourceAssignments Map()
23/06/13 07:14:36 INFO TaskSetManager: Finished task 24.0 in stage 4.0 (TID 78) in 126 ms on a208eb66d409 (executor driver) (25/50)
23/06/13 07:14:36 INFO TaskSetManager: Finished task 26.0 in stage 4.0 (TID 80)

23/06/13 07:14:36 INFO SparkContext: Starting job: showString at <unknown>:0
23/06/13 07:14:36 INFO DAGScheduler: Got job 2 (showString at <unknown>:0) with 1 output partitions
23/06/13 07:14:36 INFO DAGScheduler: Final stage: ResultStage 5 (showString at <unknown>:0)
23/06/13 07:14:36 INFO DAGScheduler: Parents of final stage: List()
23/06/13 07:14:36 INFO DAGScheduler: Missing parents: List()
23/06/13 07:14:36 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[24] at showString at <unknown>:0), which has no missing parents
23/06/13 07:14:36 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 15.0 KiB, free 432.7 MiB)
23/06/13 07:14:36 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 5.9 KiB, free 432.7 MiB)
23/06/13 07:14:36 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on a208eb66d409:38921 (size: 5.9 KiB, free: 434.1 MiB)
23/06/13 07:14:36 INFO SparkContext: Created broadcast 7 from broadcast at D

+----------------+--------+----------------+-----------------+--------+
|part_in_order_id|order_id|part_supplier_id|actual_sale_price|quantity|
+----------------+--------+----------------+-----------------+--------+
|               1|       1|               1|               20|       2|
|               2|       1|               4|               35|       1|
|               3|       2|               2|               25|       4|
|               4|       3|               3|               30|       5|
|               5|       4|               4|               35|    null|
|               6|       5|               5|               40|    null|
|               5|       4|               4|               35|    null|
|               6|       5|               5|               40|    null|
+----------------+--------+----------------+-----------------+--------+

root
 |-- part_in_order_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- part_supplier_id: integer (nullable

[Stage 5:>                                                          (0 + 1) / 1]23/06/13 07:14:37 INFO FileScanRDD: Reading File path: s3a://datalake/bronze/CarPartsDB/Jun-06-2023/Part_in_Order/part-00001-f04297e6-dff0-43e4-aff3-e360c40ece4d-c000.snappy.parquet, range: 0-1162, partition values: [empty row]
23/06/13 07:14:37 INFO S3AInputStream: Switching to Random IO seek policy
23/06/13 07:14:37 INFO S3AInputStream: Switching to Random IO seek policy
23/06/13 07:14:37 INFO Executor: Finished task 0.0 in stage 5.0 (TID 104). 1852 bytes result sent to driver
23/06/13 07:14:37 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 104) in 665 ms on a208eb66d409 (executor driver) (1/1)
23/06/13 07:14:37 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool 
23/06/13 07:14:37 INFO DAGScheduler: ResultStage 5 (showString at <unknown>:0) finished in 0.699 s
23/06/13 07:14:37 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie 

In [5]:
from pyspark.sql.functions import col

In [6]:
cols = ['part_in_order_id','order_id','part_supplier_id','actual_sale_price', 'quantity', 'discount']
items = [
(5, 4, 4, 35, 5, True),
(6, 5, 5, 40, 6, False)
]
part_in_order_updates = spark.createDataFrame(items, cols) \
            .withColumn("part_in_order_id", col("part_in_order_id").cast("int")) \
            .withColumn("order_id", col("order_id").cast("int")) \
            .withColumn("part_supplier_id", col("part_supplier_id").cast("int")) \
            .withColumn("actual_sale_price", col("actual_sale_price").cast("int")) \
            .withColumn("quantity", col("quantity").cast("int")) 
part_in_order_updates.write.format("delta").mode("append").save(deltaPath)


AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 964cd0e0-4f37-47e1-841c-447c2c4c638d).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- part_in_order_id: integer (nullable = true)
-- order_id: integer (nullable = true)
-- part_supplier_id: integer (nullable = true)
-- actual_sale_price: integer (nullable = true)
-- quantity: integer (nullable = true)


Data schema:
root
-- part_in_order_id: integer (nullable = true)
-- order_id: integer (nullable = true)
-- part_supplier_id: integer (nullable = true)
-- actual_sale_price: integer (nullable = true)
-- quantity: integer (nullable = true)
-- discount: boolean (nullable = true)

         

23/06/13 07:42:48 INFO BlockManagerInfo: Removed broadcast_8_piece0 on a208eb66d409:38921 in memory (size: 5.9 KiB, free: 434.2 MiB)
23/06/13 07:42:48 INFO BlockManagerInfo: Removed broadcast_3_piece0 on a208eb66d409:38921 in memory (size: 97.5 KiB, free: 434.3 MiB)
23/06/13 07:42:48 INFO BlockManagerInfo: Removed broadcast_7_piece0 on a208eb66d409:38921 in memory (size: 5.9 KiB, free: 434.3 MiB)
23/06/13 07:42:48 INFO BlockManagerInfo: Removed broadcast_5_piece0 on a208eb66d409:38921 in memory (size: 29.8 KiB, free: 434.3 MiB)


## Adit Data Changes

In [42]:
deltaTable = DeltaTable.forPath(spark, deltaPath)

In [43]:
(deltaTable
 .history(3)
 .select("version", "timestamp", "operation", "operationParameters")
 .show(truncate=False))

+-------+-------------------+---------+--------------------------------------+
|version|timestamp          |operation|operationParameters                   |
+-------+-------------------+---------+--------------------------------------+
|4      |2023-06-06 10:34:22|WRITE    |{mode -> Append, partitionBy -> []}   |
|3      |2023-06-06 10:31:25|WRITE    |{mode -> Append, partitionBy -> []}   |
|2      |2023-06-06 08:41:58|WRITE    |{mode -> Overwrite, partitionBy -> []}|
+-------+-------------------+---------+--------------------------------------+



## Querying Previous Snapshots

In [58]:
(spark.read
 .format("delta")
 .option("timestampAsOf", "2023-06-06 08:41:58") 
 .load(deltaPath))



DataFrame[part_in_order_id: int, order_id: int, part_supplier_id: int, actual_sale_price: int, quantity: int]

In [59]:
(spark.read.format("delta")
 .option("versionAsOf", "3")
 .load(deltaPath))

DataFrame[part_in_order_id: int, order_id: int, part_supplier_id: int, actual_sale_price: int, quantity: int]