In [None]:
# run first. then have fun.
from pyspark.sql.functions import col, current_timestamp, to_date, datediff
# stats and agg functions
from pyspark.sql.functions import count, session_window, window, sum, min, max, percentile_approx

from delta.tables import DeltaTable

# set the session's parquet compression codec to zstd
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

# root directory of the ecomm dataset, and the `delta` directory beneath it
dataset_dir = '/opt/spark/work-dir/hitchhikers_guide/datasets/ecomm_behavior_data'
delta_path = dataset_dir + "/delta"

# bare table name, plus the same name qualified with the `default` database
dl_table_name = "ecomm_by_day"
dl_managed_table = ".".join(["default", dl_table_name])

# Intro to Delta Lake Streaming
The following section will reuse the **Delta Lake** `default.ecomm_by_day` table created during [Streaming First Steps](./streaming-first-steps.ipynb).

> note: run the following cell to check if you have the local table. You should see `[Table(name='ecomm_by_day', database='default', description=None, tableType='MANAGED', isTemporary=False)]` somewhere in the list (if you have more than one from the work in the Guide)

In [None]:
# make `default` the current database, then list the tables registered in it
# (the list is the cell's displayed output)
spark.catalog.setCurrentDatabase("default")
spark.catalog.listTables()

> Note: If you see `java.sql.SQLException: Failed to start database 'metastore_db' with class loader jdk.internal.loader.ClassLoaders$AppClassLoader...` then you need to detach the `kernel` from the other notebook you have open. You can only have one notebook running with the local Metastore.

## Successful Streaming Begins with Metadata (lots and lots of metadata)
> In other words, you can't stream from a table reliably if you don't understand how it is laid out: what is the structure of the table (columns, types)? Is the table narrow or wide? Do you know what any of the columns actually are?

Remember, when lost or in doubt, always consult the data (metadata). To peek at the table metadata, use `detail()`:
* Use `DeltaTable.forName(spark, 'catalog.schema.table|schema.table|table').detail()`
* or `DeltaTable.forPath(spark, '/path/to/table/').detail()` for unmanaged tables.

In [None]:
## Starting Small (Baby Steps)
# look the Delta table up by its qualified name (dl_managed_table), then ask it
# for its metadata; `detail()` hands the metadata back as a DataFrame
dt_ecomm = DeltaTable.forName(spark, dl_managed_table)
table_details = dt_ecomm.detail()

# go on, take a peek (no one's looking)
table_details.printSchema()

### Table Details. Providing you with all the ... well details
Scanning the StructType of the `detail()` dataframe gives you a lot of data. The following use cases can be solved with the metadata:

```
root
 |-- format: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- createdAt: timestamp (nullable = true)
 |-- lastModified: timestamp (nullable = true)
 |-- partitionColumns: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- numFiles: long (nullable = true)
 |-- sizeInBytes: long (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- minReaderVersion: integer (nullable = true)
 |-- minWriterVersion: integer (nullable = true)
 |-- tableFeatures: array (nullable = true)
 |    |-- element: string (containsNull = true)
```

1. **Calculate Table Freshness**: `abs(current_time()-{table.lastModified})`: To answer the universal question of - "How Fresh Is It?".
2. **How Fast is the Table Growing?**: Size does matter. If we have two tables, tableA is 100gb and has `createdAt` of one year ago, and tableB is 100gb and was created yesterday, then we've got a scalability monster. Using the `freshness` technique, you can calculate the `days` a table has `existed`, and calculate the `avg` bytes per day using `sizeInBytes`.
3. **What is the Table Telling Us?**: Using the `properties` map, we can easily view ALL Table Properties, including those used to `automate` Delta Lake like `delta.logRetentionDuration` or those *we bring to the table* - pun truly intended. Like `catalog.team_name`

In [None]:
# Feel Free to Mess with the following cell to get used to the data available to you about the ecomm_by_day table.

# Add time-based columns to the `detail()` DataFrame:
#   now         -> timestamp taken when the query runs
#   todays_date -> `now` reduced to a date
#   age_in_days -> days from `createdAt` to today
#   stale_days  -> days from `lastModified` to today
tbl_dets = (
    table_details
    .withColumn("now", current_timestamp())
    .withColumn("todays_date", to_date(col("now")))
    .withColumn("age_in_days", datediff(col("todays_date"),to_date("createdAt")))
    .withColumn("stale_days", datediff(col("todays_date"),to_date("lastModified")))
)
# view all the time-based info on the table.
tbl_dets.select("now", "todays_date", "createdAt", "lastModified", "age_in_days", "stale_days").show(truncate=False)

