# Chapter 7: Streaming in and out of your Delta Lake


# Notebook Intro and Setup

This notebook should run end to end with the "run all" mode.

We encourage exploration and breaking stuff to get the most from it.

In [None]:
# # Import resources
# import subprocess
# import os

# # Set working dir
# # Note: Original working dir is different than this. 
# # Reason: I want to differentiate the work and easily track it in local env instead of Docker Container

# try:
#     target_dir = "<your-path-to>streaming\\ch07"

#     # Create the directory if it doesn't exist
#     if not os.path.exists(target_dir):
#         os.makedirs(target_dir)
#         print(f"✅ Directory '{target_dir}' created.")
#     else:
#         print(f"📁 Directory '{target_dir}' already exists.")
    
#     # Change the current working directory
#     os.chdir(target_dir)
#     print(f"📂 Changed working directory to: {os.getcwd()}")
# except Exception as ex:
#     raise str(ex)


In [12]:
# Set additional Spark configuration options

spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark.sparkContext.setLogLevel("ERROR")


In [None]:
desc_ext_covid_nyt = spark.sql("DESCRIBE EXTENDED default.covid_nyt")
desc_ext_covid_nyt

# Paths

Placeholder for Delta Tables and other sources location

# Delta readStream chained with writeStream example

Target: Read `default.covid_nyt` Delta Table and write the data to `default.covid_nyt_stream_1` Delta Table.  
Using append mode and centralized directory for Stream Checkpoint. No transformations.  

`IMPORTANT NOTE:` In order to create a table in metastore using writeStream, need to create it first using DDL and specify location. Then use the location to writeStream in order to save data. This way the table will be queryable.
```
.start(path)
.toTable(schema.tableName)
```
These 2 options cannot be used together in a writeStream. Need to choose from start and set table path or toTable to set schema.tableName.  
Workaround:
1. Create table using DDL USING DELTA LOCATION <your/path>
2. Use .start(path/from/1) to write data in the delta table
3. Query the table using spark SQL


In [34]:
spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_stream_1;
    """
).show()

++
||
++
++



In [35]:
spark.sql(
    """
        CREATE TABLE IF NOT EXISTS default.covid_nyt_stream_1
        USING DELTA
        LOCATION './covid_nyt_stream_1'
    """
).show()

++
||
++
++



In [19]:
covid_nyt_stream_1 = (
    spark
    .readStream
    .format("delta")
    .option("ignoreDeletes", "true")
    .table("covid_nyt")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_stream_1/ws1")
    .queryName("covid_nyt_stream")
    .start("./spark-warehouse/covid_nyt_stream_1")
    # .toTable("default.covid_nyt_stream_1")
    # start and toTable cannot be used together. Only 1 can be used in writeStream
    # See also comment in section begining
)

In [None]:
covid_nyt_stream_1.awaitTermination()

In [None]:
covid_nyt_stream_1.processAllAvailable()

In [38]:
covid_nyt_stream_1.stop()

In [39]:
print(covid_nyt_stream_1.status)
print(covid_nyt_stream_1.last_progress)

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}


AttributeError: 'StreamingQuery' object has no attribute 'last_progress'

In [26]:
spark.sql("SHOW TABLES").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|           covid_nyt|      false|
|  default|   covid_nyt_by_date|      false|
|  default|nonoptimal_covid_nyt|      false|
+---------+--------------------+-----------+



In [7]:
spark.sql("""
SELECT * FROM covid_nyt_stream_1
""").show()

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California| 6059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-26|Los Angeles|California| 6037|    1|     0|
|2020-01-26|     Orange|California| 6059|    1|     0|
|2020-01-26|       Cook|  Illinois|17031|    1|     0|
|2020-01-26|  Snohomish|Washington|53061|    1|     0|
|2020-01-27|   Maricopa|   Arizona| 4013|    1|     0|
|2020-01-27|Los Angeles|California| 6037|    1|     0|
|2020-01-2

# Delta readStream and writeStream separated
Minor Note: In original notebook the first stream that copies the data in new table is separated and this one is chained together.

Target:
- Check count of `default.covid_nyt_stream_1`
- readStream - read `default.covid_nyt_stream_1`, filter only records with `deaths > 0`
- writeStream - write to new Delta Table `default.covid_nyt_deaths_stream`
  - checkpointLocation: `streaming_checkpoints/ch07/covid_nyt_deaths_stream`

In [None]:
desc_ext_covid_nyt_stream_1 =  spark.sql("DESCRIBE EXTENDED default.covid_nyt_stream_1")
desc_ext_covid_nyt_stream_1

In [11]:
# Check count of Delta Table
covid_nyt_stream_1_count = (
    spark
    .read
    .format("delta")
    .load("./spark-warehouse/covid_nyt_stream_1")
).count()

covid_nyt_stream_1_count

1111930

In [12]:
spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_deaths;
    """
)

In [13]:
spark.sql(
    """
        CREATE TABLE IF NOT EXISTS default.covid_nyt_deaths
        USING DELTA
        LOCATION './covid_nyt_deaths'
    """
)

In [16]:
from pyspark.sql.functions import col

streaming_query_covid_nyt_stream_1 = \
(
    spark
    .readStream
    .format("delta")
    .load("./spark-warehouse/covid_nyt_stream_1")
    .filter(col("deaths") > 0)
)

In [18]:
streaming_query_covid_nyt_stream_1 \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("mergeSchema", "true") \
.option("checkpointLocation", "streaming_checkpoints/ch07/covid_deaths/ws1") \
.start("./spark-warehouse/covid_nyt_deaths")


<pyspark.sql.streaming.query.StreamingQuery at 0x1ea8d357690>

In [19]:
covid_nyt_deaths = spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_deaths;
    """
)

covid_nyt_deaths

date,county,state,fips,cases,deaths
2020-02-29,King,Washington,53033,4,1
2020-03-01,King,Washington,53033,11,3
2020-03-02,King,Washington,53033,15,6
2020-03-03,King,Washington,53033,22,10
2020-03-04,Placer,California,6061,2,1
2020-03-04,King,Washington,53033,33,11
2020-03-05,Placer,California,6061,2,1
2020-03-05,King,Washington,53033,52,11
2020-03-06,Placer,California,6061,5,1
2020-03-06,Lee,Florida,12071,1,1


In [20]:
# Check count of Delta Table
covid_nyt_deaths_count = (
    spark
    .read
    .format("delta")
    .load("./spark-warehouse/covid_nyt_deaths")
).count()

covid_nyt_deaths_count

843458

# Setting different options
- `ignoreDeletes`
- `ignoreChanges`
- `startingVersion`
- `startingTimestamp`
- 2 types of `eventTimeOrder` with a watermark

Target:
Write logic and create different readStream commands while setting list of options above.
Write all options to see differences. Read the book to see what expectations and observations to provide

# ignoreDeletes & ignoreChanges
Short Summary:
- `ignoreDeletes`
    - ignores delete operations as it comes across them **if a new file is not created**. If a parquet /from input delta table/ gets deleted, those changes will not be propagated to downstream destinations. 
    - We can use this setting to avoid stream failing the stream processing job and still support important delete operations. / Like to fulfill GDPR request from person and his right to be forgotten / **The catch for this scenario is data needs to be partitioned on the same values/column we use as filter in delete operation, so there are no remnants that can be moved to a new file.**

- `ignoreChanges`
    - Behave differently than `ignoreDeletes` /this is why the testing is going to be separated in 2 targets below /
    - Allows new files to be created. If we update or delete few records, which result in creating new file in the source Delta Table, that is considered as new file in the stream that reads from the table. This ensures we have freshest version of the data, but also if the new file also contains records that exist in target already they can be duplicated.
    - **[prefered option]** Solution here would be providing logic for deduplication/upsert to target table
    - Or somehow differentiate the data with additional timekeeping information / i.e adding `version_as_of` column into target or similar /     

Target:
- `ignoreDeletes`
    - Create readStream with ignoreDeletes, source `covid_nyt`
    - Create writeStream to target `covid_nyt_ignore_deletes` / do not forget to create table in catalog first /
    - Delete a record from source following the instructions/details in short summary
    - Observe does delete get processed to new stream target table
    - **Do the test with partitioned and non-partitioned source tables**
- `ignoreChanges`
    - Create readStream with ignoreChanges, source `covid_nyt`
    - Create writeStream to target `covid_nyt_ignore_changes` / do not forget to create table in catalog first /
    - Update and Delete a record from source following the instructions/details in short summary
        - **Make sure to create 2 test cases - where new file `will` and `will not` be created** 
    - Observe are duplicates presented post updates.
    - Create upsert/deduplication logic and repeat previous 3 steps to ensure the solution works


### ignoreDeletes test with Non-Partitioned source table - default.covid_nyt

### Conclusion after observations and testing:
- 1st Stream:
    - All data from `default.covid_nyt` was streamed and written to `default.covid_nyt_ignore_deletes`
    - After deletion of single record, files in source table were deleted which caused the stream to fail because of `ignoreDeletes` feature turned on
    - After the failure, UPDATE AND INSERT DMLs were executed to source table, but no new data was processed, simply because stream was marked as FAILED  
      in Spark UI and couldn't process any new data
    - Second attempt to run the same stream (with same checkpoint directory) resulted in same stream failure because of the DELETE executed earlier
- 2nd Stream:
    - Updated checkpoint directory in writeStream to a new one, in order to be able to re-execute the stream
    - This resulted in reprocessing the entire data into the target table and causing huge amount of duplicates to be saved
    - DELETE, UPDATE and INSERT were also processed and available into target table

**Conclusion**:
- It is not safe to use `ignoreDeletes` on a non-partitioned tables, if any DELETE operations are going to be executed (In case files are getting overriden?)
- This behavior may be caused by the choosed predicates for delete operation, but I believe that even with another predicate a file are going to be overwritten  
  again because table is non-partitioned

### ignoreDeletes test with Partitioned source table   
- bronze.covid_nyt_by_date - partitioned by `date`
- One stream query only
    - All data from `bronze.covid_nyt_by_date` was streamed and written to `default.covid_nyt_ignore_deletes_partitioned`
    - After records were deleted, they still exist in target table
        - Used predicate `date = 2020-01-28`, since source is `date` partitioned entire partition was deleted from source
    - TEST SUCCESSFULL

### ignoreChanges with Non-Partitioned source table - `default.covid_nyt`
- Entire source was appended to target `covid_nyt_ignore_changes_non_partitioned`
- Using non-partitioned table as a source creates new parquet file in the delta table and mark other as deleted
- This means, updated data is being duplicated in the target table (See overview of `ignoreChanges` in cell above)
- Met problems: Bad paths in Delta Table.. not related to stream

### ignoreChanges with Partitioned source table - `bronze.covid_nyt_by_date`
- Entire source was appended to target `covid_nyt_ignore_changes_partitioned`
- Using non-partitioned table as a source creates new parquet file in the delta table and mark other as deleted
- State of update predicate rows pre and post updates is stored in target
- In terms of PK this can be considered as 

#### ignoreDeletes Non-Partitioned Table as Source

In [None]:
spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_ignore_deletes;
    """
)

In [13]:
# Create Delta Tables Structure in order to write stream data
spark.sql(
    """
        CREATE TABLE IF NOT EXISTS default.covid_nyt_ignore_deletes
        USING DELTA
        LOCATION './covid_nyt_ignore_deletes'
    """
)

In [None]:
# Note: Spark needs update in order to support ANALYZE .. COMPUTE STATISTICS
# in order to collect statistics of Delta Tables v2
spark.sql(
    """
        ANALYZE TABLE default.covid_nyt COMPUTE STATISTICS
    """
)

In [26]:
# Alternative approach
df = spark.read.format("json").load("./spark-warehouse/covid_nyt/_delta_log")
df


