*Note:  This notebooks should be executed against a Serverelss SQL Warehouse.*

***
# Streaming Bronze Tables Ingestion Using Autoloader
***

## Set up the Declared Variables and the Schema to Use
***

In [0]:
DECLARE OR REPLACE VARIABLE schema_use STRING DEFAULT REPLACE(SPLIT(current_user(), '@')[0], '.', '_');

In [0]:
USE IDENTIFIER("fhir_workshop." || schema_use);

In [0]:
SELECT current_catalog(), current_schema();

## Reviewing Data in Volumes
***

In [0]:
SHOW VOLUMES;

We can print details about the files in volume using SQL with the `LIST` statement.  

`LIST <volume_path>;`

Note that we've added a `LIMIT` at the end of the `LIST` statement to only print the first 100 records.  

In [0]:
DECLARE OR REPLACE VARIABLE list_stmnt STRING;

SET VARIABLE list_stmnt = "LIST '/Volumes/fhir_workshop/" || schema_use || "/landing/' LIMIT 100;";

SELECT list_stmnt;

Here we've dynamically built a SQL statement by declaring and setting a string variable that contains our SQL statement we wish to execute simply by concatentating other SQL variables together.  We use EXECUTE IMMEDIATE to 

In [0]:
EXECUTE IMMEDIATE list_stmnt;

## Using DB Serverless SQL to Create or Refresh Streaming Tables
***

Executing `CREATE OR REFRESH STREAMING TABLE` statements against a Serverless SQL warehouse automatically creates a Lakeflow (DLT) pipeline beheind the scenes using Serverless compute.  This is the easiest way to create a streaming pipeline.  Databricks automatically takes care of capturing the state, checkpoints, cluster size, and pipeline creation to execute the streaming table.  Users only have to worry about writing SQL.  

All streaming tables are "batch" unless the `CONTINOUS` statement is added to the CRST statement:  `CREATE OR REFRESH CONTINOUS STREAMING TABLE`.  If a streaming table has already been defined then the `REFRESH` statement will process any data that is ready to be processed.  Once all data that is available has been processed, the stream is automatically stopped until the next `REFRESH` is trigged.  

Normally we would never use the `DROP TABLE` statement.  For normal managed tables the `CREATE OR REPLACE` statement does the same thing as a truncate and reload while preserving the delta log, allowing for a back out strategy should the load need to be reversed.  This allows for better **DataOps**.  However since streaming tables are essentially Lakeflow (DLT) pipelines, a `FULL REFRESH` is essentially the same as a `DROP TABLE` combined with a `REFRESH`. 

Thefore, with streaming tables (or materialized views), we'll use the `FULL REFRESH` statement when we wish to completely reload the data over, and `DROP TABLE` followed by a `CREATE OR REFRESH STREAMING TABLE` statement when we need to be explict with our schema evolution.  

In [0]:
DROP TABLE IF EXISTS fhir_bronze;

We're going to create two bronze tables.  The first bronze will use autoloader and therefore will need to be dynamic due to the differences in the volume path's here.  Recall that a Volume path always has a root of `/Volumes/<catalog>/<schema>/<volume_name>/`.  This means that pretty much any code that will go through a CI/CD process will need dynamic SQL to incorporate changing volume paths based on catalog or schema names based on environments.  

For this first bronze table, we'll use autoloaders SQL function `read_files`.  There are many different file formats that can be automatically loaded to a bronze table that will infer its schema, including JSON.  However this requires each JSON file to have the exact same schema, and due to the nature of FHIR data, no two JSONs are gurranteed to have the same schema.  This implies that normal methods for parsing JSON files are not possible with FHIR. 

Thankfully, we now have the VARIANT data type, which is able to parse every JSON file completely independently, including inside a stream.  To use the VARIANT data type we must either use a Databricks SQL Warehouse or Databricks Runtime 15.3LTS+.  VARIANT is a public preview data type in the open source Delta Lake file format which is the default managed table file format in Databricks.  In order to use VARIANT we'll need to turn on the feature in our Delta Tables, and set the Lakeflow Pipeline channel to preview.  VARIANT will be generally available fairly soon.  While in public preview its fully supported by Databricks and may be used for production workflows.  

To transform a JSON string to VARIANT we use the `parse_json` Spark SQL function.  One thing to note about parse_json is that it must be given well formed JSON strings.  If the JSON is malformed then `parse_json` will return an error.  To simply return a NULL instead of an error we may use the `try_parse_json` function instead.  

In order to have a record of everything that we've recieved from our FHIR source, we'll first ingest the bundles as text and then use try_parse_json in a following bronze table. 

Note that Synthea produces some malformed FHIR JSONs on purpose to simulate what you'd expect in a real world setting where you FHIR is coming form multiple sources originally.  This is where brokers like Redox come in-- Redox ensures that each transaction your to recieve is always well formed and follows the same general best practices as every Redox bundle produced.  This ensures better uniformity downstream.  

In [0]:
DECLARE OR REPLACE VARIABLE create_streaming_bronze_stmnt STRING;

SET VARIABLE create_streaming_bronze_stmnt = "
CREATE OR REFRESH STREAMING TABLE fhir_bronze (
  file_metadata STRUCT<
    file_path: STRING,
    file_name: STRING,
    file_size: BIGINT,
    file_block_start: BIGINT,
    file_block_length: BIGINT,
    file_modification_time: TIMESTAMP
  > NOT NULL COMMENT 'Original meta date of the file ingested from the volume.'
  ,ingest_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP() COMMENT 'The date timestamp the file was ingested.'
  ,bundle_uuid STRING NOT NULL COMMENT 'Unique identifier for the FHIR bundle.'
  ,value STRING COMMENT 'Original JSON record ingested from the volume as a full text string value.'
)
COMMENT 'Ingest FHIR JSON records as Full Text STRING'
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.enableDeletionVectors' = 'true',
  'delta.enableRowTracking' = 'true',
  'quality' = 'bronze'
)
AS SELECT
  _metadata as file_metadata
  ,uuid() as bundle_uuid
  ,* 
