In [0]:
%sql
-- Report the catalog and schema that unqualified table names in this notebook resolve against.
SELECT
  current_catalog(),
  current_schema()

In [0]:
%sql
-- List the contents of the hospital_drug volume directory to see which files are available for ingestion.
LIST '/Volumes/workspace/default/hospital_drug'

In [0]:
%sql
-- Preview every JSON file under the hospital_drug volume directory via read_files.
-- No row limit is applied; append "LIMIT 10" when only a quick sample is wanted.
SELECT
  *
FROM read_files(
  '/Volumes/workspace/default/hospital_drug',
  format => 'json'
)

In [0]:
-- Derive the schema string of one sample hospital_drug record (a JSON array holding a single object).
SELECT
  schema_of_json(
    '[{"id":1,"user_id":1,"hospital_npi":"1326792961","hospital_name":"FAMILY MEDICAL CENTER OF MI, INC","hospital_city":"TEMPERANCE","drug_name":null,"drug_brand":"Orphenadrine Citrate","date_taken":"2025-05-04T10:11:28Z","drug_price":3.99}]'
  ) AS schema

In [0]:
-- Bronze layer for the hospital_drug feed: load the raw JSON file as-is and add
-- ingestion metadata columns so every row can be traced back to its source file.
-- NOTE(review): the DROP looks redundant with CREATE OR REPLACE below; it is kept
-- in case the existing object is of a different kind (e.g. a streaming table) — confirm.
DROP TABLE IF EXISTS hospital_drug_bronze_raw;

-- Create the Delta table
-- CREATE OR REFRESH STREAMING TABLE hospital_drug_bronze_raw AS
CREATE OR REPLACE TABLE hospital_drug_bronze_raw AS
SELECT
  *,
  -- No first_touch_date column here: the sample hospital_drug record used for
  -- schema_of_json in this notebook has no user_first_touch_timestamp field, so
  -- deriving a date from it would fail with an unresolved-column error.
  _metadata.file_modification_time AS file_modification_time,      -- Last data source file modification time
  _metadata.file_name AS source_file,                              -- Ingest data source file name
  current_timestamp() AS ingestion_time                            -- Ingestion timestamp
FROM read_files(
  '/Volumes/workspace/default/hospital_drug/hospital_drug_MOCK_DATA.json',
  format => 'json',
  rescueddatacolumn => '_rescued_data'    -- Create the _rescued_data column
);

-- Bronze layer for the transaction feed: raw JSON rows plus ingestion metadata.
DROP TABLE IF EXISTS transaction_bronze_raw;

-- Create the Delta table
-- CREATE OR REFRESH STREAMING TABLE transaction_bronze_raw AS
CREATE OR REPLACE TABLE transaction_bronze_raw AS
SELECT
  *,
  -- The value is divided by 1,000,000 before from_unixtime, then cast to a calendar date.
  -- NOTE(review): presumably the source stores microseconds since the epoch, and this
  -- requires a user_first_touch_timestamp field in the transaction JSON — confirm both.
  CAST(from_unixtime(user_first_touch_timestamp / 1000000) AS DATE) AS first_touch_date,
  -- Last data source file modification time
  _metadata.file_modification_time AS file_modification_time,
  -- Ingest data source file name
  _metadata.file_name AS source_file,
  -- Ingestion timestamp
  current_timestamp() AS ingestion_time
FROM read_files(
  '/Volumes/workspace/default/transaction/transaction_MOCK_DATA.json',
  format => 'json',
  -- Create the _rescued_data column
  rescueddatacolumn => '_rescued_data'
);

-- Bronze layer for the users feed: raw JSON rows plus ingestion metadata.
DROP TABLE IF EXISTS users_bronze_raw;

-- Create the Delta table
-- CREATE OR REFRESH STREAMING TABLE users_bronze_raw AS
CREATE OR REPLACE TABLE users_bronze_raw AS
SELECT
  *,
  -- The value is divided by 1,000,000 before from_unixtime, then cast to a calendar date.
  -- NOTE(review): presumably the source stores microseconds since the epoch, and this
  -- requires a user_first_touch_timestamp field in the users JSON — confirm both.
  CAST(from_unixtime(user_first_touch_timestamp / 1000000) AS DATE) AS first_touch_date,
  -- Last data source file modification time
  _metadata.file_modification_time AS file_modification_time,
  -- Ingest data source file name
  _metadata.file_name AS source_file,
  -- Ingestion timestamp
  current_timestamp() AS ingestion_time
FROM read_files(
  '/Volumes/workspace/default/users/users_MOCK_DATA.json',
  format => 'json',
  -- Create the _rescued_data column
  rescueddatacolumn => '_rescued_data'
);

In [0]:
-- Display the table
-- SELECT *
-- FROM hospital_drug_bronze_raw
-- LIMIT 25;

-- List the tables registered in the `default` schema, to check the bronze tables were created.
SHOW TABLES IN default

In [0]:
-- Show the columns of the hospital_drug bronze table together with its extended table-level metadata.
DESCRIBE TABLE EXTENDED hospital_drug_bronze_raw;


In [0]:
-- COPY INTO tracks the files it has previously ingested. If the command is run again, no additional data is ingested because the files in the source directory haven't changed.
-- NOTE(review): the table was first populated by CREATE OR REPLACE TABLE ... read_files, not by COPY INTO,
-- so the first COPY INTO presumably loads hospital_drug_MOCK_DATA.json a second time — check for duplicate rows.

COPY INTO hospital_drug_bronze_raw
FROM '/Volumes/workspace/default/hospital_drug'
FILEFORMAT = json
-- Merge the schema of each file
COPY_OPTIONS ('mergeSchema' = 'true');

