In [0]:
DECLARE OR REPLACE VARIABLE catalog_name STRING DEFAULT "main";
DECLARE OR REPLACE VARIABLE schema_name STRING DEFAULT "synthea";
DECLARE OR REPLACE VARIABLE full_refresh BOOLEAN DEFAULT false;
DECLARE OR REPLACE VARIABLE table_name STRING;

In [0]:
SET VARIABLE catalog_name = :catalog_name; 
SET VARIABLE schema_name = :schema_name;
SET VARIABLE full_refresh = CASE WHEN :full_refresh = 'true' THEN true ELSE false END; 
SET VARIABLE table_name = :table_name;

In [0]:
select catalog_name, schema_name, full_refresh, table_name;

catalog_name,schema_name,full_refresh,table_name
mgiglia,synthea,True,allergies


In [0]:
USE IDENTIFIER(catalog_name || "." || schema_name);

In [0]:
SELECT current_catalog(), current_schema();

current_catalog(),current_schema()
mgiglia,synthea


In [0]:
DECLARE OR REPLACE VARIABLE landing_volume_path STRING DEFAULT "/Volumes/" || catalog_name || "/" || schema_name || "/landing/";

SELECT landing_volume_path;

landing_volume_path
/Volumes/mgiglia/synthea/landing/


## CREATE STREAMING TABLES 
*** 

`{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }  
  table_name  
  [ table_specification ]  
  [ table_clauses ]  
  [ AS query ]`

`table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )`

`column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]`

`table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]`

`schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }`


### Allergies
***

#### Bronze

In [0]:
DECLARE OR REPLACE VARIABLE bronze_table_name STRING; 

SET VARIABLE bronze_table_name = table_name || "_bronze";

SELECT table_name, bronze_table_name;

table_name,bronze_table_name
allergies,allergies_bronze


In [0]:
DECLARE OR REPLACE VARIABLE drop_bronze_stmnt STRING; 

SET VARIABLE drop_bronze_stmnt = CASE 
  WHEN full_refresh = true THEN "DROP TABLE IF EXISTS " || bronze_table_name || ";"
  ELSE "SELECT 'Skipping drop bronze table statement.' AS message;" 
END;

SELECT drop_bronze_stmnt;

drop_bronze_stmnt
DROP TABLE IF EXISTS allergies_bronze;


In [0]:
EXECUTE IMMEDIATE drop_bronze_stmnt;

In [0]:
DECLARE OR REPLACE VARIABLE bronze_table_specification STRING;

SET VARIABLE bronze_table_specification = "
  (
    file_metadata STRUCT < file_path: STRING,
    file_name: STRING,
    file_size: BIGINT,
    file_block_start: BIGINT,
    file_block_length: BIGINT,
    file_modification_time: TIMESTAMP > NOT NULL COMMENT 'Metadata about the file ingested.',
    ingest_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP() COMMENT 'The date timestamp the file was ingested.',
    value STRING COMMENT 'The raw CSV file contents.'
  )
"

In [0]:
DECLARE OR REPLACE VARIABLE bronze_table_clauses STRING; 

SET VARIABLE bronze_table_clauses = "
  COMMENT 'Raw snythethic patient data CSV files ingested from the landing volume for the " || table_name || " data set.'
  TBLPROPERTIES (
    'delta.enableChangeDataFeed' = 'true',
    'delta.enableDeletionVectors' = 'true',
    'delta.enableRowTracking' = 'true',
    'quality' = 'bronze'
  )
"

In [0]:
DECLARE OR REPLACE VARIABLE crst_bronze_stmnt STRING; 

SET VARIABLE crst_bronze_stmnt = "CREATE OR REFRESH STREAMING TABLE IDENTIFIER(bronze_table_name) 
" || bronze_table_specification || bronze_table_clauses || " 
AS SELECT
  _metadata as file_metadata
  ,* 
FROM STREAM read_files(
  '" || landing_volume_path || table_name || "/'
  ,format => 'csv'
  ,header => true
  ,schema => 'value STRING'
  ,delimiter => '~'
  ,multiLine => false
  ,encoding => 'UTF-8'
  ,ignoreLeadingWhiteSpace => true
  ,ignoreTrailingWhiteSpace => true
  ,mode => 'FAILFAST'
)";

SELECT crst_bronze_stmnt;

crst_bronze_stmnt
"CREATE OR REFRESH STREAMING TABLE IDENTIFIER(bronze_table_name) (  file_metadata STRUCT < file_path: STRING,  file_name: STRING,  file_size: BIGINT,  file_block_start: BIGINT,  file_block_length: BIGINT,  file_modification_time: TIMESTAMP > NOT NULL COMMENT 'Metadata about the file ingested.',  ingest_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP() COMMENT 'The date timestamp the file was ingested.',  value STRING COMMENT 'The raw CSV file contents.'  )  COMMENT 'Raw snythethic patient data CSV files ingested from the landing volume for the allergies data set.'  TBLPROPERTIES (  'quality' = 'bronze'  ,delta.enableRowTracking = true  ,delta.enableDeletionVectors = true  ,delta.enableChangeDataFeed = true  )  AS SELECT  _metadata as file_metadata  ,* FROM STREAM read_files(  '/Volumes/mgiglia/synthea/landing/allergies/'  ,format => 'csv'  ,header => true  ,schema => 'value STRING'  ,delimiter => '~'  ,multiLine => false  ,encoding => 'UTF-8'  ,ignoreLeadingWhiteSpace => true  ,ignoreTrailingWhiteSpace => true  ,mode => 'FAILFAST' )"


In [0]:
EXECUTE IMMEDIATE crst_bronze_stmnt;

message
The operation was successfully executed.


In [0]:
SHOW CREATE TABLE IDENTIFIER(bronze_table_name);

createtab_stmt
"CREATE STREAMING TABLE mgiglia.synthea.allergies_bronze (  file_metadata STRUCT NOT NULL COMMENT 'Metadata about the file ingested.',  ingest_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP() COMMENT 'The date timestamp the file was ingested.',  value STRING COMMENT 'The raw CSV file contents.') COMMENT 'Raw snythethic patient data CSV files ingested from the landing volume for the allergies data set.' TBLPROPERTIES (  'delta.enableChangeDataFeed' = 'true',  'delta.enableDeletionVectors' = 'true',  'delta.enableRowTracking' = 'true',  'quality' = 'bronze',  'spark.internal.pipelines.top_level_entry.user_specified_name' = '*********(redacted)') AS SELECT  _metadata as file_metadata  ,* FROM STREAM read_files(  '/Volumes/mgiglia/synthea/landing/allergies/'  ,format => 'csv'  ,header => true  ,schema => 'value STRING'  ,delimiter => '~'  ,multiLine => false  ,encoding => 'UTF-8'  ,ignoreLeadingWhiteSpace => true  ,ignoreTrailingWhiteSpace => true  ,mode => 'FAILFAST' )"