# fetch the dataframe as a local Row
dets = tbl_dets.first()
# see it's a Row...<class 'pyspark.sql.types.Row'>
#print(type(dets))

# The `catalog.*` entries are custom table properties: they only exist when
# someone has set them on the table. Fall back to a placeholder instead of
# failing the whole cell with a KeyError when they are missing.
table_props = dets['properties'] or {}
team_name = table_props.get('catalog.team_name', 'unknown')
team_slack = table_props.get('catalog.engineering.comms.slack', 'unknown')

# stick to the details
print(f"Don't Panic!\nThe table {dets.name} is owned by {team_name}.\nWe can always contact them via slack @ {team_slack}")

# or remember not to panic, everything is under control
#print(f"""
#I am no longer panicking.\n 
#Why you ask?\n
#I know that I can count on {team_name} to deliver gold data, otherwise...\n
#to slack ({team_slack}) we ride questions in hand about the TABLE {dets.name}.\n
#Which happened to be created on {dets.createdAt} and last updated at {dets.lastModified}...
#""")

In [None]:
# divisor that turns a byte count into (decimal) megabytes
bytesToMB = 1000000

# build the size expression once and reuse it for both derived columns
size_in_mb = col("sizeInBytes") / bytesToMB

tbl_dets.select(
    col("numFiles"),
    size_in_mb.alias("TableSizeInMegaBytes"),
    (size_in_mb / col("numFiles")).alias("avgMBPerFile"),
).show()

## What We've Learned about the Dataset
> Note: The following information is based on the 'complete' ecomm dataset. The full 15gb csv. 807mb is the size on disk after zstd compression and Delta encoding. 

1. The naive average megabytes per file is around `11.2mb`. If you run `ls -lh` across any given day, you'll see more of an odd split between say 3mb and 18mb due to non-optimized, non-bin-packed table data on disk.
2. There are `72` files taking up `~807mb` for the `entire` table.
3. There are probably many more `rows` of data in the table, so if we wanted to get a 'quick' count, then that would be a good idea too. That can give us more `approximate` math to work with (rows/day) - even if we are off - we are better informed with approximate math than wild guesses and hopes and dreams.


In [None]:
# convert the DeltaTable reference to a DataFrame

dt_as_df = dt_ecomm.toDF()
total_rows = dt_as_df.count()

# The number of files is not the number of days, so count the distinct
# `event_date` values to get the days of data actually present in the table.
# NOTE(review): assumes `event_date` holds one calendar date per row - confirm.
total_days = dt_as_df.select("event_date").distinct().count()
total_files = dets['numFiles']

# every ratio falls back to 0 on an empty table rather than dividing by zero
rows_per_day = total_rows / total_days if total_days else 0
# bytes per row = bytes / rows (`sizeInBytes` comes from the table details,
# so this is the stored size per row)
avg_row_size = dets['sizeInBytes'] / total_rows if total_rows else 0
rows_per_file = total_rows / total_files if total_files else 0

print(f"""
The Table has {total_rows} rows.\n
(Maybe) Daily Rows of {rows_per_day}\n
(Maybe) Average Row Size {avg_row_size} in Bytes\n
(Maybe) Average Rows per Delta Lake File {rows_per_file}
""")

# Our First Delta Lake Streaming Operation
> Clap your Hands! Or Celebrate However you want. It's time to be Streaming

Because we have potentially a gigantic amount of data - (Or depending on the adventure you chose a smaller set of 60, yes it should have been 42, but time...) - regardless, it is time to create our first streaming application.

