## Incremental Processing Example


This notebook does the following:
1. Downloads sample retail data and saves it as CSV files in OneLake.
1. Loops over the files and loads each one as a Delta table.
    - **Note**: the data is loaded serially for demo purposes.
1. Demonstrates three types of incremental processing in Fabric:
    - Incremental Batch Ingest
    - In-Flight Transformations
    - Continuous Streaming


Tables in a Fabric Lakehouse are stored in [Delta Lake](https://delta.io), an open table format. Delta tables work as both streaming sources and streaming sinks, so incremental and stream processing are supported out of the box.

![](https://invisodemo.blob.core.windows.net/public/delta_reliability.png)



##### Import libraries

In [None]:
# import only what is used; a wildcard import shadows Python builtins such as sum and max
from pyspark.sql.functions import col

In [None]:
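schema_name = 'dbo'                             # Lakehouse schema for every table in this notebook
checkpoint_folder = 'Files/silver_checkpoints'  # each streaming query stores its progress (offsets) in a subfolder here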

In [None]:
files_path = 'Files/dunnhumby/'

files = [f.name for f in mssparkutils.fs.ls(files_path)]
files

['campaign_desc.csv',
 'campaign_table.csv',
 'causal_data.csv',
 'coupon.csv',
 'coupon_redempt.csv',
 'hh_demographic.csv',
 'product.csv',
 'transaction_data.csv']

#### Load Data

In [None]:
for f in files:
    table = f.replace('.csv', '')  # use the file name as the table name

    # read the csv file
    df = (
        spark
        .read
        .option('header', 'true')
        .option('inferSchema', 'true')  # or you can provide a schema
        .csv(f'{files_path}{f}')        # files_path already ends with '/'
    )

    # save as a Delta table in the same schema the later cells query
    df.write.saveAsTable(f'{schema_name}.{table}')


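The note above says the files are loaded serially. Each load is an independent Spark job, so one common alternative is to submit the loads from a small thread pool. The sketch below is minimal. It assumes a hypothetical `load_one(file_name)` callable that performs one read-and-save and returns the table name. `fake_load` is a stand-in so the snippet runs without Spark.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_files_in_parallel(files, load_one, max_workers=4):
    """Call load_one(file_name) for every file on a small thread pool.

    Returns {file_name: value returned by load_one}. If any load raises,
    the exception is re-raised when its result is collected.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit every load up front; at most max_workers run at the same time
        futures = {pool.submit(load_one, name): name for name in files}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results


# Hypothetical stand-in so the sketch runs without Spark. In the notebook,
# load_one would wrap the loop body instead, for example:
#   def load_one(name):
#       table = name.replace('.csv', '')
#       (spark.read.option('header', 'true').option('inferSchema', 'true')
#            .csv(f'{files_path}{name}')
#            .write.saveAsTable(f'{schema_name}.{table}'))
#       return table
def fake_load(file_name):
    return file_name.replace('.csv', '')


loaded = load_files_in_parallel(['product.csv', 'coupon.csv'], fake_load)
print(sorted(loaded.items()))  # [('coupon.csv', 'coupon'), ('product.csv', 'product')]
```

Keep `max_workers` small. All loads share one Spark session and the same capacity, so extra threads mostly add contention.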
In [None]:
display(spark.sql(f"DESCRIBE HISTORY {schema_name}.product"))

In [None]:
# re-read 1,000 rows from the product table and append them back to it.
# Delta Lake records every write as a new table version, so the history below shows the append.
product_df = (
    spark
    .read
    .table(f'{schema_name}.product')
    .limit(1000)
)

product_df.write.mode('append').saveAsTable(f'{schema_name}.product')

In [None]:
display(spark.sql(f"DESCRIBE HISTORY {schema_name}.product"))


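`DESCRIBE HISTORY` works because Delta Lake stores every write as an ordered commit in the table's transaction log. A reader can therefore rebuild the table as of any retained version; in Spark SQL that looks like `SELECT * FROM dbo.product VERSION AS OF 0`. The toy model below illustrates the idea in pure Python. It does not reproduce Delta's actual log format, and `ToyVersionedTable` and its methods are hypothetical. It also shows that the append cell above duplicates the re-read rows.

```python
class ToyVersionedTable:
    """Toy model of a Delta-style table: each write appends one commit to a log."""

    def __init__(self):
        self.log = []  # commits in order; the list index is the version number

    def write(self, rows, mode='append'):
        """Record a commit and return its version. 'overwrite' replaces all rows."""
        self.log.append({'mode': mode, 'rows': list(rows)})
        return len(self.log) - 1

    def history(self):
        """(version, mode, rows written), newest first, in the spirit of DESCRIBE HISTORY."""
        return [(v, c['mode'], len(c['rows']))
                for v, c in reversed(list(enumerate(self.log)))]

    def read(self, version_as_of=None):
        """Rebuild the table by replaying commits up to version_as_of (latest if None)."""
        if version_as_of is None:
            version_as_of = len(self.log) - 1
        rows = []
        for commit in self.log[:version_as_of + 1]:
            if commit['mode'] == 'overwrite':
                rows = []
            rows.extend(commit['rows'])
        return rows


t = ToyVersionedTable()
t.write(['p1', 'p2', 'p3'])     # version 0: initial load
t.write(['p1'])                 # version 1: re-read rows appended back to the same table
print(t.history())              # [(1, 'append', 1), (0, 'append', 3)]
print(t.read(version_as_of=0))  # ['p1', 'p2', 'p3']
print(t.read())                 # ['p1', 'p2', 'p3', 'p1']
```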
In [None]:

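# read the table with the streaming APIs.
# Nothing runs yet: the query only starts when one of the writeStream cells below starts it.
# With .trigger(availableNow=True) the query processes all data available at start time,
# possibly split into several micro-batches by rate limits such as maxFilesPerTrigger or
# maxBytesPerTrigger, and then stops. Tune those options to the data volume.

product_stream_df = (
    spark
    .readStream
    .table(f'{schema_name}.product')
)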

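#### Spark Streaming Writes
1. `availableNow` trigger: process everything that is available, then stop.
1. `foreachBatch`: run custom batch logic on each micro-batch.
1. Continuous: no trigger is set, so the query keeps running micro-batches until it is stopped. This is the default micro-batch trigger, not Spark's experimental `trigger(continuous=...)` mode.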


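Summary of `outputMode` options:
1. `append`: only rows added to the result since the last trigger are written, and existing rows are never updated or deleted. This is the default and the mode used throughout this notebook.
1. `update`: only result rows that changed since the last trigger are written. This matters for aggregations and other stateful queries; without them it behaves like `append`. The Delta sink does not accept this mode, so upserts are usually done with `foreachBatch` and `MERGE`.
1. `complete`: the entire result table is rewritten on every trigger. It requires a streaming aggregation and suits small, full-table aggregates.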

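The three modes are easiest to compare on a running count per key. The pure-Python toy model below lists what each mode would emit at each trigger; `simulate_output_modes` is a hypothetical helper, not a Spark API. `append` is shown for the plain, non-aggregated input rows, because Spark only allows `append` on an aggregation when a watermark is defined.

```python
from collections import Counter


def simulate_output_modes(batches):
    """Toy model of what a streaming count per key emits at each trigger.

    batches: list of micro-batches, each a list of keys.
    Returns {mode: [rows emitted at trigger 0, trigger 1, ...]}.
    'append' is shown for the non-aggregated stream of input rows;
    'update' and 'complete' are shown for the running count.
    """
    counts = Counter()
    emitted = {'append': [], 'update': [], 'complete': []}
    for batch in batches:
        batch_counts = Counter(batch)
        counts.update(batch_counts)
        # append (no aggregation): only the rows that arrived in this batch
        emitted['append'].append(list(batch))
        # update: only keys whose count changed in this batch
        emitted['update'].append(sorted((k, counts[k]) for k in batch_counts))
        # complete: the whole result table, every time
        emitted['complete'].append(sorted(counts.items()))
    return emitted


out = simulate_output_modes([['a', 'b'], ['a', 'c']])
print(out['update'])    # [[('a', 1), ('b', 1)], [('a', 2), ('c', 1)]]
print(out['complete'])  # [[('a', 1), ('b', 1)], [('a', 2), ('b', 1), ('c', 1)]]
```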

##### AvailableNow

In [None]:
# hypothetical target name: streaming dbo.product back into dbo.product
# would append a second copy of every row to its own source table
table_name = 'zz_product_1'
query = (product_stream_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", f'{checkpoint_folder}/ckpt_{table_name}')
    .outputMode("append")
    .trigger(availableNow=True)   # process everything available now, then stop
    .toTable(f'{schema_name}.{table_name}')
)

# wait for the availableNow run to finish so the next cell sees all of the data
query.awaitTermination()

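This cell is what makes the pattern incremental batch ingest. A rerun with the same checkpoint processes only data added since the previous run, and a single run may be split into several micro-batches when a rate limit such as `maxFilesPerTrigger` applies. The pure-Python toy model below sketches that behaviour under the assumption of an append-only source. `run_available_now` is hypothetical, and the checkpoint is just a dict holding an offset; real Delta sources track table versions and files, not list positions.

```python
def run_available_now(source, checkpoint, max_files_per_trigger=None):
    """Toy model of trigger(availableNow=True) over an append-only source.

    source: items (for example data files) in arrival order
    checkpoint: dict holding the committed 'offset' (number of items already processed)
    max_files_per_trigger: per-micro-batch rate limit (None means no limit)
    Returns the micro-batches processed by this run.
    """
    start = checkpoint.get('offset', 0)
    end = len(source)  # the end offset is fixed when the run starts
    step = max_files_per_trigger or max(end - start, 1)
    batches = []
    for lo in range(start, end, step):
        batch = source[lo:min(lo + step, end)]
        batches.append(batch)
        checkpoint['offset'] = lo + len(batch)  # commit progress after each micro-batch
    return batches


ckpt = {}
landed = ['f1', 'f2', 'f3', 'f4', 'f5']
print(run_available_now(landed, ckpt, max_files_per_trigger=2))  # [['f1', 'f2'], ['f3', 'f4'], ['f5']]
landed.append('f6')  # new data arrives
print(run_available_now(landed, ckpt, max_files_per_trigger=2))  # [['f6']]
print(run_available_now(landed, ckpt))  # [] because nothing is new since the last run
```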
In [None]:
display(spark.sql(f"select * from {schema_name}.{table_name}"))

##### Foreach Batch

In [None]:
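# foreachBatch lets you run arbitrary batch logic on each micro-batch of a stream,
# for example splitting one source into several target tables.

def transform_stream(microBatchDF, batchId):
    # cache the micro-batch because it is read twice below
    microBatchDF.persist()
    is_grocery = col("department").eqNullSafe("GROCERY")

    # GROCERY
    (microBatchDF.filter(is_grocery)
     .write
     .format("delta")
     .mode("append")
     .saveAsTable(f"{schema_name}.zz_groceries"))

    # Everything else, including rows whose department is NULL
    # (a plain != "GROCERY" filter would silently drop those rows)
    (microBatchDF.filter(~is_grocery)
     .write
     .format("delta")
     .mode("append")
     .saveAsTable(f"{schema_name}.zz_non_groceries"))

    microBatchDF.unpersist()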


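`foreachBatch` gives at-least-once guarantees. Suppose the query fails after `transform_stream` has written a batch but before the checkpoint records it. Spark then replays that batch with the same `batchId`, and plain appends write it twice. One way to make the writes idempotent is to remember the last batch id committed per writer and skip replays. The toy sink below models that rule; `IdempotentSink` and the app id `'grocery_split'` are hypothetical. Delta Lake exposes the same idea through the `txnAppId` and `txnVersion` writer options, which are worth checking against the Delta version in your Fabric runtime.

```python
class IdempotentSink:
    """Toy sink that skips a micro-batch it has already committed for an app id."""

    def __init__(self):
        self.rows = []
        self.last_batch = {}  # app_id -> highest batch id written

    def write_batch(self, rows, batch_id, app_id):
        """Write rows once per (app_id, batch_id); return False for a replayed batch."""
        if batch_id <= self.last_batch.get(app_id, -1):
            return False  # already written before a failure and restart: skip it
        self.rows.extend(rows)
        self.last_batch[app_id] = batch_id
        return True


sink = IdempotentSink()
sink.write_batch(['r1'], batch_id=0, app_id='grocery_split')
sink.write_batch(['r2'], batch_id=1, app_id='grocery_split')
sink.write_batch(['r2'], batch_id=1, app_id='grocery_split')  # replay after a restart
print(sink.rows)  # ['r1', 'r2']
```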
In [None]:
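# foreachBatch hands each micro-batch to transform_stream, which performs its own writes,
# so no sink format is needed here
s1 = (product_stream_df
    .writeStream
    .option("checkpointLocation", f'{checkpoint_folder}/ckpt_product_grocery_split')
    .trigger(availableNow=True)
    .foreachBatch(transform_stream)
    .start()
)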

s1.awaitTermination()

In [None]:
display(spark.sql(f'select * from {schema_name}.zz_groceries'))

In [None]:
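# No trigger is set, so this query uses the default micro-batch trigger:
# a new micro-batch starts as soon as the previous one finishes, and the
# query keeps running in the background until it is stopped.
table_name = 'zz_product_2'
(product_stream_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", f'{checkpoint_folder}/ckpt_{table_name}_streaming')
    .outputMode("append")
    .toTable(f'{schema_name}.{table_name}')
)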

<pyspark.sql.streaming.query.StreamingQuery at 0x7124016d4f50>

In [None]:
# notice that it's non-blocking: the stream keeps running in the background, so this cell runs right away
1+1

2
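The `zz_product_2` write has no `availableNow` trigger, so it keeps running after its cell returns. That is why `1+1` executes immediately and why the streams are stopped explicitly at the end. Roughly speaking, Spark runs each streaming query on its own driver thread. The pure-Python toy below mimics that lifecycle; `ToyStreamingQuery` and `poll` are hypothetical and say nothing about Spark internals. `start()` returns at once, a background loop keeps polling for new rows, and `stop()` ends the loop and waits for the thread.

```python
import threading
import time
from collections import deque


class ToyStreamingQuery:
    """Toy stand-in for a long-running micro-batch query: it polls a source on a
    background thread until stop() is called."""

    def __init__(self, poll_source, interval=0.01):
        self._poll_source = poll_source  # callable returning newly arrived rows
        self._interval = interval        # idle time between polls, in seconds
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.processed = []

    def start(self):
        self._thread.start()  # returns immediately: the caller is not blocked
        return self

    def _run(self):
        while not self._stop_event.is_set():
            self.processed.extend(self._poll_source())  # one "micro-batch"
            self._stop_event.wait(self._interval)       # wait for the next poll or stop()

    @property
    def is_active(self):
        return self._thread.is_alive()

    def stop(self):
        self._stop_event.set()
        self._thread.join()


incoming = deque(['r1', 'r2'])


def poll():
    # drain whatever has arrived since the last poll
    rows = []
    while incoming:
        rows.append(incoming.popleft())
    return rows


q = ToyStreamingQuery(poll).start()
print(1 + 1)  # 2, printed while the "stream" keeps polling in the background
incoming.append('r3')  # new data arrives while the query is running

# give the background loop a moment to pick up r3, then stop it
deadline = time.monotonic() + 5
while len(q.processed) < 3 and time.monotonic() < deadline:
    time.sleep(0.01)
q.stop()
print(q.is_active, q.processed)
```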

In [None]:
all_streams = spark.streams.active
all_streams

[<pyspark.sql.streaming.query.StreamingQuery at 0x71240142d6d0>]

In [None]:
# stop every active stream so none keeps running (and using capacity) after the demo
for s in all_streams:
    s.stop()

In [None]:
spark.streams.active

[]