add,commitInfo,metaData,protocol
,"{Apache-Spark/3.5.4 Delta-Lake/3.2.0, true, Serializable, WRITE, {8, 6550290, 1111930}, {NULL, NULL, NULL, NULL, Append, [], NULL}, 1, 1741457277490, 2f88a686-d657-405a-9ac8-92b824f89f78}",,
,,"{{6, name, interval 7 days, NULL, NULL}, 1741457252578, {parquet}, 2a7017eb-b2ab-4be5-9f16-41c0ee2be0e6, [], {""type"":""struct"",""fields"":[{""name"":""date"",""type"":""date"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":1,""delta.columnMapping.physicalName"":""date""}},{""name"":""county"",""type"":""string"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":2,""delta.columnMapping.physicalName"":""col-56356e29-24ec-4c79-99f0-adf7cd37b672""}},{""name"":""state"",""type"":""string"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":3,""delta.columnMapping.physicalName"":""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537""}},{""name"":""fips"",""type"":""integer"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":4,""delta.columnMapping.physicalName"":""col-983dfa3b-37e6-4450-98bc-7c3762e9304d""}},{""name"":""cases"",""type"":""integer"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":5,""delta.columnMapping.physicalName"":""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e""}},{""name"":""deaths"",""type"":""integer"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":6,""delta.columnMapping.physicalName"":""col-164fc145-862d-4ea3-bec9-092ac5a3aacf""}}]}}",
"{true, 1741457277467, part-00000-4706d30d-61ab-4140-b5d9-5c7ce7907caa-c000.snappy.parquet, 928066, {""numRecords"":148339,""minValues"":{""date"":""2020-11-26"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2021-01-10"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":920560,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":25562},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1262,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":3588}}}",,,
"{true, 1741457277445, part-00001-dc4bfe05-a840-4b66-a883-8c7d1143abf5-c000.snappy.parquet, 980374, {""numRecords"":150290,""minValues"":{""date"":""2020-10-10"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2020-11-26"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":383373,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":24230},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1346,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":3588}}}",,,
"{true, 1741457277410, part-00002-40677ab9-37d8-4f54-827e-7d25dbfe8cd6-c000.snappy.parquet, 901110, {""numRecords"":147181,""minValues"":{""date"":""2021-01-10"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2021-02-25"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":1188101,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":29025},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1250,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":3510}}}",,,
"{true, 1741457277482, part-00003-75742a71-5156-4d33-8321-0d76f5af8cd6-c000.snappy.parquet, 904599, {""numRecords"":151830,""minValues"":{""date"":""2020-08-25"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2020-10-10"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":281165,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":23882},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1445,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":3666}}}",,,
"{true, 1741457277462, part-00004-da29f767-9b36-4f59-9e9b-a814516bfd93-c000.snappy.parquet, 862066, {""numRecords"":153372,""minValues"":{""date"":""2020-07-08"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2020-08-25"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":237032,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":23662},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1355,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":3744}}}",,,
"{true, 1741457277482, part-00005-c62bbc2e-9383-4c9c-ac23-0c6e4ccd9aac-c000.snappy.parquet, 845325, {""numRecords"":155494,""minValues"":{""date"":""2020-05-19"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2020-07-08"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":222156,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":22690},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1387,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":3890}}}",,,
"{true, 1741457277475, part-00006-555393a6-3820-4b75-bcdb-c9c58589fccd-c000.snappy.parquet, 793105, {""numRecords"":157865,""minValues"":{""date"":""2020-01-21"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2020-05-19"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":198114,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":20298},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1746,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":1065}}}",,,
"{true, 1741457277074, part-00007-1c0dceb9-0467-4319-8551-f93c4214f7b1-c000.snappy.parquet, 335645, {""numRecords"":47559,""minValues"":{""date"":""2021-02-25"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Abbeville"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Alabama"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":1001,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":0},""maxValues"":{""date"":""2021-03-11"",""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":""Ziebach"",""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":""Wyoming"",""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":78030,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":1208672,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":30068},""nullCount"":{""date"":0,""col-56356e29-24ec-4c79-99f0-adf7cd37b672"":0,""col-e70f7f07-f9f5-4c56-b0c8-0e5308f0d537"":0,""col-983dfa3b-37e6-4450-98bc-7c3762e9304d"":408,""col-80e3bd88-c76a-4f0f-8f5b-a350fe9aa25e"":0,""col-164fc145-862d-4ea3-bec9-092ac5a3aacf"":1170}}}",,,


In [23]:
# Find columns used to partition the table, if any
covid_nyt_desc_ext = spark.sql(
    """
    DESCRIBE EXTENDED default.covid_nyt
    """
)

covid_nyt_desc_ext

col_name,data_type,comment
date,date,
county,string,
state,string,
fips,int,
cases,int,
deaths,int,
,,
# Detailed Table Information,,
Name,spark_catalog.default.covid_nyt,
Type,MANAGED,


In [None]:
# Find columns used to partition the table, if any
covid_nyt_desc_det = spark.sql(
    """
    DESCRIBE DETAIL default.covid_nyt
    """
)

covid_nyt_desc_det

In [20]:
# Select data from table to choose target for deletion
# Find columns used to partition the table, if any
covid_nyt_df = spark.sql(
    """
    SELECT * FROM default.covid_nyt
    ORDER BY date
    """
)

covid_nyt_df

date,county,state,fips,cases,deaths
2020-01-21,Snohomish,Washington,53061,1,0
2020-01-22,Snohomish,Washington,53061,1,0
2020-01-23,Snohomish,Washington,53061,1,0
2020-01-24,Cook,Illinois,17031,1,0
2020-01-24,Snohomish,Washington,53061,1,0
2020-01-25,Orange,California,6059,1,0
2020-01-25,Cook,Illinois,17031,1,0
2020-01-25,Snohomish,Washington,53061,1,0
2020-01-26,Maricopa,Arizona,4013,1,0
2020-01-26,Los Angeles,California,6037,1,0


In [56]:
# Create readStream with ignoreDeletes
stream_covid_nyt_ignore_deletes = (
    spark
    .readStream
    .format("delta")
    .option("ignoreDeletes", "true")
    .load("./spark-warehouse/covid_nyt")
)

In [60]:
# Create writeStream that writes to created Delta table

# .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_ignore_deletes/ws1")
# First checkpointed fails the stream, because a file was deleted after DELETE command of source non-partitioned table
# Second checkpoint to restart the stream and see if update and inserts are getting reprocessed and data duplicated because of new stream start

stream_covid_nyt_ignore_deletes \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_ignore_deletes/ws2") \
.start("./spark-warehouse/covid_nyt_ignore_deletes")

<pyspark.sql.streaming.query.StreamingQuery at 0x16b75154210>

In [33]:
# Select from source before delete
# Target to Delete - date = 2020-01-24, county = Snohomish	
covid_nyt_ignore_deletes_select = spark.sql(
    """
        SELECT * 
        FROM default.covid_nyt
        WHERE date = CAST('2020-01-24' AS DATE)
            AND county = 'Snohomish'
    """
)

covid_nyt_ignore_deletes_select

date,county,state,fips,cases,deaths
2020-01-24,Snohomish,Washington,53061,1,0


In [34]:
# Observe source table before delete - covid_nyt_ignore_deletes
covid_nyt_ignore_deletes_count_post_delete = spark.sql(
  """
      SELECT COUNT(*) FROM default.covid_nyt_ignore_deletes
  """
)

covid_nyt_ignore_deletes_count_post_delete

count(1)
1111930


In [35]:
# Delete record from covid_nyt following the rule of using column used as partition
# Target to Delete - date = 2020-01-24, county = Snohomish	
covid_nyt_ignore_deletes_delete = spark.sql(
    """
        DELETE FROM default.covid_nyt
        WHERE date = CAST('2020-01-24' AS DATE)
            AND county = 'Snohomish'
    """
)

In [71]:
# Observe target table after delete - covid_nyt_ignore_deletes
covid_nyt_ignore_deletes_count_post_delete = spark.sql(
  """
      SELECT COUNT(*) 
      FROM default.covid_nyt_ignore_deletes
  """
)

covid_nyt_ignore_deletes_count_post_delete

count(1)
2223862


In [37]:
# See source post delete
# Target to Delete - date = 2020-01-24, county = Snohomish	
covid_nyt_ignore_deletes_select = spark.sql(
    """
        SELECT * 
        FROM default.covid_nyt
        WHERE date = CAST('2020-01-24' AS DATE)
            AND county = 'Snohomish'
    """
)

In [38]:
# Observe target post delete
# Target to Delete - date = 2020-01-24, county = Snohomish	
covid_nyt_ignore_deletes_select = spark.sql(
    """
        SELECT * 
        FROM default.covid_nyt_ignore_deletes
        WHERE date = CAST('2020-01-24' AS DATE)
            AND county = 'Snohomish'
    """
)

covid_nyt_ignore_deletes_select

date,county,state,fips,cases,deaths
2020-01-24,Snohomish,Washington,53061,1,0


In [54]:
# Observe source history
covid_nyt_versions = spark.sql(
    """
     DESCRIBE HISTORY default.covid_nyt
    """
)

covid_nyt_versions

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
8,2025-04-17 22:39:11.739,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,7.0,Serializable,True,"{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 5908}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
7,2025-04-17 22:35:55.654,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,6.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 2895}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
6,2025-04-17 22:33:42.608,,,UPDATE,"{predicate -> [""((date#4601 = 2020-01-24) AND (county#4602 = Cook))""]}",,,,5.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 775201, numCopiedRows -> 157863, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 2501, numDeletionVectorsUpdated -> 0, scanTimeMs -> 2084, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 776209, rewriteTimeMs -> 416}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
5,2025-04-17 22:26:03.475,,,DELETE,"{predicate -> [""((date#3389 = 2020-01-24) AND (county#3390 = Snohomish))""]}",,,,4.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 793105, numCopiedRows -> 157864, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1347, numDeletionVectorsUpdated -> 0, numDeletedRows -> 1, scanTimeMs -> 825, numAddedFiles -> 1, numAddedBytes -> 775201, rewriteTimeMs -> 521}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
4,2025-03-09 16:07:07.589,,,UNSET TBLPROPERTIES,"{properties -> [""engineering.team_name"",""engineering.slack""], ifExists -> true}",,,,3.0,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0
3,2025-03-09 15:56:49.011,,,SET TBLPROPERTIES,"{properties -> {""engineering.team_name"":""dldg_authors"",""engineering.slack"":""delta-users.slack.com""}}",,,,2.0,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-03-08 20:07:57.496,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,1.0,Serializable,True,"{numFiles -> 8, numOutputRows -> 1111930, numOutputBytes -> 6550290}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-03-08 20:07:42.205,,,SET TBLPROPERTIES,"{properties -> {""delta.columnMapping.mode"":""name"",""delta.minReaderVersion"":""2"",""delta.minWriterVersion"":""5""}}",,,,0.0,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-03-08 20:07:32.588,,,CREATE TABLE,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {""delta.logRetentionDuration"":""interval 7 days""}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [61]:
# Observe target post delete
# Target to Delete - date = 2020-01-24, county = Snohomish	
covid_nyt_ignore_deletes_versions = spark.sql(
    """
     DESCRIBE HISTORY default.covid_nyt_ignore_deletes
    """
)

covid_nyt_ignore_deletes_versions

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-04-17 22:48:54.534,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> 2177981f-3d61-4e68-910e-998469d01ae5, epochId -> 0}",,,,0.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 1111932, numOutputBytes -> 6229164, numAddedFiles -> 4}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-04-17 22:25:34.993,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> d57b5de8-c886-4e70-ba89-c564379b012a, epochId -> 0}",,,,,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 1111930, numOutputBytes -> 6323054, numAddedFiles -> 4}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [40]:
# Observe target post delete
# Target to Delete - date = 2020-01-24, county = Cook
covid_nyt_ignore_deletes_select = spark.sql(
    """
        SELECT * 
        FROM default.covid_nyt_ignore_deletes
        WHERE date = CAST('2023-01-24' AS DATE)
            AND county = 'Cook'
    """
)

covid_nyt_ignore_deletes_select

date,county,state,fips,cases,deaths
2020-01-24,Cook,Illinois,17031,1,0


In [48]:
# Insert single record on source and see if it gets processed into target
# Target to Insert - date = 2023-01-24, county = Cook cases = 12 deaths 3
covid_nyt_ignore_deletes_update = spark.sql(
    """
        INSERT INTO default.covid_nyt VALUES
        ('2025-01-24', 'Minneapolis', 'Minnesota', 17032, 9, 0),
        ('2025-01-25', 'Los Angeles', 'California', 17033, 1, 0)
    """
)

covid_nyt_ignore_deletes_update

In [69]:
# Observe target post delete
covid_nyt_ignore_deletes_select = spark.sql(
    """
        SELECT * 
        FROM default.covid_nyt_ignore_deletes
        WHERE 
        (
            date = CAST('2025-01-24' AS DATE)
            OR date = CAST('2025-01-25' AS DATE) 
        ) OR
        (
            county = 'Cook'
            AND date_trunc('year', date) = cast('2023-01-01' as date)
        )
    """
)

covid_nyt_ignore_deletes_select

date,county,state,fips,cases,deaths
2025-01-25,Los Angeles,California,17033,1,0
2025-01-24,Minneapolis,Minnesota,17032,9,0
2023-01-24,Cook,Illionis,17032,12,3


#### ignoreDeletes Partitioned tables as Source

