In [0]:
%sql
use catalog ecommerce;
use schema v01;

In [0]:
%sql
SELECT * 
FROM
text.`/Volumes/ecommerce/v01/raw/events-kafka/clickstream.json`

In [0]:
%sql
DROP TABLE IF EXISTS kafka_events_bronze_raw ;

CREATE TABLE kafka_events_bronze_raw AS 
SELECT * 
FROM read_files(
  "/Volumes/ecommerce/v01/raw/events-kafka/clickstream.json",
  format => "json"
);

select *
from kafka_events_bronze_raw;

In [0]:
%sql
SELECT 
key AS encoded_key,
cast(unbase64(key) AS STRING) AS decoded_key,
value AS encoded_value,
cast(unbase64(value) AS STRING)AS decoded_value
from kafka_events_bronze_raw;


In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_decoded AS 
SELECT 
cast(unbase64(key) AS STRING) AS decoded_key,
offset,
partition,
timestamp,
topic,
cast(unbase64(value) AS STRING ) AS decoded_value
FROM kafka_events_bronze_raw;

SELECT * FROM kafka_events_bronze_decoded;


In [0]:
%sql
SELECT 
decoded_value,
decoded_value:device,
decoded_value:event_name,
decoded_value:geo,
decoded_value:items 
FROM kafka_events_bronze_decoded;
  

In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_string_flattened AS 
SELECT 
decoded_key,
offset,
partition,
timestamp,
topic,
decoded_value:device,
decoded_value:event_name,
decoded_value:geo,
decoded_value:items 
FROM kafka_events_bronze_decoded;

SELECT * FROM kafka_events_bronze_string_flattened;

In [0]:
%sql
SELECT schema_of_json( '{"device":"iOS","ecommerce":{},"event_name":"add_item","event_previous_timestamp":1593880300696751,"event_timestamp":1593880922513100,"geo":{"city":"Westbrook","state":"ME"},"items":[{"item_id":"M_STAN_T","item_name":"Standard Twin Mattress","item_revenue_in_usd":595.0,"price_in_usd":595.0,"quantity":1}],"traffic_source":"google","user_first_touch_timestamp":1593880300696751,"user_id":"UA000000107392458"}' )
AS schema 

In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_struct AS 
SELECT 
* EXCEPT(decoded_value),
from_json(
  decoded_value, 
  "STRUCT<device: STRING, event_name: STRING, event_previous_timestamp: BIGINT, event_timestamp: BIGINT, geo: STRUCT<city: STRING, state: STRING>, items: ARRAY<STRUCT<item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source: STRING, user_first_touch_timestamp: BIGINT, user_id: STRING>") AS value 
FROM kafka_events_bronze_decoded;

SELECT * 
FROM kafka_events_bronze_struct;


In [0]:
%sql
CREATE OR REPLACE TABLE bronze_explode_array AS 
SELECT 
decoded_key, 
value.device as device,
value.geo.city,
explode(value.items) as item_in_array,
value.items
FROM kafka_events_bronze_struct
;

In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_variant AS 
SELECT  
 decoded_key,
 offset,
 partition,
 timestamp,
 topic,
 parse_json(decoded_value) AS json_variant_value
 FROM kafka_events_bronze_decoded;

 select *
 from  kafka_events_bronze_variant;


In [0]:
%sql
select * except(json_variant_value),
json_variant_value:device::STRING,
json_variant_value:items
FROM kafka_events_bronze_variant;