-- Load the JSON files found in the transaction volume directory into the bronze table.
-- NOTE(review): the table was first populated by CREATE OR REPLACE TABLE ... read_files, not by COPY INTO,
-- so the first COPY INTO presumably loads transaction_MOCK_DATA.json a second time — check for duplicate rows.
COPY INTO transaction_bronze_raw
FROM '/Volumes/workspace/default/transaction'
FILEFORMAT = json
-- Merge the schema of each file
COPY_OPTIONS ('mergeSchema' = 'true');

-- Load the JSON files found in the users volume directory into the bronze table.
-- NOTE(review): the table was first populated by CREATE OR REPLACE TABLE ... read_files, not by COPY INTO,
-- so the first COPY INTO presumably loads users_MOCK_DATA.json a second time — check for duplicate rows.
COPY INTO users_bronze_raw
FROM '/Volumes/workspace/default/users'
FILEFORMAT = json
-- Merge the schema of each file
COPY_OPTIONS ('mergeSchema' = 'true');

In [0]:
-- Only for appending new data if the schema has not changed
--REFRESH STREAMING TABLE hospital_drug_bronze_raw;

-- Show the operation history of the hospital_drug bronze table, to verify what the
-- preceding create / COPY INTO steps actually wrote.
DESCRIBE HISTORY hospital_drug_bronze_raw;

In [0]:
-- Fixture for the JSON-path examples further down. The CREATE statement is commented out,
-- so the queries in this cell only run if store_data already exists with a `raw` JSON string column.
-- CREATE OR REPLACE TABLE store_data AS SELECT
-- '{
--    "store":{
--       "fruit": [
--         {"weight":8,"type":"apple"},
--         {"weight":9,"type":"pear"}
--       ],
--       "basket":[
--         [1,2,{"b":"y","a":"x"}],
--         [3,4],
--         [5,6]
--       ],
--       "book":[
--         {
--           "author":"Nigel Rees",
--           "title":"Sayings of the Century",
--           "category":"reference",
--           "price":8.95
--         },
--         {
--           "author":"Herman Melville",
--           "title":"Moby Dick",
--           "category":"fiction",
--           "price":8.99,
--           "isbn":"0-553-21311-3"
--         },
--         {
--           "author":"J. R. R. Tolkien",
--           "title":"The Lord of the Rings",
--           "category":"fiction",
--           "reader":[
--             {"age":25,"name":"bob"},
--             {"age":26,"name":"jack"}
--           ],
--           "price":22.99,
--           "isbn":"0-395-19395-8"
--         }
--       ],
--       "bicycle":{
--         "price":19.95,
--         "color":"red"
--       }
--     },
--     "owner":"amy",
--     "zip code":"94025",
--     "fb:testid":"1234"
--  }' as raw;


-- Extract a nested object with dot notation.
SELECT
  raw:store.bicycle
FROM store_data;

-- Use backticks to escape special characters. References are case insensitive when you use backticks.
-- Use brackets to make them case sensitive.
SELECT
  raw:`zip code`,
  raw:`Zip Code`,
  raw:['fb:testid']
FROM store_data;

-- Use brackets
SELECT
  raw:store['bicycle'],
  raw:store['BICYCLE']
FROM store_data;

-- Parse the book array into typed structs, then emit one row per book.
SELECT
  explode(
    from_json(
      raw:store.book,
      'ARRAY<STRUCT<author:STRING,title:STRING,category:STRING,price:DOUBLE,isbn:STRING,reader:ARRAY<STRUCT<age:INT,name:STRING>>>>'
    )
  )
FROM store_data;

In [0]:
-- Refreshes the materialized view to reflect the latest available data
--REFRESH MATERIALIZED VIEW catalog.schema.view_name;

-- NOTE(review): in this notebook hospital_drug_bronze_raw is created with CREATE OR REPLACE TABLE
-- (the CREATE OR REFRESH STREAMING TABLE line is commented out). REFRESH STREAMING TABLE presumably
-- only succeeds on a streaming table — confirm the table kind before running this cell.

-- Refreshes the streaming table to process the latest available data
-- The current catalog and schema will be used to qualify the table
REFRESH STREAMING TABLE hospital_drug_bronze_raw;

-- Truncates the table and processes all data from scratch for the streaming table
-- NOTE(review): running this right after the incremental refresh above makes that refresh redundant;
-- presumably only one of the two statements is meant to be executed at a time.
REFRESH STREAMING TABLE hospital_drug_bronze_raw FULL;

In [0]:
-- Unlike REFRESH, MERGE works with an explicit target and source table.
-- Merging tables one at a time is also a fair amount of work.

--MERGE WITH SCHEMA EVOLUTION INTO main_users_target target  -- Use the MERGE WITH SCHEMA EVOLUTION INTO statement
MERGE INTO main_users_target AS tgt
USING update_users_source AS src
  ON tgt.id = src.id
-- Matched rows flagged 'update': overwrite email and status from the source.
WHEN MATCHED AND src.status = 'update' THEN
  UPDATE SET
    tgt.email = src.email,
    tgt.status = src.status
-- Matched rows flagged 'delete': remove them from the target.
WHEN MATCHED AND src.status = 'delete' THEN
  DELETE
-- Source rows with no matching id in the target are inserted, whatever their status.
WHEN NOT MATCHED THEN
  INSERT (id, first_name, email, sign_up_date, status)
  VALUES (src.id, src.first_name, src.email, src.sign_up_date, src.status);

In [0]:
-- Inspect the full contents of the silver table transaction_user_drugs_silver.
SELECT
  *
FROM transaction_user_drugs_silver