In [23]:
spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_ignore_deletes_partitioned;
    """
)

In [24]:
# Create Delta Tables Structure in order to write stream data
spark.sql(
    """
        CREATE TABLE IF NOT EXISTS default.covid_nyt_ignore_deletes_partitioned
        USING DELTA
        LOCATION './spark-warehouse/covid_nyt_ignore_deletes_partitioned'
    """
)

In [17]:
show_schemas = spark.sql(
    """
        SHOW TABLES FROM BRONZE;
    """
)

show_schemas

namespace,tableName,isTemporary
bronze,covid_nyt_by_date,False


In [None]:
# Get details for partitioned table for source
bronze_covid_nyt_by_date_det = spark.sql(
    """
        DESCRIBE DETAIL bronze.covid_nyt_by_date
    """
)

bronze_covid_nyt_by_date_det

In [28]:
stream_covid_nyt_by_date_ignore_updates = (
    spark
    .readStream
    .format("delta")
    .option("ignoreDeletes", "true")
    .load("./spark-warehouse/bronze.db/covid_nyt_by_date")
)

In [29]:

(
    stream_covid_nyt_by_date_ignore_updates
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_ignore_deletes_partitioned/ws1")
    .start("./spark-warehouse/covid_nyt_ignore_deletes_partitioned")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x21a7fb52e90>

In [34]:
# [PRE-DELETE] Retrieve taget table count compare with target
from pyspark.sql.functions import lit

target_df = (
    spark
    .read
    .format("delta")
    .load("./spark-warehouse/covid_nyt_ignore_deletes_partitioned")
)

source_df = (
    spark
    .read
    .format("delta")
    .load("./spark-warehouse/bronze.db/covid_nyt_by_date")
)

source_vs_target_counts = (
    spark.createDataFrame([(source_df.count(),)], ["row_count"]).withColumn("r_type", lit("SOURCE"))
    .unionAll(
        spark.createDataFrame([(target_df.count(),)], ["row_count"]).withColumn("r_type", lit("TARGET"))
    )
)

source_vs_target_counts

row_count,r_type
1111930,SOURCE
1111930,TARGET


In [35]:
# Find and retrieve all source candidates for delete that will remove entire file/directory for single day
# date = 2020-01-28
from pyspark.sql.functions import col

covid_nyt_by_date_delete_candidates = (
    source_df
    .filter(source_df.date == '2020-01-28')
)

covid_nyt_by_date_delete_candidates

date,county,state,fips,cases,deaths
2020-01-28,Maricopa,Arizona,4013,1,0
2020-01-28,Los Angeles,California,6037,1,0
2020-01-28,Orange,California,6059,1,0
2020-01-28,Cook,Illinois,17031,1,0
2020-01-28,Snohomish,Washington,53061,1,0


In [37]:
#Count of candidate rows for delete 
covid_nyt_by_date_count = covid_nyt_by_date_delete_candidates.count()

covid_nyt_by_date_count

5

In [38]:
# DELETE DML for date = 2020-01-28
delete_response = spark.sql(
    """
        DELETE FROM bronze.covid_nyt_by_date
        WHERE date = CAST('2020-01-28' AS DATE);
    """
)

delete_response

num_affected_rows
5


In [12]:
# [POST-DELETE] Retrieve taget table count compare with target
from pyspark.sql.functions import lit

target_df_2 = (
    spark
    .read
    .format("delta")
    .load("./spark-warehouse/covid_nyt_ignore_deletes_partitioned")
)

source_df_2 = (
    spark
    .read
    .format("delta")
    .load("./spark-warehouse/bronze.db/covid_nyt_by_date")
)

source_vs_target_counts_2 = (
    spark.createDataFrame([(source_df_2.count(),)], ["row_count"]).withColumn("r_type", lit("SOURCE"))
    .unionAll(
        spark.createDataFrame([(target_df_2.count(),)], ["row_count"]).withColumn("r_type", lit("TARGET"))
    )
)

source_vs_target_counts_2

row_count,r_type
1111925,SOURCE
1111930,TARGET


In [52]:
# Check target rows that have been deleted from source
# date = 2020-01-28
from pyspark.sql.functions import col

target_check_deletes_still_exist = (
    target_df_2
    .filter(target_df_2.date == '2020-01-28')
)

covid_nyt_by_date_delete_candidates

date,county,state,fips,cases,deaths


In [54]:
target_df_2_ordered = (
    target_df_2.filter(target_df_2.date == '2020-01-28').orderBy("date")
)
target_df_2_ordered

date,county,state,fips,cases,deaths
2020-01-28,Maricopa,Arizona,4013,1,0
2020-01-28,Los Angeles,California,6037,1,0
2020-01-28,Orange,California,6059,1,0
2020-01-28,Cook,Illinois,17031,1,0
2020-01-28,Snohomish,Washington,53061,1,0


In [46]:
show_schemas = spark.sql(
    """
        SHOW TABLES FROM default;
    """
)

show_schemas

namespace,tableName,isTemporary
default,covid_nyt,False
default,covid_nyt_by_date,False
default,covid_nyt_deaths,False
default,covid_nyt_ignore_deletes,False
default,covid_nyt_ignore_deletes_partitioned,False
default,covid_nyt_stream_1,False
default,nonoptimal_covid_nyt,False


In [10]:
source_history = spark.sql(
    """
        DESCRIBE HISTORY bronze.covid_nyt_by_date
    """
)

source_history

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-04-18 20:06:46.391,,,DELETE,"{predicate -> [""(date#4471 = 2020-01-28)""]}",,,,1.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 1617, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 197, numDeletionVectorsUpdated -> 0, numDeletedRows -> 5, scanTimeMs -> 195, numAddedFiles -> 0, numAddedBytes -> 0, rewriteTimeMs -> 0}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-03-19 11:34:36.185,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,0.0,Serializable,True,"{numFiles -> 421, numOutputRows -> 1111930, numOutputBytes -> 17654179}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-03-19 11:34:26.935,,,CREATE TABLE,"{partitionBy -> [""date""], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [11]:
source_check_for_deleted_records = spark.sql(
    """
        SELECT *
        FROM bronze.covid_nyt_by_date
        WHERE date = CAST('2020-01-28' AS DATE)
    """
) 

source_check_for_deleted_records

date,county,state,fips,cases,deaths


#### ignoreChanges w/ Non-partitioned table

In [23]:
spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_ignore_changes_non_partitioned;
    """
)

In [49]:
import os
print(os.getcwd())

E:\Programming\python_project\delta_lake_pyspark_learning\delta-lake-definitive-guide


In [24]:
# Create Delta Tables Structure in order to write stream data

spark.sql(
    """
        CREATE TABLE IF NOT EXISTS default.covid_nyt_ignore_changes_non_partitioned
        USING DELTA
        -- LOCATION './covid_nyt_ignore_changes_non_partitioned'
    """
)

In [25]:
stream_covid_nyt_ignore_updates = (
    spark
    .readStream
    .format("delta")
    .option("ignoreChanges", "true")
    .load("./spark-warehouse/covid_nyt") # Non-Partitioned Source
)

In [26]:
(
    stream_covid_nyt_ignore_updates
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_ignore_changes_non_partitioned/ws1")
    .start("./spark-warehouse/covid_nyt_ignore_changes_non_partitioned") # To Non-Partitioned Target
)

<pyspark.sql.streaming.query.StreamingQuery at 0x27dc1fc6c90>

In [27]:
# Find candidates in source for update
source_update_candidates = spark.sql(
    """
        SELECT *
        FROM default.covid_nyt
        WHERE date = CAST('2020-01-25' AS DATE) -- Predicate for updates
    """
)

source_update_candidates

date,county,state,fips,cases,deaths
2020-01-25,Orange,California,6059,10,0
2020-01-25,Cook,Illinois,17031,10,0
2020-01-25,Snohomish,Washington,53061,10,0


In [34]:
# Update
source_updates_response = spark.sql(
    """
        UPDATE default.covid_nyt
        SET cases = 13
        WHERE date = CAST('2020-01-25' AS DATE) -- Predicate for updates
    """
)
# Expected 3 updated rows
source_updates_response

num_affected_rows
3


In [35]:
# Check candidates in source post-update
source_update_candidates = spark.sql(
    """
        SELECT *
        FROM default.covid_nyt
        WHERE date = CAST('2020-01-25' AS DATE) -- Predicate for updates
    """
)

source_update_candidates

date,county,state,fips,cases,deaths
2020-01-25,Orange,California,6059,13,0
2020-01-25,Cook,Illinois,17031,13,0
2020-01-25,Snohomish,Washington,53061,13,0


In [36]:
# Check candidates in target post-update
target_update_candidates = spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_ignore_changes_non_partitioned
        WHERE date = CAST('2020-01-25' AS DATE)
    """
)

target_update_candidates

date,county,state,fips,cases,deaths
2020-01-25,Orange,California,6059,13,0
2020-01-25,Cook,Illinois,17031,13,0
2020-01-25,Snohomish,Washington,53061,13,0
2020-01-25,Orange,California,6059,13,0
2020-01-25,Cook,Illinois,17031,13,0
2020-01-25,Snohomish,Washington,53061,13,0
2020-01-25,Orange,California,6059,10,0
2020-01-25,Cook,Illinois,17031,10,0
2020-01-25,Snohomish,Washington,53061,10,0


In [44]:
show_tables_again = spark.sql(
    """
        SHOW TABLES
    """
)

show_tables_again

namespace,tableName,isTemporary
default,covid_nyt,False
default,covid_nyt_by_date,False
default,covid_nyt_deaths,False
default,covid_nyt_ignore_changes_non_partitioned,False
default,covid_nyt_ignore_deletes,False
default,covid_nyt_ignore_deletes_partitioned,False
default,covid_nyt_stream_1,False
default,nonoptimal_covid_nyt,False


In [70]:
# Check history of Source
history_source = spark.sql(
    """
        DESCRIBE HISTORY default.covid_nyt
    """
)

history_source

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
11,2025-04-21 15:13:50.684,,,UPDATE,"{predicate -> [""(date#13627 = 2020-01-25)""]}",,,,10,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 775396, numCopiedRows -> 157861, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1634, numDeletionVectorsUpdated -> 0, scanTimeMs -> 1279, numAddedFiles -> 1, numUpdatedRows -> 3, numAddedBytes -> 775396, rewriteTimeMs -> 355}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
10,2025-04-21 14:43:52.767,,,UPDATE,"{predicate -> [""(date#7620 = 2020-01-25)""]}",,,,9,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 775396, numCopiedRows -> 157861, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1324, numDeletionVectorsUpdated -> 0, scanTimeMs -> 920, numAddedFiles -> 1, numUpdatedRows -> 3, numAddedBytes -> 775396, rewriteTimeMs -> 403}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
9,2025-04-21 13:57:52.903,,,UPDATE,"{predicate -> [""(date#3999 = 2020-01-25)""]}",,,,8,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 776209, numCopiedRows -> 157861, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1457, numDeletionVectorsUpdated -> 0, scanTimeMs -> 996, numAddedFiles -> 1, numUpdatedRows -> 3, numAddedBytes -> 775396, rewriteTimeMs -> 460}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
8,2025-04-17 22:39:11.739,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,7,Serializable,True,"{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 5908}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
7,2025-04-17 22:35:55.654,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,6,Serializable,True,"{numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 2895}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
6,2025-04-17 22:33:42.608,,,UPDATE,"{predicate -> [""((date#4601 = 2020-01-24) AND (county#4602 = Cook))""]}",,,,5,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 775201, numCopiedRows -> 157863, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 2501, numDeletionVectorsUpdated -> 0, scanTimeMs -> 2084, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 776209, rewriteTimeMs -> 416}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
5,2025-04-17 22:26:03.475,,,DELETE,"{predicate -> [""((date#3389 = 2020-01-24) AND (county#3390 = Snohomish))""]}",,,,4,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 793105, numCopiedRows -> 157864, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1347, numDeletionVectorsUpdated -> 0, numDeletedRows -> 1, scanTimeMs -> 825, numAddedFiles -> 1, numAddedBytes -> 775201, rewriteTimeMs -> 521}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [37]:
# Check history of Target
# Check history of Source
history_target = spark.sql(
    """
            DESCRIBE HISTORY default.covid_nyt_ignore_changes_non_partitioned
    """
)

history_target

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-04-21 21:28:54.652,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> 83483b97-73dc-4eb5-94c2-11bd98074ce1, epochId -> 4}",,,,2.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 157864, numOutputBytes -> 774349, numAddedFiles -> 1}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-04-21 21:28:44.875,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> 83483b97-73dc-4eb5-94c2-11bd98074ce1, epochId -> 3}",,,,1.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 157864, numOutputBytes -> 774349, numAddedFiles -> 1}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-04-21 21:27:35.133,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> 83483b97-73dc-4eb5-94c2-11bd98074ce1, epochId -> 2}",,,,0.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 157864, numOutputBytes -> 774148, numAddedFiles -> 1}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-04-21 21:27:10.563,,,CREATE TABLE,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


#### ignoreChanges w/ Partitioned table

In [None]:
spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_ignore_changes_partitioned;
    """
)

In [9]:
# Create Delta Table Structure in order to write stream data

spark.sql(
    """
        CREATE TABLE IF NOT EXISTS default.covid_nyt_ignore_changes_partitioned
        USING DELTA
        -- LOCATION './covid_nyt_ignore_changes_partitioned'
    """
)

In [13]:
streaming_covid_nyt_by_date_ignore_changes = (
    spark
    .readStream
    .format("delta")
    .option("ignoreChanges", "true")
    .load("./spark-warehouse/bronze.db/covid_nyt_by_date") # Partitioned Source by `date`
)

In [15]:
(
    streaming_covid_nyt_by_date_ignore_changes
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_ignore_changes_partitioned/ws1")
    .start("./spark-warehouse/covid_nyt_ignore_changes_partitioned") # To Non-Partitioned Target
)

<pyspark.sql.streaming.query.StreamingQuery at 0x27dc1f73c50>

