# Merge the CDC Files to Bronze

Note that in Databricks, we could use the Auto Loader. In this case, our **readStream** would have additional options:

```python
 .format("cloudFiles")
 .option("cloudFiles.format", "parquet")
 .option("cloudFiles.useNotifications", "true") # Use for SQS/SNS
 .option("cloudFiles.region", "eu-west-1")      # Use for SQS/SNS
```

My thesis includes examples of this in Finnish. This script uses file listing method.

In [1]:
import glob
import os
import pprint
import pyspark.sql.functions as F
from helpers.paths import PathMerger
from pyspark.sql import SparkSession
from IPython.display import HTML

In [2]:
spark = (SparkSession.builder
         .appName("MergeCDCtoBronze")
         .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
         .config('spark.sql.extensions', "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config("spark.sql.session.timeZone", "UTC")
         .getOrCreate())


# This cannot be imported before initializing the SparkSession.
from delta import DeltaTable

## Imagine orchestator here

If this was a worker Notebook in Databricks (or a worker Python script orchestrated by Airflow) the parameters below would be fed while executing this script.

In [4]:
# Params
db, table = "devices", "device_models"
all_pks = ["id"]

# Init
pm = PathMerger(db, table)

In [5]:
print("[INFO] The following Parquet files exist in this staging path: ")

for f in glob.glob(pm.staging + os.sep + "**/*.parquet", recursive=True):
    print(f)

[INFO] The following Parquet files exist in this staging path: 
S3\staging\dms\abc\devices\device_models\LOAD00000001.parquet
S3\staging\dms\abc\devices\device_models\2021\9\11\20210911_113913.parquet


## Load

Based on my testing, the pathGlobFilter applies to the filename, not to the whole path. 

Thus, a glob filter such as...
* `**/*.parquet` returns no files
* `[L]*.parquet` returns all files starting with an `L` letter and ending to `.parquet`.
* `[!L]*.parquet` returns all files NOT starting with an `L` letter.

## Define Functions

In [6]:
def with_ordering_cols(input_df, batch_id):
    output_df = ( 
        input_df
        .withColumn("op_numeral", F.when(F.col("Op") == "I", 1)
                                     .when(F.col("Op") == "U", 2)
                                     .when(F.col("Op") == "D", 3).cast("int"))
        .withColumn('dms_temp', F.to_timestamp(F.col("dms_timestamp")))
        #.withColumn("par", F.col("*all_pks[0]) % n_pars)
        #.withColumn("src_file", F.input_file_name())
        .withColumn("src_batch_id", F.lit(batch_id).cast("int"))
    )
    return output_df 


def log_compact(input_df, cols_to_drop=["aaa", "bbb"]):
    output_df = (
        input_df
            .selectExpr(*all_pks, "struct(dms_temp as aaa, op_numeral as bbb, *) as others")
            .groupBy(*all_pks)
            .agg(F.max("others").alias("latest"))
            .select("latest.*")
            .drop(*cols_to_drop)
        
    )
    return output_df


def merge_to_delta(batch_df, batch_id):
    
    # Add op_numeral and dms_temp
    batch_df = with_ordering_cols(batch_df, batch_id)
    
    # Compact change log to one item per id
    latest_uniques = log_compact(batch_df)
    
    # Load Delta Table
    target = DeltaTable.forName(spark, pm.hive)
    
    # Using target schema, format to: { "id": "s.id" }
    col_map = {x.name: f"s.{x.name}" for x in target.toDF().schema}
    
    # Format the list of primary keys 
    # into SQL join condition like "t.id = s.id AND t.foo = s.foo"
    join_cond = " AND ".join([f"t.{pk} = s.{pk}" for pk in all_pks])
    
    (
      target.alias("t")
      .merge(
          source = latest_uniques.alias("s"), 
          condition = f"{join_cond}"
      )
      .whenMatchedDelete(condition = "s.Op = 'D'")
      .whenMatchedUpdate(condition = "s.Op = 'U'", set = col_map)
      .whenNotMatchedInsert(condition = "s.Op != 'D'", values = col_map)
      .execute()
    )    

## Register to Hive

The Hive in single-node test environment is not persistent, so we will need to create a new database and a new (EXTERNAL) table each time we restart our Python kernel and create a new SparkSession.

In production, this would not be needed.

In [7]:
# Create BRONZE
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")

# Even on Windows, Spark SQL requires a POSIX path with /-symbol as path separator.
abs_path = os.path.abspath(pm.bronze).replace("\\", "/")

# Register to Hive
spark.sql(f"""
CREATE TABLE {pm.hive}
  USING DELTA
  LOCATION '{abs_path}'
""")

DataFrame[]

In [9]:
spark.sql(f"DESCRIBE HISTORY {pm.hive}").toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,0,2021-09-12 06:26:03.746,,,CREATE OR REPLACE TABLE AS SELECT,"{'description': None, 'partitionBy': '[]', 'pr...",,,,,,False,"{'numOutputRows': '4', 'numOutputBytes': '2569...",


## RESTORE to VERSION 0 ?

If you need to start from beginning, you can either run this cell with `restore` being True. This cell will:
* Overwrite the VERSION 0 on top of the current version

Note that this does not empty the checkpoint_path that is creater below. If you need to reset the checkpoint, delete all files in that directory manually or point it to some other directory.

In [10]:
# Change me
restore = False


if restore:
    # Get version 0    
    df_full_load = spark.read.format("delta").option("versionAsOf", 0).load(pm.bronze)

    # Load
    (
        df_full_load
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('path', os.path.abspath(pm.bronze))
        .saveAsTable(pm.hive)
    )
else:
    print("[INFO] Cell skipped due to boolean value.")

[INFO] Cell skipped due to boolean value.


## Stream

In [11]:
    
# Schema is forced to match the Bronze, but with an extra field. DMS does not add Op to full load.
readers_schema = spark.read.format("delta").load(pm.bronze).schema.add("Op", "string")

# Checkpoints will be written to...
checkpoint_path = os.path.join('S3', 'bronze', '_checkpoints', 'abc', db, table)


# Prepare Spark Auto Loader
df = ( spark.readStream
        .format("parquet")
        .option("recursiveFileLookup", "true")
        .option("pathGlobFilter", "[!L][!O][!A][!D]*.parquet")
        .schema(readers_schema)
        .load(pm.staging)
  )


# Stream
streamingquery = ( 
    df
    .writeStream
    .trigger(once=True)
    .foreachBatch(merge_to_delta)
    .option("checkpointLocation", os.path.abspath(checkpoint_path))
    .start()
)
 
streamingquery.awaitTermination()

In [12]:
# Init
pp = pprint.PrettyPrinter()

# Print what the query performed
pp.pprint(streamingquery.lastProgress)

# Use for forging the compacted DataFrame later on
bid = streamingquery.lastProgress['batchId']

{'batchId': 0,
 'durationMs': {'addBatch': 5975,
                'getBatch': 37,
                'latestOffset': 159,
                'queryPlanning': 28,
                'triggerExecution': 6531,
                'walCommit': 148},
 'id': '3bf251f8-8ed4-4328-b4f0-d679cf4b28be',
 'inputRowsPerSecond': 0.0,
 'name': None,
 'numInputRows': 12,
 'processedRowsPerSecond': 1.837109614206981,
 'runId': 'fa2910c0-a334-469c-b717-fe438629588b',
 'sink': {'description': 'ForeachBatchSink', 'numOutputRows': -1},
 'sources': [{'description': 'FileStreamSource[file:/C:/Users/soura/PycharmProjects/opinnaytetyo/S3/staging/dms/abc/devices/device_models]',
              'endOffset': {'logOffset': 0},
              'inputRowsPerSecond': 0.0,
              'numInputRows': 12,
              'processedRowsPerSecond': 1.837109614206981,
              'startOffset': None}],
 'stateOperators': [],
 'timestamp': '2021-09-12T06:29:08.028Z'}


# Examine before-after compaction

**Note:** The HTML class from IPython.core.display is being used to display the Pandas Dataframe without Pandas' index. This index would be an extra row numbering that does not exist in the original dataset, and might cause confusion with the `id` field.

### Bronze before Merge

In [13]:
# Show the VERSION 0 - The original FULL LOAD.
df_orig = spark.table(pm.hive).toPandas()

display(HTML(df_orig.to_html(index=False)))

dms_timestamp,id,release_date,name,color,description,created,modified,src_batch_id
2021-09-11 11:39:29,2,2010-05-15,Super Gadget 100,Black,update id 2,2010-03-21 12:00:02,2021-09-11 11:39:29,0.0
2021-09-11 11:37:17,6,2021-12-31,Super Gadget 300,Pink,new device,2021-09-11 11:37:17,2021-09-11 11:37:17,0.0
2021-09-11 11:37:47,1,2010-05-15,Super Gadget 100,Red,update id 1,2010-03-21 12:00:01,2021-09-11 11:37:47,0.0
2021-09-11 11:30:04,4,2018-05-13,Super Gadget 200,White,lorem ipsum,2018-03-20 12:01:01,2018-03-20 12:01:01,
2021-09-11 11:30:04,3,2010-11-01,Super Gadget 100,Pink,lorem ipsum,2010-08-05 07:00:00,2010-08-05 07:00:00,


### CDC Before Log Compaction

In [37]:
# Load the original files from staging.
df_cdc = (
    spark.read.option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "[!L][!O][!A][!D]*.parquet")
    .load(pm.staging)
).orderBy("dms_timestamp")