FROM STREAM read_files(
  '/Volumes/fhir_workshop/" || schema_use || "/landing/'
  ,format => 'text'
  ,wholeText => true
)
";

SELECT create_streaming_bronze_stmnt;

In the above SQL note the use of the DDL to set the column types, defaults and comments.  We also apply a table comment, and table properties.  The table properties here are critical for the efficient processing of our streaming data to ensure we always apply incremental updates, including for later SQL that will require `PIVOT`s or materialized views.  Setting your table properties correctly vastly improves performance and lowers compute costs.  

- **Enabling Change Data Feed** allows the stream or a data engineer to be able to read exactly from a particular microbatch.  
- **Deletion vectors**, now enabled by default on new managed Delta tables performs a soft delete using meta data on initial processing, and then later a background process physcially deletes the records from the delta tables, vastly increasing performance during active loads.  
- **Row Tracking** adds additional metadata such that streams or materialized views can be sure which data has been recently updated, inserted or deleted, resulting in incremental loads or streams inside of full refreshes.  This is required to use `PIVOT` in streaming SQL.  

In [0]:
EXECUTE IMMEDIATE create_streaming_bronze_stmnt;

In [0]:
SELECT 
  file_metadata
  ,ingest_time
  ,bundle_uuid
  ,substr(value FROM 0 FOR 10000) as value -- note that these bundles are so large that attempting to print even one as a full string yields results that are too large for the notebook to display! 
  -- ,value
FROM 
  fhir_bronze 
limit 100;

In [0]:
select count(*) as rcrd_cnt from fhir_bronze;

In [0]:
DROP TABLE IF EXISTS fhir_bronze_variant;

Now we'll execute our second streaming bronze table and turn the string value column into a fhir VARIANT column.  

In [0]:
CREATE OR REFRESH STREAMING TABLE fhir_bronze_variant (
  bundle_uuid STRING NOT NULL COMMENT 'Unique identifier for the FHIR bundle.'
  ,ingest_time TIMESTAMP NOT NULL COMMENT 'The date timestamp the file was ingested.'
  ,file_metadata STRUCT<
    file_path: STRING,
    file_name: STRING,
    file_size: BIGINT,
    file_block_start: BIGINT,
    file_block_length: BIGINT,
    file_modification_time: TIMESTAMP
  > NOT NULL COMMENT 'Original meta date of the file ingested from the volume.'
  ,fhir VARIANT COMMENT 'Original JSON record fully parsed as a variant data type.'
)
COMMENT 'Evaluate FHIR JSON records as VARIANT'
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true'
  ,'delta.enableDeletionVectors' = 'true' 
  ,'delta.enableRowTracking' = 'true'
  ,'quality' = 'bronze'
  ,'pipelines.channel' = 'PREVIEW'
  ,'delta.feature.variantType-preview' = 'supported'
)
AS SELECT
  bundle_uuid
  ,ingest_time
  ,file_metadata
  ,try_parse_json(value) as fhir 
FROM STREAM fhir_bronze;

In [0]:
select 
  bundle_uuid
  ,ingest_time
  ,file_metadata
  ,fhir
from 
  fhir_bronze_variant 
where fhir is not null 
limit 1;

Swith to the SQL Editior Queries to Review the Variant parsed JSON if results are too large.

***
### Writing SQL With the Variant Data Type

With Variant we not write SQL statements against any element of the original JSON file without knowing anything ahead of time about its original schema.  For traditional from_json methods to work, including inferring the JSON schema, each file should have the same consistent schema.  

However with FHIR, every bundle recieved could have any number of resource arrays and in any order.  For example, there is no guarantee that the first reosource in the entry array is the Patient array.  Synthea, used here, and Redox (a Databricks partner) have very different standard practices for what it used or not used in the bundle, yet both are valid FHIR bundles.  

For example, Redox always includes a bundle level meta object, and the first resource in their entry array is typically also a meta resource.  Synthea does not include the meta bundle object, and typically has the Patient resource first in the entry array for C-CDA converted bundles (though this need need be the case for all types of bundles).  Additonally Synthea occasionally produces malformed JSON files, whereas Redox only sends perfectly formed JSON files.  

Below we show a few ways to write SQL against fully parsed JSON files in VARIANT.  

- Using the `<variant_colname>:<path>::<datatype>` method where a single colon `:` indicates an entry path for the data selected, and the double colon `::` allows a shorthand for casting the value returned as the desired datatype.  
- If we prefer to be versbose, we can always use the `CAST(expr as datatype)` function outright.  
- And `variant_get` combines the familiar JSON paths that many are familiar with, including the `$.` notation, with a `CAST` where we provide the desired datatype as a string value. 

Note that if we don't cast the selected VARIANT data to a different datatype, then it will remain as a VARIANT column.  This is desirable to continue to use for structs until a final schema may be applied, however VARIANT column types can not be used as group by clauses for aggregation functions.  

In [0]:
select 
  bundle_uuid
  ,CAST(fhir:entry[0].resource.address[0].extension[0].extension[0].valueDecimal as double) as latitude
  ,fhir:entry[0].resource.address[0].extension[0].extension[1].valueDecimal::double as longitude
from 
  fhir_bronze_variant
where 
  fhir is not null
  and variant_get(fhir, '$.entry[0].resource.resourceType', 'STRING') = 'Patient';