In [12]:
# Find candidate rows for update, tied to 1 partition
source_update_candidates = (
    spark.sql(
        """
            SELECT * 
            FROM bronze.covid_nyt_by_date
            WHERE date = CAST('2020-01-27' AS DATE)
        """
    )
)

source_update_candidates

date,county,state,fips,cases,deaths
2020-01-27,Maricopa,Arizona,4013,1,0
2020-01-27,Los Angeles,California,6037,1,0
2020-01-27,Orange,California,6059,1,0
2020-01-27,Cook,Illinois,17031,1,0
2020-01-27,Snohomish,Washington,53061,1,0


In [16]:
update_response = spark.sql(
    """
        UPDATE bronze.covid_nyt_by_date
        SET cases = 3
        WHERE date = CAST('2020-01-27' AS DATE)
    """
)

update_response

num_affected_rows
5


In [17]:
# See source post-update
source_update_candidates = (
    spark.sql(
        """
            SELECT * 
            FROM bronze.covid_nyt_by_date
            WHERE date = CAST('2020-01-27' AS DATE)
        """
    )
)

source_update_candidates

date,county,state,fips,cases,deaths
2020-01-27,Maricopa,Arizona,4013,3,0
2020-01-27,Los Angeles,California,6037,3,0
2020-01-27,Orange,California,6059,3,0
2020-01-27,Cook,Illinois,17031,3,0
2020-01-27,Snohomish,Washington,53061,3,0


In [None]:
# See target post-update

In [19]:
# See source post-update
source_update_candidates = (
    spark.sql(
        """
            SELECT * 
            FROM default.covid_nyt_ignore_changes_partitioned
            WHERE date = CAST('2020-01-27' AS DATE)
        """
    )
)

source_update_candidates

date,county,state,fips,cases,deaths
2020-01-27,Maricopa,Arizona,4013,1,0
2020-01-27,Los Angeles,California,6037,1,0
2020-01-27,Orange,California,6059,1,0
2020-01-27,Cook,Illinois,17031,1,0
2020-01-27,Snohomish,Washington,53061,1,0
2020-01-27,Maricopa,Arizona,4013,3,0
2020-01-27,Los Angeles,California,6037,3,0
2020-01-27,Orange,California,6059,3,0
2020-01-27,Cook,Illinois,17031,3,0
2020-01-27,Snohomish,Washington,53061,3,0


In [20]:
# See source history
source_history = (
    spark.sql(
        """
            DESCRIBE HISTORY bronze.covid_nyt_by_date
        """
    )
)

source_history

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-04-21 21:13:04.687,,,UPDATE,"{predicate -> [""(date#2542 = 2020-01-27)""]}",,,,2.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 1617, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 655, numDeletionVectorsUpdated -> 0, scanTimeMs -> 377, numAddedFiles -> 1, numUpdatedRows -> 5, numAddedBytes -> 1616, rewriteTimeMs -> 275}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-04-18 20:06:46.391,,,DELETE,"{predicate -> [""(date#4471 = 2020-01-28)""]}",,,,1.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 1617, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 197, numDeletionVectorsUpdated -> 0, numDeletedRows -> 5, scanTimeMs -> 195, numAddedFiles -> 0, numAddedBytes -> 0, rewriteTimeMs -> 0}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-03-19 11:34:36.185,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,0.0,Serializable,True,"{numFiles -> 421, numOutputRows -> 1111930, numOutputBytes -> 17654179}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-03-19 11:34:26.935,,,CREATE TABLE,"{partitionBy -> [""date""], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [21]:
# See source history
target_history = (
    spark.sql(
        """
            DESCRIBE HISTORY default.covid_nyt_ignore_changes_partitioned
        """
    )
)

target_history

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-04-21 21:13:06.91,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> 784a1a3c-3e13-4874-8117-e9c95c214a60, epochId -> 1}",,,,1.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 5, numOutputBytes -> 1872, numAddedFiles -> 1}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-04-21 21:12:43.354,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> 784a1a3c-3e13-4874-8117-e9c95c214a60, epochId -> 0}",,,,0.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 1111925, numOutputBytes -> 6759156, numAddedFiles -> 14}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-04-21 20:59:26.978,,,CREATE TABLE,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


# startingVersion and startingTimestamp tests
TLDR;
Both options are in order to start streaming of Delta table from a given point in time  
or a version. 

Using the `startingVersion` approach is more friendly since it will include  
versions from pointed one and forward. E.g. startingVersion 5 -> v5, v6, v7 .. vN  

Using the `startingTimestamp` can be tricky. If the used timestamp matches an exact  
timestamp that is in transaction log (timestamp when new version of Delta table was  
commited), it will start from that point in time including that version (inclusive  
range like example above for startingVersion).  

If the timestamp not match with one from the transaction log, the stream will start from  
that point in time going forward (exclusive range)

**`!!!BOTH OPTIONS CANNOT BE USED TOGETHER!!!`**

Test scenario:
- `startingVersion`
    - Use `startingVersion` in a streaming query by pointing a version in the middle of source table
    - Check data in source and target afterwards
    - More details to repeat the test:
        - Find a source table to stream
        - Include the latest version to start streaming from it
        - INSERT couple records into source table
        - Observe results into target
        - **See ChatGPT response for testing scenario** Maybe try to elaborate more on suggestions
- `startingTimestmap`
    - **Scenario 1**:
        - Use timestamp that matches a commit in the source table
        - Observe data in source and target
    - **Scenario 2**
        - Use `timestamp OR date` between 2 commits in source table
        - Observe data in source and target


Observations and conclusions:  
<details>  
<summary> `startingVersion` </summary>
    
- Observation 1:  
    - If source table have deletes in Delta Log, startingVersion cannot be used!  
    - As workaround you need to use `ignoreDeletes`: `true`
- Observation 2:
    - Turn on `ognoreDeletes`, same error as in observation1
    - Reason: Records from Parquet file were partially deleted, causing the non-deleted records to be rewritten in new file
- Observation 3:
    - Use version 3 as a starting point, since only updates are happening on source table going forward
    - And again similar proble. A table that have UPDATES in transaction logs, cannot be streamed
    - Suggestion from error is to use skipChangeCommits
- Observation 4:
    - Used skipChangeCommits, again facing problem with streaming the table
    - Tried with 2 new checkpoint directories  
-  **The Final observation**:
    - Source - `default.covid_nyt_stream_1` - table with 2 versions
        - v0 - CREATE TABLE, v1 - STREAMING UPDATE from other stream test
    - 1st micro batch
        - All data from source was streamed to target
    - 2nd micro batch
        - INSERT 3 new records to source
        - Stream to target
    - 3rd batch
        - COUNT source vs. target
        - INSERT 4 new records to source
        - Data was streamed to target
        - COUNT source vs. target
        - SUCCESS!     
</details>
<details>
<summary> `startingTimestamp` </summary>

- Scenario 1 - Timestamp matching commit in source table
    - Source - `default.covid_nyt_stream_1` - **startingTimestamp - 2025-04-24 22:44:32.71**
        - At this version and timestamp INSERT DML was executed and added 2 new rows
    - Target - `default.covid_nyt_starting_timestamp`
        - Data in source from this timestamp to latest available timestamp/version was streamed to target
            - Couple duplicates were also introduced. I think they are related to DML executed in test schenario above
        - NEW INSERT DML against source table was executed adding 1 new row
        - This row was processed to target table
- Scenario 2 - Timestmap between 2 versions in source table
    - Source - `default.covid_nyt_stream_1`
      - version 2 --> TS - 2025-04-24 22:44:32.71  | version 3 --> TS - 2025-04-26 14:37:59.655
      - startingTimestamp used for streaming - TS --> `2025-04-25 12:35:00.420`
    - Target - `default.covid_nyt_starting_timestamp_between_versions`
        - Data from source AS OF version 3 and beyond was streamed to target table
        - INSERT DML against source was executed with 1 record
        - New record was introduced to target
</details>

#### startingVersion scenario 1 and only

In [20]:
source_table_history = spark.sql(
    """
        DESCRIBE HISTORY bronze.covid_nyt_by_date
    """
)

source_table_history

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2025-04-24 21:58:06.798,,,UPDATE,"{predicate -> [""((date#3176 = 2020-02-20) AND (cases#3180 = 2))""]}",,,,4.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 1811, numCopiedRows -> 9, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 2665, numDeletionVectorsUpdated -> 0, scanTimeMs -> 2477, numAddedFiles -> 1, numUpdatedRows -> 4, numAddedBytes -> 1811, rewriteTimeMs -> 188}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
4,2025-04-24 21:56:13.912,,,UPDATE,"{predicate -> [""((date#2774 = 2020-02-20) AND (cases#2778 = 1))""]}",,,,3.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 1811, numCopiedRows -> 5, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3175, numDeletionVectorsUpdated -> 0, scanTimeMs -> 866, numAddedFiles -> 1, numUpdatedRows -> 8, numAddedBytes -> 1811, rewriteTimeMs -> 2308}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
3,2025-04-21 21:13:04.687,,,UPDATE,"{predicate -> [""(date#2542 = 2020-01-27)""]}",,,,2.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 1617, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 655, numDeletionVectorsUpdated -> 0, scanTimeMs -> 377, numAddedFiles -> 1, numUpdatedRows -> 5, numAddedBytes -> 1616, rewriteTimeMs -> 275}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-04-18 20:06:46.391,,,DELETE,"{predicate -> [""(date#4471 = 2020-01-28)""]}",,,,1.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 1617, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 197, numDeletionVectorsUpdated -> 0, numDeletedRows -> 5, scanTimeMs -> 195, numAddedFiles -> 0, numAddedBytes -> 0, rewriteTimeMs -> 0}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-03-19 11:34:36.185,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,0.0,Serializable,True,"{numFiles -> 421, numOutputRows -> 1111930, numOutputBytes -> 17654179}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-03-19 11:34:26.935,,,CREATE TABLE,"{partitionBy -> [""date""], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [17]:
# pick update candidates to create one new version
# predicate 1 --> date = 2020-02-20 and cases = 1 set cases to 15
# predicate 2 --> date = 2020-02-20 and cases = 2 set cases to 25
spark.sql(
    """
    select * 
    from bronze.covid_nyt_by_date 
    where date = cast('2020-02-20' as date)
    """
)

date,county,state,fips,cases,deaths
2020-02-20,Maricopa,Arizona,4013,1,0
2020-02-20,Humboldt,California,6023,1,0
2020-02-20,Los Angeles,California,6037,1,0
2020-02-20,Orange,California,6059,1,0
2020-02-20,San Diego,California,6073,1,0
2020-02-20,San Francisco,California,6075,2,0
2020-02-20,Santa Clara,California,6085,2,0
2020-02-20,Cook,Illinois,17031,2,0
2020-02-20,Suffolk,Massachusetts,25025,1,0
2020-02-20,Douglas,Nebraska,31055,11,0


In [18]:
# UPDATE DMLs
# predicate 1 --> date = 2020-02-20 and cases = 1 set cases to 15

spark.sql(
    """
        UPDATE bronze.covid_nyt_by_date
        SET cases = 15
        WHERE date = CAST('2020-02-20' AS DATE)
            AND cases = 1
    """
)

num_affected_rows
8


In [19]:
# predicate 2 --> date = 2020-02-20 and cases = 2 set cases to 25

spark.sql(
    """
        UPDATE bronze.covid_nyt_by_date
        SET cases = 25
        WHERE date = CAST('2020-02-20' AS DATE)
            AND cases = 2
    """
)

num_affected_rows
4


In [13]:
# DROP TABLE

spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_starting_version_v2
    """
)

In [14]:
# Create Delta Tables Structure in order to write stream data

spark.sql(
    """
        CREATE TABLE default.covid_nyt_starting_version_v2
        USING DELTA
    """
)

In [28]:
"""
Not Working: See observation notes above!
streaming_query_starting_version_2 = (
    spark
    .readStream
    .format("delta")
    .option("startingVersion", "3")
    .load("./spark-warehouse/bronze.db/covid_nyt_by_date")
    .writeStream
    .queryName("covid_nyt_by_date_starting_version_2")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("partitionBy", "date")
    # .option("ignoreDeletes", "true") # uppdate after 1st stream in ws1 checpoint failed
    .option("skipCommitChanges", "true") # ws5 | ws6
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_starting_version_v2/ws6") #ws1 - FAILED | ws2 - FAILED | ws3 - FAILED | ws4 - FAILED
    .start("./spark-warehouse/covid_nyt_starting_version_v2")
)

"""

'\nNot Working: See observation notes above!\nstreaming_query_starting_version_2 = (\n    spark\n    .readStream\n    .format("delta")\n    .option("startingVersion", "3")\n    .load("./spark-warehouse/bronze.db/covid_nyt_by_date")\n    .writeStream\n    .queryName("covid_nyt_by_date_starting_version_2")\n    .outputMode("append")\n    .option("mergeSchema", "true")\n    .option("partitionBy", "date")\n    # .option("ignoreDeletes", "true") # uppdate after 1st stream in ws1 checpoint failed\n    .option("skipCommitChanges", "true") # ws5 | ws6\n    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_starting_version_v2/ws6") #ws1 - FAILED | ws2 - FAILED | ws3 - FAILED | ws4 - FAILED\n    .start("./spark-warehouse/covid_nyt_starting_version_v2")\n)\n\n'

In [16]:
streaming_query_starting_version_2 = (
    spark
    .readStream
    .format("delta")
    .option("startingVersion", "1")
    .load("./spark-warehouse/covid_nyt_stream_1")
    .writeStream
    .format("delta")
    .queryName("covid_nyt_by_date_starting_version_2_attempt_421")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("partitionBy", "date") # It does not work!
    # .option("ignoreDeletes", "true") # uppdate after 1st stream in ws1 checpoint failed
    .option("skipCommitChanges", "true") # ws5 | ws6
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_starting_version_v2/ws3") #ws1 - FAILED | ws2 - FAILED | ws3 - FAILED | ws4 - FAILED
    .start("./spark-warehouse/covid_nyt_starting_version_v2")
)

In [18]:
covid_nyt_starting_version_v2 = spark.sql(
    """
        INSERT INTO default.covid_nyt_stream_1 VALUES
        (CAST('2025-01-24' AS DATE), 'Minneapolis', 'Minnesota', 17032, 9, 0),
        (CAST('2025-01-25' AS DATE), 'Los Angeles', 'California', 17033, 1, 0)
    """
)

covid_nyt_starting_version_v2

In [17]:
spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_starting_version_v2
    """
)