## What We'll Need
1. A Place to Store our Application Metadata. Luckily we have our Local File System, so we can just store the application data there for now. (See [common application directory](../../applications/README.md) to understand a little more.)
2. A [Way of Restricting the Volume of Data We Read](https://docs.delta.io/latest/delta-streaming.html#limit-input-rate)
3. A [Means of Ignoring Things](https://docs.databricks.com/structured-streaming/delta-lake.html#ignore-updates-and-deletes) we don't currently care about.
4. A Way of Limiting the Frequency at which our Application Runs (just like we want to limit the volume of data; when we start learning how to work with Streaming Data, it is better to slowly increase the rate, which we will learn how to do).

In [None]:
# uncomment and run to drop the `default.ecomm_aggs_table` table
#spark.sql("drop table default.ecomm_aggs_table")

In [None]:
# Stream out of the `default.ecomm_by_day` table with `maxFilesPerTrigger` set
# to 1, project the columns we care about (select colA, colB), count events per
# 30 minute window / user / product / day, and write the results to a new
# Delta Lake table.
# Checkpoint the progress so we can `pick up where we left off`

# application identity; together these give the checkpoint its own directory
app_name = "dl_streaming_aggs"
app_version = "v0.0.1"
checkpoint_dir = "../../applications"
checkpoint_path = "/".join([checkpoint_dir, app_name, app_version, "_checkpoints"])
#print(f"checkpoint_path={checkpoint_path}")
ecomm_aggs_table = 'default.ecomm_aggs_table'

# NOTE(review): the checkpoint printed later in this notebook records 200 for
# this setting - presumably from a run made before this line existed; confirm.
spark.conf.set("spark.sql.shuffle.partitions", "32")

# configure the streaming Delta reader first, then bind it to the source table
delta_stream_reader = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 1)
    .option("ignoreChanges", True)
)
ecomm_by_day_limited = delta_stream_reader.table(dl_managed_table)

# view the schema for the table (since we know everything else about it now too)
ecomm_by_day_limited.printSchema()

# declare the watermark and keep only the columns we care about
# (feel free to switch things up here too)
watermarked_events = (
    ecomm_by_day_limited
    .withWatermark("event_time", '10 minutes')
    .select("event_time", "event_type", "product_id", "user_session", "user_id", "event_date")
)

# count the events per 30 minute window, user, product and day
half_hour_window = window("event_time", "30 minutes")
ecomm_aggs = (
    watermarked_events
    .groupBy(half_hour_window, "user_id", "product_id", "event_date")
    .agg(count("event_type").alias('session_events'))
)

# describe the streaming sink...
aggs_writer = (
    ecomm_aggs.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .partitionBy("event_date")
    .option("overwriteSchema", True)
    # the trigger controls how often the job runs: with `processingTime` it
    # fires on a fixed schedule (java nerds: think scheduledThreadPools),
    # whereas `once` fires a single time and then the job completes.
    .trigger(processingTime='30 seconds')
)

# ...and start the query; keep the handle so we can inspect and stop it later
streamingQuery = aggs_writer.toTable(ecomm_aggs_table)

## Controlling the StreamingQuery
1. We returned a `streamingQuery` object when we executed the last cell before. The Streaming Query object provides you with a gateway into the realtime metrics and behavior of your Delta-Spark based application performance.

2. Given the application is `triggering` every `30s`, that means twice a minute we'll have more data, as the job slowly chews through the 72 files of the data set, pulling in roughly 600k rows (one file) per tick.

Take a look at the metadata provided to you by the `streamingQuery`. Think about how impressive the numbers are.

In [None]:
# display the metrics recorded for the most recent micro-batch of the query
streamingQuery.lastProgress

^^ The prior output from the StreamingQueryListener is an aggregation of the collected runtime metadata, and statistical
behavior captured during the last microBatch. You'll notice that we started on index 16, and endingOffset was 17.

# Viewing the Delta Lake Information in the Streaming Query Stats
```
'startOffset': {
  'sourceVersion': 1,
  'reservoirId': '027b3701-5c07-46d4-9d96-e5539f81e8bf',
  'reservoirVersion': 33,
  'index': 16,
  'isStartingVersion': True},
'endOffset': {
  'sourceVersion': 1,
  'reservoirId': '027b3701-5c07-46d4-9d96-e5539f81e8bf',
  'reservoirVersion': 33,
  'index': 17,
  'isStartingVersion': True
}
```

This means we can take a look at the operations in the `/_checkpoints/offsets/17` directory. 

```
v1
{"batchWatermarkMs":1570578599000,"batchTimestampMs":1687853100013,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"sourceVersion":1,"reservoirId":"027b3701-5c07-46d4-9d96-e5539f81e8bf","reservoirVersion":33,"index":17,"isStartingVersion":true}
```

In [80]:
# stop the streaming query started by the `toTable(...)` call earlier
streamingQuery.stop()

In [79]:
# display the query's current status (message, isDataAvailable, isTriggerActive)
streamingQuery.status