#Show
display(HTML(df_cdc.toPandas().to_html(index=False)))

Op,dms_timestamp,id,release_date,name,color,description,created,modified
I,2021-09-11 11:37:17,5,2021-12-31,Super Gadget 300,Black,new device,2021-09-11 11:37:17,2021-09-11 11:37:17
I,2021-09-11 11:37:17,6,2021-12-31,Super Gadget 300,Pink,new device,2021-09-11 11:37:17,2021-09-11 11:37:17
U,2021-09-11 11:37:47,1,2010-05-15,Super Gadget 100,Red,update id 1,2010-03-21 12:00:01,2021-09-11 11:37:47
U,2021-09-11 11:37:47,2,2010-05-15,Super Gadget 100,Black,upddddddate id 2,2010-03-21 12:00:02,2021-09-11 11:37:47
U,2021-09-11 11:39:29,2,2010-05-15,Super Gadget 100,Black,update id 2,2010-03-21 12:00:02,2021-09-11 11:39:29
D,2021-09-11 11:43:31,5,2021-12-31,Super Gadget 300,Black,new device,2021-09-11 11:37:17,2021-09-11 11:43:31


### CDC After Log Compaction

In [34]:
# Add ordering columns and perform compaction
df_latest_uniques = log_compact(with_ordering_cols(df_cdc, bid)).orderBy("id")