date,county,state,fips,cases,deaths
2020-10-10,Tucker,West Virginia,54093,46,0
2020-10-10,Tyler,West Virginia,54095,20,0
2020-10-10,Upshur,West Virginia,54097,175,0
2020-10-10,Wayne,West Virginia,54099,437,11
2020-10-10,Webster,West Virginia,54101,9,0
2020-10-10,Wetzel,West Virginia,54103,68,0
2020-10-10,Wirt,West Virginia,54105,20,0
2020-10-10,Wood,West Virginia,54107,408,6
2020-10-10,Wyoming,West Virginia,54109,136,5
2020-10-10,Adams,Wisconsin,55001,316,4


In [19]:
spark.sql(
    """
        SELECT 'default.covid_nyt_stream_1 - SOURCE BEFORE' as source_name, count(*) records_qty
        FROM default.covid_nyt_stream_1
        union all
        SELECT 'default.covid_nyt_starting_version_v2 - TARGET BEFORE' as source_name, count(*) records_qty
        FROM default.covid_nyt_starting_version_v2
    """
)

source_name,records_qty
default.covid_nyt_stream_1 - SOURCE BEFORE,1111934
default.covid_nyt_starting_version_v2 - TARGET BEFORE,1111934


In [20]:
covid_nyt_stream_1 = spark.sql(
    """
        INSERT INTO default.covid_nyt_stream_1 VALUES
        (CAST('2025-02-24' AS DATE), 'Minneapolis', 'Minnesota', 17032, 29, 0),
        (CAST('2025-03-25' AS DATE), 'Los Angeles', 'California', 17033, 10, 0),
        (CAST('2025-02-24' AS DATE), 'Buffalo',	'Wisconsin', 55011,	178, 2),
        (CAST('2025-03-25' AS DATE), 'Webster',	'West Virginia', 54101,	9, 0)
    """
)

covid_nyt_stream_1

In [21]:
spark.sql(
    """
        SELECT 'default.covid_nyt_stream_1 - SOURCE AFTER' as source_name, count(*) records_qty
        FROM default.covid_nyt_stream_1
        union all
        SELECT 'default.covid_nyt_starting_version_v2 - TARGET AFTER' as source_name, count(*) records_qty
        FROM default.covid_nyt_starting_version_v2
    """
)

source_name,records_qty
default.covid_nyt_stream_1 - SOURCE AFTER,1111938
default.covid_nyt_starting_version_v2 - TARGET AFTER,1111938


#### startingTimestamp - Scenario 1: timestamp match commit in source

In [13]:
# DROP TABLE

spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_starting_timestamp
    """
)

In [23]:
# Create Delta Tables Structure in order to write stream data

spark.sql(
    """
        CREATE TABLE default.covid_nyt_starting_timestamp
        USING DELTA
    """
)

In [24]:
spark.sql(
    """
        DESCRIBE HISTORY default.covid_nyt_stream_1
    """
)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2025-04-26 14:42:45.746,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,3.0,Serializable,True,"{numFiles -> 4, numOutputRows -> 4, numOutputBytes -> 6790}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
3,2025-04-26 14:37:59.655,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,2.0,Serializable,True,"{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 3412}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-04-24 22:44:32.71,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,1.0,Serializable,True,"{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 3412}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-04-16 11:52:45.073,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> db5ee720-a032-4fda-9981-4d8f3ea10001, epochId -> 0}",,,,0.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 1111930, numOutputBytes -> 6323054, numAddedFiles -> 4}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-04-16 11:23:46.149,,,CREATE TABLE,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> false, properties -> {}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [25]:
streaming_query_starting_version_2 = (
    spark
    .readStream
    .format("delta")
    .option("startingTimestamp", "2025-04-24 22:44:32.71")
    .load("./spark-warehouse/covid_nyt_stream_1")
    .writeStream
    .format("delta")
    .queryName("covid_nyt_stream_1_starting_timestamp_attempt_1")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_starting_timestamp/ws1") #ws1 - FAILED | ws2 - FAILED | ws3 - FAILED | ws4 - FAILED
    .start("./spark-warehouse/covid_nyt_starting_timestamp")
)

In [27]:
spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_starting_timestamp
        ORDER BY date
    """
)

date,county,state,fips,cases,deaths
2025-01-24,Minneapolis,Minnesota,17032,9,0
2025-01-24,Minneapolis,Minnesota,17032,9,0
2025-01-25,Los Angeles,California,17033,1,0
2025-01-25,Los Angeles,California,17033,1,0
2025-02-24,Buffalo,Wisconsin,55011,178,2
2025-02-24,Minneapolis,Minnesota,17032,29,0
2025-03-25,Webster,West Virginia,54101,9,0
2025-03-25,Los Angeles,California,17033,10,0


In [28]:
covid_nyt_stream_1 = spark.sql(
    """
        INSERT INTO default.covid_nyt_stream_1 VALUES
        (CAST('2025-02-24' AS DATE), 'Wyoming',	'West Virginia', 54109, 136, 5)
    """
)

covid_nyt_stream_1

In [29]:
spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_starting_timestamp
        ORDER BY date
    """
)

date,county,state,fips,cases,deaths
2025-01-24,Minneapolis,Minnesota,17032,9,0
2025-01-24,Minneapolis,Minnesota,17032,9,0
2025-01-25,Los Angeles,California,17033,1,0
2025-01-25,Los Angeles,California,17033,1,0
2025-02-24,Wyoming,West Virginia,54109,136,5
2025-02-24,Buffalo,Wisconsin,55011,178,2
2025-02-24,Minneapolis,Minnesota,17032,29,0
2025-03-25,Los Angeles,California,17033,10,0
2025-03-25,Webster,West Virginia,54101,9,0


#### startingTimestamp between versions

In [13]:
# DROP TABLE

spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_starting_timestamp_between_versions
    """
)

In [30]:
# Create Delta Tables Structure in order to write stream data

spark.sql(
    """
        CREATE TABLE default.covid_nyt_starting_timestamp_between_versions
        USING DELTA
    """
)

In [31]:
spark.sql(
    """
        DESCRIBE HISTORY default.covid_nyt_stream_1
    """
)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2025-04-26 14:59:40.971,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,4.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1703}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
4,2025-04-26 14:42:45.746,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,3.0,Serializable,True,"{numFiles -> 4, numOutputRows -> 4, numOutputBytes -> 6790}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
3,2025-04-26 14:37:59.655,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,2.0,Serializable,True,"{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 3412}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-04-24 22:44:32.71,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,1.0,Serializable,True,"{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 3412}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-04-16 11:52:45.073,,,STREAMING UPDATE,"{outputMode -> Append, queryId -> db5ee720-a032-4fda-9981-4d8f3ea10001, epochId -> 0}",,,,0.0,Serializable,True,"{numRemovedFiles -> 0, numOutputRows -> 1111930, numOutputBytes -> 6323054, numAddedFiles -> 4}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-04-16 11:23:46.149,,,CREATE TABLE,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> false, properties -> {}}",,,,,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [33]:
streaming_query_starting_version_2 = (
    spark
    .readStream
    .format("delta")
    .option("startingTimestamp", "2025-04-25 12:35:00.420")
    .load("./spark-warehouse/covid_nyt_stream_1")
    .writeStream
    .format("delta")
    .queryName("covid_nyt_stream_1_starting_timestamp_between_versions_attempt_1")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_starting_timestamp_between_versions/ws1") #ws1 - FAILED | ws2 - FAILED | ws3 - FAILED | ws4 - FAILED
    .start("./spark-warehouse/covid_nyt_starting_timestamp_between_versions")
)

In [34]:
spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_starting_timestamp_between_versions
        ORDER BY date
    """
)

date,county,state,fips,cases,deaths
2025-01-24,Minneapolis,Minnesota,17032,9,0
2025-01-25,Los Angeles,California,17033,1,0
2025-02-24,Buffalo,Wisconsin,55011,178,2
2025-02-24,Wyoming,West Virginia,54109,136,5
2025-02-24,Minneapolis,Minnesota,17032,29,0
2025-03-25,Los Angeles,California,17033,10,0
2025-03-25,Webster,West Virginia,54101,9,0


In [35]:
covid_nyt_stream_1 = spark.sql(
    """
        INSERT INTO default.covid_nyt_stream_1 VALUES
        (CAST('2025-04-01' AS DATE), 'Maricopa', 'Arizona',	4013, 0, 0)
    """
)

covid_nyt_stream_1

In [36]:
spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_starting_timestamp_between_versions
        ORDER BY date
    """
)

date,county,state,fips,cases,deaths
2025-01-24,Minneapolis,Minnesota,17032,9,0
2025-01-25,Los Angeles,California,17033,1,0
2025-02-24,Wyoming,West Virginia,54109,136,5
2025-02-24,Minneapolis,Minnesota,17032,29,0
2025-02-24,Buffalo,Wisconsin,55011,178,2
2025-03-25,Los Angeles,California,17033,10,0
2025-03-25,Webster,West Virginia,54101,9,0
2025-04-01,Maricopa,Arizona,4013,0,0


# Initial Snapshot with `withEventTimeOrder`

Default ordering when using Delta Lake as a streaming data source `is based on the last modification time of  
the Delta table file`. Streaming queries run until target table is caught up to the latest version of source  
Delta Table, this is called initial snapshot at the beginning of a streaming query. We may consider whether  
the default ordering (based on file last modification time) is a correct, or if there is another event time  
field that may simplify dataset ordering (e.g. field that points exact point in time when event appears).

If we rely on a timestamp column like `last_modified` or similar, records may be processed out of order,  
leading to dropping records as late events by the watermark. This can be avoided by enabling  
`withEventTimeOrder` option, which prefers event time over the modification time. 

- The data drop issue happens only when the initial Delta snapshot of a `stateful` streaming query is processed
  in the default order
- `withEventTimeOrder` is another of those setting **that takes effect only at the beginning of a streaming query**
   Once query is started or already in progress it cannot be changed. Unless you stop the current streaming query,
   delete checkpoint and make sure of the initial processing position option `(startVersion/startTimestamp)` to
   proceed.
- If you use `withEventTimeOrder` option enable, you cannot downgrade to version that does not support it. For
   Databricks Runtime the minimum version is 11.3LTS. In case you can downgrade versions, you **MUST** wait for
  initial snapshot to finish, or delete the checkpoint and restart the query.
- Scenarios where `withEventTimeOrder` **CANNOT** be used:
    -  If the event time column is a generated column and there are nonprojection transformations between the
      Delta source and the watermark. For instance, source table have column that is generated automatically
      based on existing column in the source table `(YEAR(src.reading_time))`. If event time column is generated
      column and you apply transformations before the watermart (filtering, renaming, selection of other columns,
      adding expressions) the system cannot guarantee event-time ordering any more and streaming will **FAIL**.
    - If there is watermark with multiple Delta sources in the stream query. For instance, **2 readStreams that
      are being joined** and `withEventTimeOrder` is applied
- **Due to increased shuffle operations, performance may be impacted.**  
  
### Test scenario:
- Create Delta table from streaming query that have a column called `event_time` with value `current_timestamp()`.
  It will be used as column for `withEventTimeOrder` feature.
- Create 2nd Delta table from streaming query that use `withEventTimeOrder` feature and
  `withWatermark("event_time", "10 seconds")`
- Observe processing and result. Observe job, tasks and stages as well as execution plans and other metrics
  provided in Spark UI.

### Observation and Conclusions
- 1st attempt: Source table was a partitioned table that got DELETE as last commit in Delta Log. By some reason
  streaming did not processed data to `default.covid_nyt_with_event_time_order_source` to be used with the event
  time feature. This attempt has `FAILED`!