{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

## View the Checkpoint Data

In [85]:
%%sh
# print offsets file 17 from the dl_streaming_aggs v0.0.1 checkpoint directory
cat ../../applications/dl_streaming_aggs/v0.0.1/_checkpoints/offsets/17

v1
{"batchWatermarkMs":1570578599000,"batchTimestampMs":1687853100013,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"sourceVersion":1,"reservoirId":"027b3701-5c07-46d4-9d96-e5539f81e8bf","reservoirVersion":33,"index":17,"isStartingVersion":true}

## Applications have State in the form of Checkpoints. 
> Delta maintains its state in the terms of completed atomic transactions.

The application checkpoints track where the application has last successfully read from the Delta Lake table (source), and the application also keeps track of the delta version based on the resulting transformation and insert into the (sink). In our case we read from the `default.ecomm_by_day` and did some windowed aggregations for events per session, and then recorded the results in a new table named `default.ecomm_aggs_table`.

Let's peek at the checkpoint data. Open up `../../applications/dl_streaming_aggs/v0.0.1/_checkpoints`.

## The Fruits of our Quick Labor
The shopping aggregations are our own 'sessionization' based on things that would work for the hitchhikers guide to Delta Lake streaming. Have we learned a lot from the data? Maybe. Have we learned a lot more about how Delta Lake works? Surely.

In [66]:
# batch-read the aggregates table and show ten rows from the first two days
first_two_days = ("2019-10-01", "2019-10-02")
aggs_df = spark.read.table("default.ecomm_aggs_table")
aggs_df.where(col("event_date").isin(*first_two_days)).show(10, truncate=False)

+------------------------------------------+---------+----------+----------+--------------+
|window                                    |user_id  |product_id|event_date|session_events|
+------------------------------------------+---------+----------+----------+--------------+
|{2019-10-01 17:30:00, 2019-10-01 18:00:00}|536922877|3101045   |2019-10-01|2             |
|{2019-10-01 20:00:00, 2019-10-01 20:30:00}|521595164|3601405   |2019-10-01|4             |
|{2019-10-01 19:00:00, 2019-10-01 19:30:00}|542537982|26401412  |2019-10-01|1             |
|{2019-10-01 19:00:00, 2019-10-01 19:30:00}|544624772|26204073  |2019-10-01|1             |
|{2019-10-01 17:30:00, 2019-10-01 18:00:00}|555732683|28715756  |2019-10-01|1             |
|{2019-10-01 22:00:00, 2019-10-01 22:30:00}|554327957|1005008   |2019-10-01|1             |
|{2019-10-01 19:00:00, 2019-10-01 19:30:00}|515086886|1005104   |2019-10-01|1             |
|{2019-10-01 20:00:00, 2019-10-01 20:30:00}|555779608|1005105   |2019-10-01|1   

## Extra Homework: Finding Neat Patterns in the Data
> shopping is fun. We all do it, some of us even enjoy it. Regardless of your style, the one thing we have in common is that not one of us really shops the same. Investigate the 42 million shopping data points from this dataset to understand how people are shopping. 

In [None]:
# show the first 100 rows of the source table, limited to the behaviour columns
behavior_columns = ["event_time", "event_type", "product_id", "user_session", "user_id"]
events_df = spark.read.table(dl_managed_table)
events_df.select(*behavior_columns).show(100, truncate=False)

In [None]:
# One shopper with an interesting pattern: they come back frequently, view,
# come back again, and 10 days after the first view finally make a purchase.
shopper_columns = ["event_time", "event_type", "product_id", "user_id", "user_session"]
is_our_shopper = col("user_id").eqNullSafe(516224384)

shopper_events = (
    spark.read
    .table(dl_managed_table)
    .select(*shopper_columns)
    .where(is_our_shopper)
)
shopper_events.show(100, truncate=False)

# Cleaning up with Vacuum.
We are done with the introduction to Streaming. The first steps covered creating tables and modifying the table properties, as well as understanding a little more about the structure of a Delta Lake table. During normal processing, you most likely overwrote or deleted some data. For each transaction that affects the data in a given Delta Lake table, there are some artifacts (call it orphaned data or files) that are no longer needed for the *CURRENT* version of the Delta Lake table. We will learn more about using `vacuum` while preserving enough history to `undo`, `rewind`, or `time-travel` to a particular point in Table Time later in the guide.

In [None]:
# Vacuum the aggregates table with a retention of 0 hours. The retention
# duration check is switched off for the call so that a 0 hour retention is
# accepted.
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"

spark.conf.set(retention_check_key, "false")
try:
    DeltaTable.forName(spark, ecomm_aggs_table).vacuum(retentionHours=0)
finally:
    # switch the safety check back on even when the vacuum raises, so a failed
    # run cannot leave the session with the check disabled
    spark.conf.set(retention_check_key, "true")