In [0]:
DECLARE OR REPLACE VARIABLE catalog_name STRING DEFAULT "main";
DECLARE OR REPLACE VARIABLE schema_name STRING DEFAULT "synthea";
DECLARE OR REPLACE VARIABLE full_refresh BOOLEAN DEFAULT false;
DECLARE OR REPLACE VARIABLE table_name STRING DEFAULT "encounters";

In [0]:
SET VARIABLE catalog_name = :catalog_name; 
SET VARIABLE schema_name = :schema_name;
SET VARIABLE full_refresh = CASE WHEN :full_refresh = 'true' THEN true ELSE false END;  
SET VARIABLE table_name = :table_name; 

In [0]:
select catalog_name, schema_name, full_refresh, table_name;

In [0]:
USE IDENTIFIER(catalog_name || "." || schema_name);

In [0]:
SELECT current_catalog(), current_schema();

In [0]:
DECLARE OR REPLACE VARIABLE full_refresh_stmnt STRING; 

SET VARIABLE full_refresh_stmnt = CASE 
  WHEN full_refresh = true THEN "DROP TABLE IF EXISTS " || table_name || ";"
  ELSE "SELECT 'Performing Standard Refresh of " || table_name || ".' AS message;" 
END;

SELECT full_refresh_stmnt;

In [0]:
EXECUTE IMMEDIATE full_refresh_stmnt;

In [0]:
DECLARE OR REPLACE VARIABLE table_specification STRING;

SET VARIABLE table_specification = "
(
  encounter_id STRING
  ,encounter_start_date TIMESTAMP
  ,encounter_end_date TIMESTAMP
  ,patient_id STRING
  ,organization_id STRING
  ,provider_id STRING
  ,payer_id STRING
  ,encounter_class STRING
  ,code BIGINT
  ,description STRING
  ,base_encounter_cost DOUBLE
  ,total_claim_cost DOUBLE
  ,payer_coverage DOUBLE
  ,reason_code BIGINT
  ,reason_description STRING
  ,_rescued_data STRING
  ,CONSTRAINT encounter_patient_id_null EXPECT (patient_id IS NOT NULL)
)"

In [0]:
DECLARE OR REPLACE VARIABLE table_clauses STRING; 

SET VARIABLE table_clauses = "
--COMMENT 'Raw snythethic patient data CSV files ingested from the landing volume for the " || table_name || " data set.'
TBLPROPERTIES (
  'quality' = 'bronze'
  ,'delta.enableChangeDataFeed' = 'true'
  ,'delta.enableDeletionVectors' = 'true'
  ,'delta.enableRowTracking' = 'true'
)
--CLUSTER BY AUTO"

In [0]:
DECLARE OR REPLACE VARIABLE crst_stmnt STRING; 

SET VARIABLE crst_stmnt = "CREATE OR REFRESH STREAMING TABLE " || table_name ||  
table_specification || 
table_clauses || " 
AS SELECT
  ID as encounter_id
  ,START as encounter_start_date
  ,STOP as encounter_end_date
  ,PATIENT as patient_id
  ,ORGANIZATION as organization_id
  ,PROVIDER as provider_id
  ,PAYER as payer_id
  ,ENCOUNTERCLASS as encounter_class
  ,CODE as code
  ,DESCRIPTION as description 
  ,BASE_ENCOUNTER_COST as base_encounter_cost
  ,TOTAL_CLAIM_COST as total_claim_cost
  ,PAYER_COVERAGE as payer_coverage
  ,REASONCODE as reason_code
  ,REASONDESCRIPTION as reason_description
  ,_rescued_data
FROM STREAM ("|| table_name || "_bronze);";

SELECT crst_stmnt;

In [0]:
EXECUTE IMMEDIATE crst_stmnt;

In [0]:
SHOW CREATE TABLE IDENTIFIER(table_name);

In [0]:
DESCRIBE SCHEMA IDENTIFIER(catalog_name || "." || schema_name);

In [0]:
ANALYZE TABLE table_name;

In [0]:
CREATE OR REPLACE FUNCTION table_properties(table_name STRING)
RETURNS TABLE(key STRING, value STRING)
RETURN SHOW TABLE IDENTIFIER(table_name);

In [0]:
WITH tbl_prop as (
  SELECT key, value FROM table_properties(table_name)
)
SELECT 
  value
FROM 
  tbl_prop
WHERE 
  key = 'pipelines.pipelineId'
;

In [0]:
SELECT 
  *
  ,parse_json(details) as details_parsed
FROM 
  __databricks_internal.__dlt_materialization_schema_d2b14e06_d391_4f6f_93b9_77ddafdee0e4.__event_log
-- WHERE 
--   event_type = 'data_quality'
ORDER BY 
  timestamp DESC;