- 2nd attempt: Changed source table to non-partitioned one `default.covid_nyt` table. Source was prepared with
  new column called `eventTime` that was used for 2nd streaming that enable `withEventTimeOrder`.
  Streaming processed entire source into target table, no delay was processed (data volume in this env is small
  couple hunder thousand records at most).
    - SPARK UI Observation: A lot `Exchange` stages were noticed, which are sort of Shuffling stages, as Delta
      Lake documentation warns.
    - **See Physical plan of streaming query at end of experiment!**
    - Considering this experiment as `SUCCESS`!  

In [18]:
# DROP TABLE

spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_with_event_time_order_source
    """
)

In [19]:
# Create Delta Tables Structure in order to write stream data

spark.sql(
    """
        CREATE TABLE default.covid_nyt_with_event_time_order_source
        USING DELTA
    """
)

In [21]:
from pyspark.sql.functions import col, current_timestamp

source_streaming = (
    spark
    .readStream
    .format("delta")
    .load('./spark-warehouse/covid_nyt')
    .withColumn('eventTime', current_timestamp())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_with_event_time_order_source/ws2")
    .start("./spark-warehouse/covid_nyt_with_event_time_order_source")
)

In [22]:
spark.sql(
    """
        select * from default.covid_nyt_with_event_time_order_source limit 10
    """
)

date,county,state,fips,cases,deaths,eventTime
2020-10-10,Tucker,West Virginia,54093,46,0,2025-04-26 19:28:05.701
2020-10-10,Tyler,West Virginia,54095,20,0,2025-04-26 19:28:05.701
2020-10-10,Upshur,West Virginia,54097,175,0,2025-04-26 19:28:05.701
2020-10-10,Wayne,West Virginia,54099,437,11,2025-04-26 19:28:05.701
2020-10-10,Webster,West Virginia,54101,9,0,2025-04-26 19:28:05.701
2020-10-10,Wetzel,West Virginia,54103,68,0,2025-04-26 19:28:05.701
2020-10-10,Wirt,West Virginia,54105,20,0,2025-04-26 19:28:05.701
2020-10-10,Wood,West Virginia,54107,408,6,2025-04-26 19:28:05.701
2020-10-10,Wyoming,West Virginia,54109,136,5,2025-04-26 19:28:05.701
2020-10-10,Adams,Wisconsin,55001,316,4,2025-04-26 19:28:05.701


In [23]:
from time import sleep
sleep(10)

In [17]:
# DROP TABLE

spark.sql(
    """
        DROP TABLE IF EXISTS default.covid_nyt_with_event_time_order_target
    """
)

In [24]:
# Create Delta Tables Structure in order to write stream data

spark.sql(
    """
        CREATE TABLE default.covid_nyt_with_event_time_order_target
        USING DELTA
    """
)

In [27]:
target_streaming = (
    spark
    .readStream
    .format("delta")
    .option("withEventTimeOrder", "true")
    .load("./spark-warehouse/covid_nyt_with_event_time_order_source")
    .withWatermark("eventTime", "10 seconds")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "streaming_checkpoints/ch07/covid_nyt_with_event_time_order_target/ws1")
    .start("./spark-warehouse/covid_nyt_with_event_time_order_target")
)

In [28]:
spark.sql(
    """
        select * from default.covid_nyt_with_event_time_order_source
            except
        select * from default.covid_nyt_with_event_time_order_target
    """
)

date,county,state,fips,cases,deaths,eventTime


In [29]:
spark.sql(
    """
        select distinct eventTime from default.covid_nyt_with_event_time_order_source
            union all
        select distinct eventTime from default.covid_nyt_with_event_time_order_target
    """
)

eventTime
2025-04-26 19:28:05.701
2025-04-26 19:28:05.701


#### Physical Plan and last progress of Streaming with `withEventTimeOrder`

In [None]:
target_streaming.explain(extended=True)

In [None]:
target_streaming.lastProgress['sources']

# Idempotent Fanout

This is event related to streaming pipeline that writes to **multiple target Delta tables** using a single source table.  
Idempotent Fanout ensures that each target table receives consistent and non-duplicated writes even if the streaming job restarts or receives same data again.  
It is especially useful for:
- Aggregations, Dimensional updates or multiple fact tables
- CDC Streams like Change Data Feed (CDF)
- exactly-once semantics across all outputs

### Test Scenario:

1. Create 1st source table, by copying `bronze.covid_nyt_by_date` and enable CDF feature.  
    a.  Simulate streaming-like incremental data arrival with loop and sample read from
   source and append samples to target table
2. Create readStream using `readChangeDataFeed` and `startingVersion = 1`  
3. Implement `write_to_delta_lake_table_idempotent` method that takes batch_df and
    batch_id and create DataFrameWriter / not streaming / that writes to idempotent_table_1   and idempotent_table_2  
    a. Apply options `txnVersion = batch_id` and `txnAppId = delta_idempotency_example`  
4. Create readStream from source table from `step 1` using `readChangeDataFeed = True` and `startingVersion = 1`  
5. Create writeStream using streaming query from `step 5` to a delta table using `forEachBatch` and applying  
   `write_to_delta_lake_table_idempotent` together with   `outputMode = update`
6. Observe and analyze the data/outcome across all tables and especially the target idempotent tables
7. Update 5 rows in source and see how data is propagated to both target tables

### Conclusion:
All steps were executed correctly, the idempotent fanout behavior works fine. It was a mystery why at both idempotent targets  
we have 1 insert row, 2 pre-image and 2 post-image records (total 5, 1 original, 2 pre and 2 post).   
Explanation:    
- On step 1.a we loop over source of streaming source and take 5% sample records which are appended after CDF of table  
    was turned on.  
- That was not writtent to target at 1st streaming micro batch  
- After 1st streaming micro batch, update is executed exactly on those 2 rows that are duplicate on streaming source table   
    and not processed to idempotent targets as 2 insert records. This leads to 2 preimage and 2 postimage updates, that are   
    looking as anomaly in both idempotent targets.  
    
**After entire observation & evaluation was done, test is SUCCESSFULLY DONE. Both target tables received exact same rows.**

In [2]:
# TODO: Implement the logic using example from chapter_7 notebook and following the pattern from othertests related to current setup

In [86]:
spark.sql(
    """
        SELECT 
        --COUNT(*) AS qty
        *,
        row_number() over(partition by state order by county) as rn
        FROM bronze.covid_nyt_by_date
        WHERE date = CAST('2020-04-15' AS DATE)
        AND state = "Alabama"
    """
)

date,county,state,fips,cases,deaths,rn
2020-04-15,Autauga,Alabama,1001,25,1,1
2020-04-15,Baldwin,Alabama,1003,98,2,2
2020-04-15,Barbour,Alabama,1005,13,0,3
2020-04-15,Bibb,Alabama,1007,19,0,4
2020-04-15,Blount,Alabama,1009,17,0,5
2020-04-15,Bullock,Alabama,1011,8,0,6
2020-04-15,Butler,Alabama,1013,11,0,7
2020-04-15,Calhoun,Alabama,1015,62,0,8
2020-04-15,Chambers,Alabama,1017,227,10,9
2020-04-15,Cherokee,Alabama,1019,10,0,10


In [87]:
"""
Create 1st source table, by copying bronze.covid_nyt_by_date and enable CDF feature.
a. Simulate streaming-like incremental data arrival with loop and sample read from 
source and append samples to target table
"""

# Create table's structure
spark.sql(
    """
        CREATE TABLE default.covid_cdf_source_1
        USING DELTA
        --PARTITIONED BY (date)
        AS 
        SELECT
            *
        FROM bronze.covid_nyt_by_date
        WHERE date = CAST('2020-04-15' AS DATE)
            AND state = "Alabama"
    """
)


In [76]:
spark.sql(
    """
        --SHOW DATABASES
        SHOW TABLES FROM silver;
    """
)

namespace,tableName,isTemporary


In [77]:
# Drop TABLE
spark.sql(
    """
        DROP TABLE default.covid_cdf_source_1
    """
)

In [88]:
# Enable ChangeDataFeed

spark.sql(
    """
        ALTER TABLE default.covid_cdf_source_1
        SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    """
)


In [None]:
# Validate Table got created and CDF is enabled
spark.sql(
    """
        DESCRIBE DETAIL default.covid_cdf_source_1
    """
)

In [91]:
# Mimic incremental batches of data. Append 5% subset from source table to 
# covid_nyt_idempotent_fanout_1
from datetime import datetime, timedelta
from pyspark.sql.functions import col

start_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# date = CAST('2020-04-15' AS DATE)
            # AND state = "Alabama"

for i in range(5):
    (
        spark
        .read
        .format("delta")
        .load("./spark-warehouse/bronze.db/covid_nyt_by_date")
        .filter((col("state") == "Alabama") & (col("date") == "2020-04-15"))
        .sample(0.05)
        .write
        .format("delta")
        .mode("append")
        .saveAsTable("default.covid_cdf_source_1")
    )

    if i == 2:
        end_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")


In [127]:
spark.sql(
    """
        DESCRIBE HISTORY default.covid_cdf_source_1
    """
)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
7,2025-05-06 14:30:34.353,,,UPDATE,"{predicate -> [""((date#42791 = 2020-04-15) AND upper(state#42793) IN (ALABAMA,ALASKA))""]}",,,,6.0,Serializable,False,"{numRemovedFiles -> 6, numRemovedBytes -> 20182, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 3, executionTimeMs -> 2143, numDeletionVectorsUpdated -> 0, scanTimeMs -> 359, numAddedFiles -> 3, numUpdatedRows -> 84, numAddedBytes -> 7404, rewriteTimeMs -> 1784}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
6,2025-05-06 14:22:11.91,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,5.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1807}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
5,2025-05-06 14:22:11.306,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,4.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1654}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
4,2025-05-06 14:22:10.66,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,3.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1761}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
3,2025-05-06 14:22:09.981,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,2.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1722}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-05-06 14:22:09.232,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,1.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 8, numOutputBytes -> 1888}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-05-06 14:19:24.866,,,SET TBLPROPERTIES,"{properties -> {""delta.enableChangeDataFeed"":""true""}}",,,,0.0,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-05-06 14:19:01.248,,,CREATE TABLE AS SELECT,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,True,"{numFiles -> 1, numOutputRows -> 67, numOutputBytes -> 2988}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [None]:
spark.sql(
    """
        SELECT 
        *,
        row_number() over(partition by state order by county) as rn,
        rank() over(partition by state order by county) as rank, 
        input_file_name() as file_name
        FROM default.covid_cdf_source_1 version as of 6
    """
)

In [30]:
# Not sure why, but let it sleep 10 seconds!
from time import sleep
sleep(10)

In [97]:
"""
3. Implement `write_to_delta_lake_table_idempotent` method that takes batch_df and
    batch_id and create DataFrameWriter / not streaming / that writes to covid_nyt_idempotent_table_1 and covid_nyt_idempotent_table_2  
    a. Apply options `txnVersion = batch_id` and `txnAppId = delta_idempotency_example`  