# Show
display(HTML(df_latest_uniques.toPandas().to_html(index=False)))

Op,dms_timestamp,id,release_date,name,color,description,created,modified,op_numeral,dms_temp,src_batch_id
U,2021-09-11 11:37:47,1,2010-05-15,Super Gadget 100,Red,update id 1,2010-03-21 12:00:01,2021-09-11 11:37:47,2,2021-09-11 11:37:47,0
U,2021-09-11 11:39:29,2,2010-05-15,Super Gadget 100,Black,update id 2,2010-03-21 12:00:02,2021-09-11 11:39:29,2,2021-09-11 11:39:29,0
D,2021-09-11 11:43:31,5,2021-12-31,Super Gadget 300,Black,new device,2021-09-11 11:37:17,2021-09-11 11:43:31,3,2021-09-11 11:43:31,0
I,2021-09-11 11:37:17,6,2021-12-31,Super Gadget 300,Pink,new device,2021-09-11 11:37:17,2021-09-11 11:37:17,1,2021-09-11 11:37:17,0


### Final Result

In [32]:
display(HTML(spark.sql(f"SELECT * FROM {pm.hive} ORDER BY id").toPandas().to_html(index=False)))

dms_timestamp,id,release_date,name,color,description,created,modified,src_batch_id
2021-09-11 11:37:47,1,2010-05-15,Super Gadget 100,Red,update id 1,2010-03-21 12:00:01,2021-09-11 11:37:47,0.0
2021-09-11 11:39:29,2,2010-05-15,Super Gadget 100,Black,update id 2,2010-03-21 12:00:02,2021-09-11 11:39:29,0.0
2021-09-11 11:30:04,3,2010-11-01,Super Gadget 100,Pink,lorem ipsum,2010-08-05 07:00:00,2010-08-05 07:00:00,
2021-09-11 11:30:04,4,2018-05-13,Super Gadget 200,White,lorem ipsum,2018-03-20 12:01:01,2018-03-20 12:01:01,
2021-09-11 11:37:17,6,2021-12-31,Super Gadget 300,Pink,new device,2021-09-11 11:37:17,2021-09-11 11:37:17,0.0


## Delta History

In [23]:
dt = DeltaTable.forName(spark, pm.hive)

dt_hist = dt.history().toPandas()

In [41]:
cols_to_display = ['version', 'timestamp', 'operation', 'operationParameters', 'operationMetrics']

dt_hist[cols_to_display]


# Show
display(HTML(dt_hist[cols_to_display].sort_values('version').to_html(index=False)))

version,timestamp,operation,operationParameters,operationMetrics
0,2021-09-12 06:26:03.746,CREATE OR REPLACE TABLE AS SELECT,"{'description': None, 'partitionBy': '[]', 'properties': '{}', 'isManaged': 'false'}","{'numOutputRows': '4', 'numOutputBytes': '2569', 'numFiles': '1'}"
1,2021-09-12 06:29:13.334,MERGE,"{'matchedPredicates': '[{""predicate"":""(s.`Op` = 'D')"",""actionType"":""delete""},{""predicate"":""(s.`Op` = 'U')"",""actionType"":""update""}]', 'predicate': '(t.`id` = s.`id`)', 'notMatchedPredicates': '[{""predicate"":""(NOT (s.`Op` = 'D'))"",""actionType"":""insert""}]'}","{'numOutputRows': '5', 'numTargetRowsInserted': '1', 'numTargetRowsUpdated': '2', 'numTargetFilesAdded': '6', 'numTargetFilesRemoved': '1', 'numTargetRowsDeleted': '0', 'scanTimeMs': '2358', 'numSourceRows': '4', 'executionTimeMs': '4477', 'numTargetRowsCopied': '2', 'rewriteTimeMs': '2114'}"
