# Running a Streaming Pipeline from S3 to a Delta Table

In this notebook we walk step by step through a pipeline that streams files from an S3 bucket/folder to a Delta table in append-only mode.  The [Spark Structured Streaming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) provides an introduction to all the bells and whistles of streaming.  

The pipeline pattern we want to achieve is the reading, transformation, and writing of JSON objects held in an S3 bucket.  We want this pipeline to be idempotent with respect to reading (an S3 object is only read and processed once) and writing (the rows in the object are only appended to the Delta table once).  Structured Streaming gives us a bunch of these guarantees (and more).
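To make the idempotency idea concrete, here is a toy, plain-Python sketch of the pattern: a checkpoint records which files have been processed, so a second run appends nothing.  This is only an illustration under simplifying assumptions; the function names, file names and checkpoint format are hypothetical, and Structured Streaming's real checkpointing is considerably more involved.

```python
import json
import tempfile
from pathlib import Path


def run_once(source_dir: Path, checkpoint: Path, table: list) -> int:
    """Append rows from files not yet recorded in the checkpoint.

    Returns the number of files processed in this run.
    """
    # Load the names of files already processed (empty on the first run).
    seen = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()
    new_files = sorted(p for p in source_dir.glob("*.json") if p.name not in seen)
    for path in new_files:
        # Each source file is JSON-lines: one JSON object per line.
        for line in path.read_text().splitlines():
            if line.strip():
                table.append(json.loads(line))
        # Record progress after each file so a rerun skips it.
        seen.add(path.name)
        checkpoint.write_text(json.dumps(sorted(seen)))
    return len(new_files)


def demo() -> tuple:
    """Run the toy pipeline twice over the same two files (hypothetical data)."""
    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / "events"
        source.mkdir()
        (source / "a.json").write_text(
            '{"name": "The Cheese Shop"}\n{"name": "Eric the Half Bee"}\n')
        (source / "b.json").write_text(
            '{"name": "The Piranha Brothers"}\n{"name": "The Spanish Inquisition"}\n')
        checkpoint = Path(tmp) / "checkpoint.json"
        table = []
        first = run_once(source, checkpoint, table)
        second = run_once(source, checkpoint, table)
        return first, second, len(table)
```

Calling `demo()` processes both files on the first run and none on the second, so the in-memory "table" ends up with each row exactly once.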

To make the process a little "easier", Metis-Job (yes, another Ancient Greek deity) provides a set of abstractions to encapsulate all the Spark and Databricks options.  Using Metis-Job, we can also test this process locally, with only a PySpark environment at our side.  This is not perfect; the stream reader on Databricks (based on the [Autoloader](https://docs.databricks.com/en/ingestion/auto-loader/index.html) functionality, configured as `cloudFiles`) can't be run outside Databricks.  However, the [recursive file lookup](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html#recursive-file-lookup) native to Spark can.

We are going to be working in the `hacking_data` Unity catalogue, in a schema (or database, if you will) called `delta_test`.  The table we'll be writing to is `sketch`; data on various Monty Python sketches (big fans of Dead Parrots, registering fish, and Spark Streaming).  In the world of Unity catalogues, this makes the path to the table `hacking_data.delta_test.sketch`.

And we'll start by importing from pyspark functions, delta, and metis_job.

## Notes on Running the Demo

* There is event data loaded into a managed catalogue volume; here `/Volumes/hacking_data/delta_test/streaming_events`.  This contains 2 JSON-like files (actually JSON-lines), each containing 2 rows.
* The destination Delta table is `hacking_data.delta_test.sketch`.  Before starting the demo, delete it.
* The checkpoints sit in another managed volume, `/Volumes/hacking_data/delta_test/checkpoints/sketch_events/`.  Before starting the demo, the `sketch_events` folder needs to be removed.  To do this, run `dbutils.fs.rm(job_cfg.checkpoint_location, True)`.
* Also, try running the stream a few times (without deleting the table and checkpoints between runs) to see how the idempotent aspects work.

In [0]:

from pyspark.sql.functions import col, current_timestamp
import metis_job
from delta.tables import DeltaTable

# sess = metis_job.build_spark_session('a', session.create_connect_session)


## The Constructors

If this was a job, a lot of this metis_job plating-of-the-boiler would be hidden away in the model layer.  Here, we'll be explicit about defining the following:
* A term vocab to use for the schema; sort of like a simple term translator.
* We are going to define 2 schemas.  The schema definition used in the pipeline is a [Spark StructType instance](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html).  However, instead of coding the schema directly in Spark StructTypes, metis_job provides an abstraction; `metis_job.Schema`.  Schema is a fluent API for constructing the various Spark types, like `StructField`, `StringType`, as well as nested `StructType`.
  * The event source schema.  The schema of the JSON docs in the S3 bucket; the function `sketches_event_schema`.
  * The destination table schema.  The schema of the Delta table; the function `sketches_schema`.  This is slightly different from the event schema, as during the streaming we transform the source events.
* The `Sketch` class.  It is a type of `DomainTable`, provided by metis_job, which abstracts away a number of base Spark and Delta functions, such as creating the table, reading, performing Delta merges and upserts, and, in this example, providing table info to the streaming functions.  There is a bunch of configuration in the `Sketch` class.  Essentially, `DomainTable` provides common Spark table functions, such as setting properties.  More importantly, it defines DeltaTable functions, such as partitioning, pruning, and functions like the infamous Delta upsert (the `identity_merge_condition` function provides the upsert rule for the Delta table).  We're not using Delta upserts in this example, only a Delta append.
* The `build_job_config` function is designed for a job.  It provides the base configuration for a Spark job, and is primarily used to describe the namespace (the Unity Catalogue and its component parts) for the job.  More about that below.  Notice that the catalogue is `hacking_data`, as we mentioned above.  The schema is `delta_test`, except it's called `data_product`, because the Unity Catalogue schema is our Data Mesh data product concept.
* The namespace (the schema in the catalogue) is defined; this takes the job config and the current Spark session.
* Finally, we initialise the `Sketch` table object.  Constructing the namespace via `build_namespace()` creates the Catalogue schema if it doesn't exist.  We then ask `Sketch` to create (if not exists) a managed Delta table (`table_creation_protocol = metis_job.CreateManagedDeltaTable`), which gets called as an after-object-initialisation callback (the `after_initialise` function).

We're done with constructing our destination catalogue, schema and table.  Now for the streaming configuration.

In [0]:
def vocab():
    return {
        "sketches": {
            "isDeleted": {
                "term": "isDeleted"
            },
            "name": {
                "term": "name"
            },
            "pythons": {
                "term": "pythons"
            },
            "season": {
                "term": "season"
            },
            "source_file": {
                "term": "source_file"
            },
            "processing_time": {
                "term": "processing_time"
            }
        },
        "columns": {
            "column1": {
                "term": "column_one"
            },
            "column2": {
                "term": "column_two",
                "sub1": {
                    "term": "sub_one"
                },
                "sub2": {
                    "term": "sub_two"
                },
                "sub3": {
                    "term": "sub_three",
                    "sub3-1": {
                        "term": "sub_three_one"
                    },
                    "sub3-2": {
                        "term": "sub_three_two"
                    }
                }
            }
        },
        "*": {
            "@id": {
                "term": "id"
            },
            "@type": {
                "term": "type"
            }
        }
    }


def sketches_event_schema():
    sketch_event_table = (metis_job.Schema(vocab=vocab())
                          .column()  # id
                          .string("*.@id", nullable=True)

                          .column()  # is_deleted
                          .string("sketches.isDeleted", nullable=True)

                          .column()  # name
                          .string("sketches.name", nullable=True)

                          .column()  # pythons array
                          .array_struct("sketches.pythons", nullable=True)
                          .string("*.@id", nullable=True)
                          .end_struct()

                          .column()  # season
                          .string("sketches.season", nullable=True))
    return sketch_event_table


def sketches_schema():
    sketch_table = (metis_job.Schema(vocab=vocab())
                    .column()  # id
                    .string("*.@id", nullable=True)

                    .column()  # is_deleted
                    .string("sketches.isDeleted", nullable=True)

                    .column()  # name
                    .string("sketches.name", nullable=True)

                    .column()  # pythons array
                    .array_struct("sketches.pythons", nullable=True)
                    .string("*.@id", nullable=True)
                    .end_struct()

                    .column()  # season
                    .string("sketches.season", nullable=True)

                    .column()  # source_file
                    .string("sketches.source_file", nullable=True)

                    .column()  # processing_time
                    .timestamp("sketches.processing_time", nullable=True))
    return sketch_table


class Sketch(metis_job.DomainTable):
    table_name = "sketch"

    table_creation_protocol = metis_job.CreateManagedDeltaTable

    schema = sketches_schema().hive_schema()

    partition_columns = ("name",)

    pruning_column = 'name'

    table_properties = [
        metis_job.TableProperty(metis_job.DataAgreementType.SCHEMA_VERSION, "0.0.1", "my_namespace"),
        metis_job.TableProperty(metis_job.DataAgreementType.PARTITION_COLUMNS, "identity", "my_namespace"),
        metis_job.TableProperty(metis_job.DataAgreementType.PRUNE_COLUMN, "identity", "my_namespace"),
        metis_job.TableProperty(metis_job.DataAgreementType.PORT, "superTable", "my_namespace"),
        metis_job.TableProperty(metis_job.DataAgreementType.UPDATE_FREQUENCY, "daily", "my_namespace"),
        metis_job.TableProperty(metis_job.DataAgreementType.DESCRIPTION, "Some description", "my_namespace"),
    ]

    def after_initialise(self):
        self.perform_table_creation_protocol()

    def identity_merge_condition(self, name_of_baseline, update_name):
        return f"{name_of_baseline}.id = {update_name}.id"

def build_job_config():
    checkpoint_volume = "/Volumes/hacking_data/delta_test/checkpoints/sketch_events"
    return metis_job.JobConfig(
        service_name="mock_job",
        catalogue="hacking_data",
        data_product="delta_test",
        job_mode=metis_job.JobMode.UNITY,
        checkpoint_location=checkpoint_volume)

def build_namespace(cfg):  
    return metis_job.NameSpace(session=spark,
                               job_config=cfg)


job_cfg = build_job_config()

sketches = Sketch(namespace=build_namespace(job_cfg))
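As an aside on the vocab used above: the dotted paths passed to the schema builder (such as `*.@id` or `sketches.isDeleted`) appear to resolve to the `term` value, which becomes the column name (`id`, `isDeleted`).  The following is a minimal sketch of that lookup under that assumption; `resolve_term` and `SAMPLE_VOCAB` are hypothetical names for illustration, not part of metis_job.

```python
# A cut-down copy of the vocab, for illustration only.
SAMPLE_VOCAB = {
    "sketches": {
        "isDeleted": {"term": "isDeleted"},
        "name": {"term": "name"},
    },
    "columns": {
        "column2": {
            "term": "column_two",
            "sub3": {
                "term": "sub_three",
                "sub3-1": {"term": "sub_three_one"},
            },
        },
    },
    "*": {
        "@id": {"term": "id"},
    },
}


def resolve_term(vocab: dict, path: str) -> str:
    """Walk a dotted path through the nested vocab and return its term.

    Assumption: each path segment is a dict key, and the leaf's "term"
    entry is the column name to use.  Raises KeyError for unknown paths.
    """
    node = vocab
    for key in path.split("."):
        node = node[key]
    return node["term"]


# Resolve a few paths into column names.
column_names = [resolve_term(SAMPLE_VOCAB, p)
                for p in ("*.@id", "sketches.isDeleted", "sketches.name")]
```

With this reading, nested entries such as `columns.column2.sub3.sub3-1` resolve in the same way, one path segment per level.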


## Constructing the Streaming Pipeline

We're using the Databricks Autoloader functionality to stream files from S3.  This is configured in Databricks Spark as "cloudFiles".  Metis_job has a simple abstraction for this called `CloudFiles` (well, why not?). This works both in a local PySpark environment (by injecting different dependencies and configuration) and in Databricks.

The main configuration aspects of the stream are:
* `stream_reader`.  An instance of `metis_job.DatabricksCloudFilesStreamer`.  We initialise it with a Spark option defining the type of files to be streamed; in this case JSON.
* `cloud_source`.  This is the location of the source events.  In the case of this example, this is a managed volume within the `delta_test` schema (which is still S3 under the rather vast covers), but it could easily be an external volume configured as a Volume in the catalogue.  For an example of one of those, see `/Volumes/domain/telemetry/telemetry-events`.
* `checkpoint_location`.  This is where the stream writes its progress data, for fault-tolerance and idempotency purposes.
* `schema` is the previously defined event schema.  Stream processing can infer the schema by reading ahead.  However, for more complex schemas (ours has an array of structs) this doesn't work well.  Making the schema explicit reduces data compatibility issues as well.
* `stream_writer` is an instance of the `metis_job.DeltaStreamingTableWriter` class.  It is currently opinionated about 1 thing; the [trigger condition](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) is `{'availableNow': True}`
* Finally, we tell it to stream to the `Sketch` Delta table we created above.  In Delta Stream, all the stream writer needs is the fully qualified table name; `hacking_data.delta_test.sketch` in our case.



In [0]:
stream_reader_opts = [metis_job.SparkOption.JSON_CLOUD_FILES_FORMAT]
stream_from_location = "/Volumes/hacking_data/delta_test/streaming_events"

# dbutils.fs.rm(job_cfg.checkpoint_location, True)

cloud_files = metis_job.CloudFiles(spark_session=spark,
                                   stream_reader=metis_job.DatabricksCloudFilesStreamer(stream_reader_opts),
                                   cloud_source=stream_from_location,
                                   checkpoint_location=job_cfg.checkpoint_location,
                                   schema=sketches_event_schema().hive_schema(),
                                   stream_writer=metis_job.DeltaStreamingTableWriter(),
                                   stream_to_table=sketches)


## Setting up the Stream

The `read_stream` function sets up the stream by invoking `read_stream` on `metis_job.DatabricksCloudFilesStreamer`.  This gives us back a DataFrame, but one that is in streaming mode.  This means that we can only use a subset of Spark functions on the DF; for instance, we can't use any of the action functions (like `collect` or `take`).

In [0]:
df = cloud_files.read_stream()

df.isStreaming

## Transformations

What we can do, however, is to perform transformation functions.  There are some limits when streaming, but most [transformations are supported](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#operations-on-streaming-dataframesdatasets)

In [0]:
df2 = df.withColumns({'source_file': col("_metadata.file_path"),
                     'processing_time': current_timestamp()})


## Write The Stream

We are using the `metis_job.DeltaStreamingTableWriter` stream writer, which appends the transformed rows generated by the stream to the Delta table defined in the `cloud_files` configuration.  Writing the stream is what actually starts the stream processing; the previous 2 steps didn't activate the stream.  

The started stream returns a `StreamingQuery` object to allow tracking of the job.  `DeltaStreamingTableWriter` calls `awaitTermination()` on the resulting `StreamingQuery` and then returns it, so we can check the stream's status.
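The "nothing runs until the write starts" behaviour can be pictured with plain Python generators.  This is only an analogy, not how Spark is implemented, and every name in it (`read_events`, `add_source_file`, `write_rows`, the `events/a.json` path) is hypothetical.

```python
def read_events(events, log):
    """Lazily yield events, recording each read so we can see when work happens."""
    for event in events:
        log.append(event["name"])
        yield event


def add_source_file(rows, source_file):
    """Lazily add a source_file column to each row."""
    for row in rows:
        yield {**row, "source_file": source_file}


def write_rows(rows, table):
    """Consume the pipeline, appending every row to the table."""
    table.extend(rows)
    return len(table)


events = [{"name": "The Cheese Shop"}, {"name": "Eric the Half Bee"}]
read_log, table = [], []

# Building the pipeline reads nothing; it only describes the work.
plan = add_source_file(read_events(events, read_log), "events/a.json")
reads_before_write = len(read_log)

# Only the write pulls rows through the read and transform steps.
rows_written = write_rows(plan, table)
reads_after_write = len(read_log)
```

Here `reads_before_write` stays at zero because defining the read and the transformation does no work; the rows only flow once `write_rows` consumes the plan.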

In [0]:
streaming_query = cloud_files.write_stream(df2)

In [0]:
streaming_query.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

## Checking our Stream

Finally, we can check the data written from the events to the Delta table with a common Spark read operation.  Our `Sketch` object takes care of the details via the `read` function, which returns a DF of the Delta table.  We then show what's there. 

In [0]:
sdf = sketches.read() 

sdf.show()


+--------------------+---------+--------------------+--------------------+------+--------------------+--------------------+
|                  id|isDeleted|                name|             pythons|season|         source_file|     processing_time|
+--------------------+---------+--------------------+--------------------+------+--------------------+--------------------+
|https://example.n...|    false|The Spanish Inqui...|[{https://example...|S02E02|/Volumes/hacking_...|2024-04-19 01:45:...|
|https://example.n...|    false|The Piranha Brothers|[{https://example...|S02E01|/Volumes/hacking_...|2024-04-19 01:45:...|
|https://example.n...|    false|     The Cheese Shop|[{https://example...|S02E02|/Volumes/hacking_...|2024-04-19 01:45:...|
|https://example.n...|    false|   Eric the Half Bee|[{https://example...|S02E01|/Volumes/hacking_...|2024-04-19 01:45:...|
+--------------------+---------+--------------------+--------------------+------+--------------------+--------------------+



## The Raw Delta + Spark Functions

We can get the same result using raw Delta and Spark functions, without Metis-Job (well, with just a little help from the schema definitions), like so:

In [0]:
checkpoint_location = "/Volumes/hacking_data/delta_test/checkpoints"
fully_qualified_table_name = "hacking_data.delta_test.sketch"

spark.sql(f"DROP TABLE IF EXISTS {fully_qualified_table_name}")
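# Remove any previous checkpoint so the stream starts from scratch.
dbutils.fs.rm(checkpoint_location, True)

# Create the destination Delta table, partitioned by name, if it doesn't exist.
(DeltaTable.createIfNotExists(spark)
           .tableName(fully_qualified_table_name)
           .addColumns(sketches_schema().hive_schema())
           .partitionedBy("name")
           .execute())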


streaming_query_raw = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_location)
  .schema(sketches_event_schema().hive_schema())
  .load("/Volumes/hacking_data/delta_test/streaming_events")
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_location)
  .trigger(availableNow=True)
  .toTable(fully_qualified_table_name))

streaming_query_raw.awaitTermination()