"""
app_id = "delta_idempotency_example"

def write_to_delta_lake_table_idempotent(batch_df, batch_id):
    # Target 1: 
    (
        batch_df
        .write
        .format("delta")
        .mode("append")
        .option("txnVersion", batch_id)
        .option("txnAppId", app_id)
        .option("mergeSchema", "true")
        .save("./spark-warehouse/covid_cdf_idempotent_1")
    )
    # Target 2
    (
        batch_df
        .write
        .format("delta")
        .mode("append")
        .option("txnVersion", batch_id)
        .option("txnAppId", app_id)
        .option("mergeSchema", "true")
        .save("./spark-warehouse/covid_cdf_idempotent_2")
    )

In [98]:
"""
4. Create readStream from source table from 
`step 1` using `readChangeDataFeed = True` and `startingVersion = 1`
"""

cdf_stream = (
    spark
    .readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .table("default.covid_cdf_source_1")
)

In [99]:
"""
5. Create writeStream using streaming query from `step 5` to a delta table using 
`forEachBatch` and applying `write_to_delta_lake_table_idempotent` together with   
`outputMode = update`
"""

(
    cdf_stream
    .writeStream
    .format("delta")
    .queryName("CDF Pipeline - Idempotent Fanout")
    .foreachBatch(write_to_delta_lake_table_idempotent)
    .outputMode("update")
    .start()
)

<pyspark.sql.streaming.query.StreamingQuery at 0x216d51006d0>

In [None]:
"""
6. Observe and analyze the data/outcome across all tables and especially 
the target idempotent tables
"""

In [100]:
from delta import DeltaTable
from pyspark.sql.functions import col

dt_covid_cdf_idempotent_1 = DeltaTable.forPath(spark, "./spark-warehouse/covid_cdf_idempotent_1")

dt_covid_cdf_idempotent_2 = DeltaTable.forPath(spark, "./spark-warehouse/covid_cdf_idempotent_2")

df_covid_cdf_idempotent_1 = dt_covid_cdf_idempotent_1.toDF()
df_covid_cdf_idempotent_2 = dt_covid_cdf_idempotent_2.toDF()


In [103]:
df_covid_cdf_idempotent_1.orderBy(col("county"))#.filter(col("date") == "2020-04-15")

date,county,state,fips,cases,deaths,_change_type,_commit_version,_commit_timestamp
2020-04-15,Bibb,Alabama,1007,19,0,insert,6,2025-05-06 14:22:11.91
2020-04-15,Cleburne,Alabama,1029,12,0,insert,4,2025-05-06 14:22:10.66
2020-04-15,Coffee,Alabama,1031,43,0,insert,3,2025-05-06 14:22:09.981
2020-04-15,Colbert,Alabama,1033,11,1,insert,2,2025-05-06 14:22:09.232
2020-04-15,Coosa,Alabama,1037,20,1,insert,3,2025-05-06 14:22:09.981
2020-04-15,Crenshaw,Alabama,1041,4,0,insert,6,2025-05-06 14:22:11.91
2020-04-15,Cullman,Alabama,1043,40,1,insert,2,2025-05-06 14:22:09.232
2020-04-15,Hale,Alabama,1065,20,0,insert,2,2025-05-06 14:22:09.232
2020-04-15,Jefferson,Alabama,1073,628,17,insert,6,2025-05-06 14:22:11.91
2020-04-15,Lee,Alabama,1081,285,12,insert,2,2025-05-06 14:22:09.232


In [104]:
df_covid_cdf_idempotent_2.orderBy(col("county"))#.filter(col("date") == "2020-04-15")

date,county,state,fips,cases,deaths,_change_type,_commit_version,_commit_timestamp
2020-04-15,Bibb,Alabama,1007,19,0,insert,6,2025-05-06 14:22:11.91
2020-04-15,Cleburne,Alabama,1029,12,0,insert,4,2025-05-06 14:22:10.66
2020-04-15,Coffee,Alabama,1031,43,0,insert,3,2025-05-06 14:22:09.981
2020-04-15,Colbert,Alabama,1033,11,1,insert,2,2025-05-06 14:22:09.232
2020-04-15,Coosa,Alabama,1037,20,1,insert,3,2025-05-06 14:22:09.981
2020-04-15,Crenshaw,Alabama,1041,4,0,insert,6,2025-05-06 14:22:11.91
2020-04-15,Cullman,Alabama,1043,40,1,insert,2,2025-05-06 14:22:09.232
2020-04-15,Hale,Alabama,1065,20,0,insert,2,2025-05-06 14:22:09.232
2020-04-15,Jefferson,Alabama,1073,628,17,insert,6,2025-05-06 14:22:11.91
2020-04-15,Lee,Alabama,1081,285,12,insert,2,2025-05-06 14:22:09.232


In [44]:
"""
7. Update 5 rows in source and see how data is propagated to both target tables
"""

'\n7. Update 5 rows in source and see how data is propagated to both target tables\n'

In [105]:
spark.sql(
    """
        UPDATE default.covid_cdf_source_1
        SET date=current_date()
        WHERE date = CAST('2020-04-15' AS date)
        AND UPPER(state) IN ("ALABAMA", "ALASKA")
    """
)

num_affected_rows
84


In [106]:
from delta import DeltaTable
from pyspark.sql.functions import col

dt2_covid_cdf_idempotent_1 = DeltaTable.forPath(spark, "./spark-warehouse/covid_cdf_idempotent_1")

dt2_covid_cdf_idempotent_2 = DeltaTable.forPath(spark, "./spark-warehouse/covid_cdf_idempotent_2")

df2_covid_cdf_idempotent_1 = dt2_covid_cdf_idempotent_1.toDF()
df2_covid_cdf_idempotent_2 = dt2_covid_cdf_idempotent_2.toDF()


In [107]:
df2_cdf_idmptnt_1_fo = (
    df2_covid_cdf_idempotent_1
    .filter(
        (col("state").isin("Alabama", "Alaska")) 
        & (col("county") == "Jefferson")
        & (col("date").isin("2020-04-15", "2025-05-06"))
    )
    .orderBy(
        col("date"),
        col("state"),
        col("county"),
        col("_commit_version"),
        col("_commit_timestamp")
    )
)

df2_cdf_idmptnt_1_fo

date,county,state,fips,cases,deaths,_change_type,_commit_version,_commit_timestamp
2020-04-15,Jefferson,Alabama,1073,628,17,insert,6,2025-05-06 14:22:11.91
2020-04-15,Jefferson,Alabama,1073,628,17,update_preimage,7,2025-05-06 14:30:34.353
2020-04-15,Jefferson,Alabama,1073,628,17,update_preimage,7,2025-05-06 14:30:34.353
2025-05-06,Jefferson,Alabama,1073,628,17,update_postimage,7,2025-05-06 14:30:34.353
2025-05-06,Jefferson,Alabama,1073,628,17,update_postimage,7,2025-05-06 14:30:34.353


In [108]:
df2_cdf_idmptnt_2_fo = (
    df2_covid_cdf_idempotent_2
    .filter(
        (col("state").isin("Alabama", "Alaska")) 
        & (col("county") == "Jefferson")
        & (col("date").isin("2020-04-15", "2025-05-06"))
    )
    .orderBy(
        col("date"),
        col("state"),
        col("county"),
        col("_commit_version"),
        col("_commit_timestamp")
    )
)

df2_cdf_idmptnt_2_fo

date,county,state,fips,cases,deaths,_change_type,_commit_version,_commit_timestamp
2020-04-15,Jefferson,Alabama,1073,628,17,insert,6,2025-05-06 14:22:11.91
2020-04-15,Jefferson,Alabama,1073,628,17,update_preimage,7,2025-05-06 14:30:34.353
2020-04-15,Jefferson,Alabama,1073,628,17,update_preimage,7,2025-05-06 14:30:34.353
2025-05-06,Jefferson,Alabama,1073,628,17,update_postimage,7,2025-05-06 14:30:34.353
2025-05-06,Jefferson,Alabama,1073,628,17,update_postimage,7,2025-05-06 14:30:34.353


In [122]:
dt2_covid_cdf_idempotent_1.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-05-06 14:30:35.18,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,0.0,Serializable,True,"{numFiles -> 3, numOutputRows -> 168, numOutputBytes -> 10247}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-05-06 14:27:46.544,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,,Serializable,True,"{numFiles -> 3, numOutputRows -> 17, numOutputBytes -> 8095}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [123]:
dt2_covid_cdf_idempotent_2.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-05-06 14:30:35.579,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,0.0,Serializable,True,"{numFiles -> 3, numOutputRows -> 168, numOutputBytes -> 10247}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-05-06 14:27:47.089,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,,Serializable,True,"{numFiles -> 3, numOutputRows -> 17, numOutputBytes -> 8095}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [None]:
# from delta.tables import DeltaTable
from pyspark.sql.functions import input_file_name

# Read version 6 of the Delta table named 'tableX'
with_file_name_df = (df2_covid_cdf_idempotent_1
    .filter("county = 'Jefferson' AND state = 'Alabama'")
    .withColumn("input_file_name", input_file_name())
)

with_file_name_df

In [125]:
with_file_name_df.groupBy("county", "state", "date").count().orderBy("count", ascending=False).show()


+---------+-------+----------+-----+
|   county|  state|      date|count|
+---------+-------+----------+-----+
|Jefferson|Alabama|2020-04-15|    3|
|Jefferson|Alabama|2025-05-06|    2|
+---------+-------+----------+-----+



# Streaming Updates

### Test Scenario:

1. Create table, using `bronze.covid_nyt_by_date` as source and dropDuplicates  
    a. This table will be used as target for `Streaming Upsert` test  
2. Implement merge/upsert logic using MergeBuilder  
    a. Target table covid_table_2 created in previous test  
    b. Use `fips`, `date`, `county` and `state` as a composite join key  
    c. `whenMatchedDelete` with condition `source._change_type = DELETE`  
    d. `whenMatchedUpdate` by updating all columns in target using source  
    e. `whenMatchedInsert` by inserting all column in target using source  
3.  Create readStream using `readChangeFeed = true` and `startingVersion = 1`
    a. Use source source Delta Table from `Idempotent Fanout` / step 1 / as a source
4. Create writeStream using streamin query from `step 1`  
    a. Format Delta  
    b. `foreachBatch` using method from `step 2`  
    c. **`outputMode = update`**  
       i. TODO: Read a bit more for this outputMode, since it will be first time to use it   

In [None]:
# TODO: Implement the logic using example from chapter_7 notebook and following the pattern from othertests related to current setup

In [132]:
# Create table, using bronze.covid_nyt_by_date as source and dropDuplicates
# a. This table will be used as target for Streaming Upsert test

(
    spark
    .read
    .format("delta")
    .load("./spark-warehouse/bronze.db/covid_nyt_by_date")
    .sample(0.01)
    .drop_duplicates(["fips", "date", "county", "state"])
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("covid_nyt_streaming_updates_target")
)

In [135]:
"""
Implement merge/upsert logic using MergeBuilder
a. Target table covid_table_2 created in previous test
b. Use fips, date, county and state as a composite join key
c. whenMatchedDelete with condition source._change_type = DELETE
d. whenMatchedUpdate by updating all columns in target using source
e. whenMatchedInsert by inserting all column in target using source
"""

from delta.tables import *

def upsert_to_covid_delta_table(micro_batch_df, batch_id):
    delta_covid_nyt_streaming_updates_target = DeltaTable.forName(
        spark,
        "default.covid_nyt_streaming_updates_target"
    )

    delta_covid_nyt_streaming_updates_target.alias("target") \
    .merge(
        source=micro_batch_df.alias("source").drop_duplicates(["fips", "date", "county", "state"]),
        condition = \
            """
                source.fips=target.fips
                AND source.date = target.date 
                AND source.county=target.county 
                AND source.state=target.state
            """    
    ) \
    .whenMatchedDelete(condition="source._change_type == 'DELETE'") \
    .whenMatchedUpdate(
        set = {
            "fips": "source.fips",
            "date": "source.date",
            "cases": "source.cases",
            "deaths": "source.deaths"
        }
    ) \
    .whenNotMatchedInsert(
        values = {
            "fips": "source.fips",
            "date": "source.date",
            "cases": "source.cases",
            "deaths": "source.deaths"
        }
    ) \
    .execute()
    
    


In [11]:
"""
Create readStream using readChangeFeed = true and startingVersion = 1 
a. Use source source Delta Table from Idempotent Fanout / step 1 / as a source
Create writeStream using streamin query from step 1
a. Format Delta
b. foreachBatch using method from step 2
c. outputMode = update
"""

'\nCreate readStream using readChangeFeed = true and startingVersion = 1 \na. Use source source Delta Table from Idempotent Fanout / step 1 / as a source\nCreate writeStream using streamin query from step 1\na. Format Delta\nb. foreachBatch using method from step 2\nc. outputMode = update\n'

In [136]:
(
    spark
    .readStream
    .format("delta")
    .option("readChangeDataFeed", "true")
    .option("startingVersion", 1)
    .table("default.covid_cdf_source_1")
    .writeStream
    .option("checkpointLocation", "streaming_checkpoints/ch07/streaming_updates/ws1")
    .format("delta")
    .queryName("CDF - Streaming Pipeline")
    .foreachBatch(upsert_to_covid_delta_table)
    .outputMode("update")
    .start()
)

<pyspark.sql.streaming.query.StreamingQuery at 0x216d504bb10>

In [143]:
spark.sql(
    """
        SELECT *
        FROM default.covid_nyt_streaming_updates_target
        ORDER BY date desc
    """
)

date,county,state,fips,cases,deaths
2021-02-12,Perry,Alabama,1105.0,1039,26
2021-03-08,Dale,Alabama,1045.0,4688,107
2021-03-08,DeKalb,Alabama,1049.0,8510,175
2021-02-02,Conecuh,Alabama,1035.0,1017,23
2021-02-22,Jackson,Alabama,1071.0,6442,99
2021-02-28,Blount,Alabama,1009.0,6097,127
2021-03-05,Sumter,Alabama,1119.0,1004,32
2021-02-04,Morgan,Alabama,1103.0,13143,216
2021-03-04,Lee,Alabama,1081.0,15039,157
2021-02-11,Conecuh,Alabama,1035.0,1045,23


In [148]:
spark.sql(
    """
        DESCRIBE HISTORY default.covid_nyt_streaming_updates_target
    """
)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-05-06 16:32:05.578,,,CREATE OR REPLACE TABLE AS SELECT,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,False,"{numFiles -> 1, numOutputRows -> 11197, numOutputBytes -> 135399}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [144]:
spark.sql("select * from default.covid_cdf_source_1")

date,county,state,fips,cases,deaths
2025-05-06,Autauga,Alabama,1001,25,1
2025-05-06,Baldwin,Alabama,1003,98,2
2025-05-06,Barbour,Alabama,1005,13,0
2025-05-06,Bibb,Alabama,1007,19,0
2025-05-06,Blount,Alabama,1009,17,0
2025-05-06,Bullock,Alabama,1011,8,0
2025-05-06,Butler,Alabama,1013,11,0
2025-05-06,Calhoun,Alabama,1015,62,0
2025-05-06,Chambers,Alabama,1017,227,10
2025-05-06,Cherokee,Alabama,1019,10,0


In [146]:
spark.sql(
    """
        UPDATE default.covid_cdf_source_1
        SET date=CAST('2025-05-05' AS DATE)
        WHERE date = CAST('2025-05-06' AS DATE)
        AND UPPER(state) = "ALABAMA"
        AND UPPER(county) = "CLAY"
    """
)

num_affected_rows
1


# Delta Live Tables (DLT)

A syntactical example of DLT code base

```
import dlt

@dlt.table
def autoloader_dlt_bronze():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("<source data path>")
    )

@dlt.table
def delta_dlt_silver():
    return (
        spark
        .read_stream("autoloader_dlt_bronze")
        # ...
        # <transformation logic>
    )

@dlt.table
def live_delta_gold():
    return (
        dlt
        .read("delta_dlt_silver")
        # ...
        # <transformation logic>
    )

```

# Definig Change Data Feed (CDF) read boundaries in a batch process

# Test Scenarios:
**/ Use CDF table from Idempotent Fanout and Streaming Upsert tests /**
`default.covid_cdf_source_1`
1. CDF batch processing with start and end versions specified  
    a. Observe the data
2. CDF batch processing with start and end timestamps specified  
    a. See history of Delta table to see any timestmaps that can be used as boudaries  
    b. Observe the data
4. CDF batch processing with only starting version and timestamps  
    a. Observe the data
5. CDF batch processing with with starting timestamp and file location  
    a. Observe the data

# Conclusion:
1. CDF batch processing with start and end versions specified  
   - Changes were transfered along with CDF metadata  
   - insert/update_preimage/update_postimage  
2. CDF batch processing with start and end timestamps specified  
    - Approach 1 - start timestamp - 1 minute and end timestamp + 1 minute
        - No metadata provided, only latest data is included in result
    - Approach 2 - Both timestamp match Delta Log Commit Timestamps
        - No metadata provided, only latest data is included in result
        - Tried to force the CDF with new update. **DID NOT WORKED!**
3. CDF batch processing with only starting version and timestamps
    - Using `startingTimestamp`
        - All change included in results initilly.
        - *After update in name of DF python object the results broke and SQL errored with not existing `_commit_timestamp` column*
    - Using `startingVersion`
        - **DID NOT WORKED AT ALL! MISSING `_commit_timestamp` metadta CDF column** WTH?!?!?! 🤔
4. CDF batch processing with with starting timestamp and file location
    - Again missing `_commit_timestamp`. Error?!?!!? 🤔


### TODO: Investigate if same issue will appear again!!!!

In [12]:
# CDF batch processing with start and end versions specified
cdf_batch_start_end_versions = (
    spark
    .read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .option("endingVersion", 10)
    .table("default.covid_cdf_source_1")
)

In [13]:
cdf_batch_start_end_versions.createOrReplaceTempView(
    "cdf_batch_start_end_versions"
)

In [15]:
spark.sql(
    """
        SELECT *
        FROM cdf_batch_start_end_versions
        ORDER BY _commit_timestamp, state, county, date 
    """
)

date,county,state,fips,cases,deaths,_change_type,_commit_version,_commit_timestamp
2020-04-15,Colbert,Alabama,1033,11,1,insert,2,2025-05-06 14:22:09.232
2020-04-15,Cullman,Alabama,1043,40,1,insert,2,2025-05-06 14:22:09.232
2020-04-15,Hale,Alabama,1065,20,0,insert,2,2025-05-06 14:22:09.232
2020-04-15,Lee,Alabama,1081,285,12,insert,2,2025-05-06 14:22:09.232
2020-04-15,Perry,Alabama,1105,7,0,insert,2,2025-05-06 14:22:09.232
2020-04-15,Pickens,Alabama,1107,30,0,insert,2,2025-05-06 14:22:09.232
2020-04-15,Pike,Alabama,1109,27,0,insert,2,2025-05-06 14:22:09.232
2020-04-15,Tallapoosa,Alabama,1123,149,7,insert,2,2025-05-06 14:22:09.232
2020-04-15,Coffee,Alabama,1031,43,0,insert,3,2025-05-06 14:22:09.981
2020-04-15,Coosa,Alabama,1037,20,1,insert,3,2025-05-06 14:22:09.981


In [29]:
spark.sql(
    """
        UPDATE default.covid_cdf_source_1
        SET deaths = deaths + 1
        WHERE county = 'Autauga' AND state = 'Alabama'
    """
)

num_affected_rows
1


In [38]:
# CDF batch processing with start and end timestamps specified
# But first need to find timestamp candidates

from delta import DeltaTable

dt_cdf_source_1 = DeltaTable.forPath(spark, "./spark-warehouse/covid_cdf_source_1")

dt_cdf_source_1.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
9,2025-05-12 20:59:21.219,,,UPDATE,"{predicate -> [""((county#3692 = Autauga) AND (state#3693 = Alabama))""]}",,,,8.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 5516, numCopiedRows -> 74, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 1756, numDeletionVectorsUpdated -> 0, scanTimeMs -> 1088, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 3369, rewriteTimeMs -> 666}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
8,2025-05-06 16:45:02.314,,,UPDATE,"{predicate -> [""(((date#55249 = 2025-05-06) AND (upper(state#55251) = ALABAMA)) AND (upper(county#55250) = CLAY))""]}",,,,7.0,Serializable,False,"{numRemovedFiles -> 1, numRemovedBytes -> 5484, numCopiedRows -> 74, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 631, numDeletionVectorsUpdated -> 0, scanTimeMs -> 365, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 3368, rewriteTimeMs -> 266}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
7,2025-05-06 14:30:34.353,,,UPDATE,"{predicate -> [""((date#42791 = 2020-04-15) AND upper(state#42793) IN (ALABAMA,ALASKA))""]}",,,,6.0,Serializable,False,"{numRemovedFiles -> 6, numRemovedBytes -> 20182, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 3, executionTimeMs -> 2143, numDeletionVectorsUpdated -> 0, scanTimeMs -> 359, numAddedFiles -> 3, numUpdatedRows -> 84, numAddedBytes -> 7404, rewriteTimeMs -> 1784}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
6,2025-05-06 14:22:11.91,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,5.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1807}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
5,2025-05-06 14:22:11.306,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,4.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1654}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
4,2025-05-06 14:22:10.66,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,3.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1761}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
3,2025-05-06 14:22:09.981,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,2.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1722}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
2,2025-05-06 14:22:09.232,,,WRITE,"{mode -> Append, partitionBy -> []}",,,,1.0,Serializable,True,"{numFiles -> 1, numOutputRows -> 8, numOutputBytes -> 1888}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0
1,2025-05-06 14:19:24.866,,,SET TBLPROPERTIES,"{properties -> {""delta.enableChangeDataFeed"":""true""}}",,,,0.0,Serializable,True,{},,Apache-Spark/3.5.4 Delta-Lake/3.2.0
0,2025-05-06 14:19:01.248,,,CREATE TABLE AS SELECT,"{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> true, properties -> {}}",,,,,Serializable,True,"{numFiles -> 1, numOutputRows -> 67, numOutputBytes -> 2988}",,Apache-Spark/3.5.4 Delta-Lake/3.2.0


In [31]:
# Timestamp Predicates
# START - 2025-05-06 14:22:09.232 - 1 minute 
# END   - 2025-05-06 14:30:34.353 + 1 minute
# IT IS NOT WELL WORKING, OR I AM USING WRONG TIMESTAMPS?
cdf_batch_start_end_timestamps = (
    spark
    .read
    .format("delta")
    .option("readChangeDataFeed", "true")
    # .option("startingTimestamp", "2025-05-06 14:21:09.232")
    # .option("endingTimestamp", "2025-05-06 16:31:34.353") - No metadata included, timestamp is not exact Delta event from transaction log
    .option("startingTimestamp", "2025-05-06 14:19:24.866") # CDF Enabled -> ts from v1/ There are updated in this timespan
    .option("endingTimestamp", "2025-05-12 20:59:21.219	") # v9 as upper boundary
    .table("default.covid_cdf_source_1")
)

cdf_batch_start_end_timestamps.createOrReplaceTempView("cdf_batch_start_end_timestamps")

In [32]:
spark.sql(
    """
        SELECT *
        FROM cdf_batch_start_end_timestamps
        ORDER BY state, county, date 
    """
)

date,county,state,fips,cases,deaths
2025-05-06,Autauga,Alabama,1001,25,2
2025-05-06,Baldwin,Alabama,1003,98,2
2025-05-06,Barbour,Alabama,1005,13,0
2025-05-06,Bibb,Alabama,1007,19,0
2025-05-06,Bibb,Alabama,1007,19,0
2025-05-06,Blount,Alabama,1009,17,0
2025-05-06,Bullock,Alabama,1011,8,0
2025-05-06,Butler,Alabama,1013,11,0
2025-05-06,Calhoun,Alabama,1015,62,0
2025-05-06,Chambers,Alabama,1017,227,10


In [10]:
spark.sql(
    """
        SELECT *
        FROM default.covid_cdf_source_1 VERSION AS OF 8
        WHERE county = 'Autauga' AND state = 'Alabama' 
    """
)

date,county,state,fips,cases,deaths
2025-05-06,Autauga,Alabama,1001,25,1


In [None]:
# CDF batch processing with only starting version and timestamps

# 1. starting timestamp only

cdf_batch_start_time_only = (
    spark
    .read
    .format("delta")
    .option("readChangeDataFeed", "true")
    .option("startingTimestamp", "2025-05-06 14:22:09.232")
    .table("default.covid_cdf_source_1")
)

cdf_batch_start_version_only.createOrReplaceTempView("cdf_batch_start_time_only")

In [None]:
spark.sql(
    """
        SELECT *
        FROM cdf_batch_start_time_only
        ORDER BY _commit_timestamp desc, state, county, date 
    """
)

In [80]:
# 1. starting version only

cdf_batch_start_version_only = (
    spark
    .read
    .format("delta")
    .option("readChangeDataFeed", "true")
    .option("startingVersion", 7)
    .table("default.covid_cdf_source_1")
)

cdf_batch_start_version_only.createOrReplaceTempView("cdf_batch_start_version_only")

In [83]:
spark.sql(
    """
        SELECT *
        FROM cdf_batch_start_version_only
        ORDER BY date DESC
        --ORDER BY _commit_timestamp desc, state, county, date 
    """
)

date,county,state,fips,cases,deaths
2025-05-06,Bibb,Alabama,1007,19,0
2025-05-06,Coffee,Alabama,1031,43,0
2025-05-06,Autauga,Alabama,1001,25,2
2025-05-06,Coosa,Alabama,1037,20,1
2025-05-06,Crenshaw,Alabama,1041,4,0
2025-05-06,Wilcox,Alabama,1131,38,0
2025-05-06,Baldwin,Alabama,1003,98,2
2025-05-06,Jefferson,Alabama,1073,628,17
2025-05-06,Barbour,Alabama,1005,13,0
2025-05-06,St. Clair,Alabama,1115,49,0


In [17]:
# CDF batch processing with with starting timestamp and file location

cdf_batch_start_version_only = (
    spark
    .read
    .format("delta")
    .option("readChangeDataFeed", "true")
    .option("startingVersion", 0)
    .load("./spark-warehouse/covid_cdf_source_1")
    .selectExpr("*")#, "_change_type", "_commit_version", "_commit_timestamp")
)

cdf_batch_start_version_only.createOrReplaceTempView("cdf_batch_start_version_with_path")

In [18]:
spark.sql(
    """
        SELECT * 
        FROM cdf_batch_start_version_with_path
        ORDER BY state, county, date 
    """
)

date,county,state,fips,cases,deaths
2025-05-06,Autauga,Alabama,1001,25,2
2025-05-06,Baldwin,Alabama,1003,98,2
2025-05-06,Barbour,Alabama,1005,13,0
2025-05-06,Bibb,Alabama,1007,19,0
2025-05-06,Bibb,Alabama,1007,19,0
2025-05-06,Blount,Alabama,1009,17,0
2025-05-06,Bullock,Alabama,1011,8,0
2025-05-06,Butler,Alabama,1013,11,0
2025-05-06,Calhoun,Alabama,1015,62,0
2025-05-06,Chambers,Alabama,1017,227,10


In [16]:
spark.sql("DESCRIBE DETAIL default.covid_cdf_source_1").select("properties").show(truncate=False)


+------------------------------------+
|properties                          |
+------------------------------------+
|{delta.enableChangeDataFeed -> true}|
+------------------------------------+



In [None]:
temp_views = spark.catalog.listTables()
for view in temp_views:
    if view.isTemporary:
        spark.catalog.dropTempView(view.name)


# Clean up Streams

In [70]:
for stream in spark.streams.active:
    print(stream)
    stream.stop()

<pyspark.sql.streaming.query.StreamingQuery object at 0x00000216D44EA490>
