In [0]:
%python
# Inspect the raw Kafka event export on DBFS that the next cell copies into the landing volume.
source_listing = dbutils.fs.ls("dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v02/ecommerce/raw/events-kafka/")
source_listing

In [0]:
%python
# Land the raw Kafka JSON export in a Unity Catalog volume so READ_FILES can
# read it, then create a second volume for checkpoints.

# volume for json files
sql = """
CREATE VOLUME IF NOT EXISTS test_catalog.landing.json_source_volume;
"""
spark.sql(sql)

dbfs_path = "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v02/ecommerce/raw/events-kafka/"
volume_path = "/Volumes/test_catalog/landing/json_source_volume"
events_path = f"{volume_path}/events"
files = dbutils.fs.ls(volume_path)

# Copy only when the `events` folder itself is missing, rather than when the
# whole volume is empty: any unrelated file in the volume would otherwise
# block the copy and leave READ_FILES with nothing to read.
# Directory names from dbutils.fs.ls may carry a trailing "/", so strip it.
# NOTE(review): a partially copied `events` folder is not detected or repaired.
existing_names = {f.name.rstrip("/") for f in files}
if "events" not in existing_names:
    dbutils.fs.cp(
        dbfs_path,
        events_path,
        recurse=True,
    )

# checkpoint location
# NOTE(review): not referenced by the cells visible here; presumably used by a
# later streaming cell — confirm.
sql = """
CREATE VOLUME IF NOT EXISTS test_catalog.landing.json_checkpoint_volume
"""
spark.sql(sql)

In [0]:
%sql

-- Bronze landing table: load every JSON file from the source volume as-is.
-- IF NOT EXISTS means re-running this cell leaves an existing table untouched.
CREATE TABLE IF NOT EXISTS test_catalog.bronze.kafka_events_raw
AS SELECT * FROM read_files(
  '/Volumes/test_catalog/landing/json_source_volume/events/',
  format => 'json'
);

-- Peek at a handful of raw rows.
SELECT * FROM test_catalog.bronze.kafka_events_raw LIMIT 5

In [0]:
-- Decode the base64-encoded Kafka key and value columns into readable strings.
-- Built only once (IF NOT EXISTS); drop the table to force a rebuild.
CREATE TABLE IF NOT EXISTS test_catalog.bronze.kafka_events_decoded AS
SELECT
  unbase64(key)::STRING   AS decoded_key,
  offset,
  partition,
  timestamp,
  topic,
  unbase64(value)::STRING AS decoded_value
FROM test_catalog.bronze.kafka_events_raw;

-- Review only the events whose `ecommerce` JSON object is not empty.
SELECT *
FROM test_catalog.bronze.kafka_events_decoded
WHERE decoded_value:ecommerce <> '{}';

In [0]:
-- Flatten selected top-level JSON fields out of the decoded value string.
-- The `:` path operator on a STRING column returns STRING, so nested objects
-- and arrays (geo, items) stay as JSON text here.
-- decoded_key is kept, as in the other bronze tables, so each row can be traced
-- back to its Kafka message key. Aliases are explicit rather than relying on
-- the implicit naming of `:` extractions.
CREATE OR REPLACE TABLE test_catalog.bronze.kafka_events_flat AS
SELECT
  decoded_key,
  offset,
  partition,
  timestamp,
  topic,
  decoded_value:device         AS device,
  decoded_value:traffic_source AS traffic_source,
  decoded_value:geo            AS geo,
  decoded_value:items          AS items
FROM
  test_catalog.bronze.kafka_events_decoded;

--display the table
SELECT * FROM test_catalog.bronze.kafka_events_flat
LIMIT 5;

In [0]:
-- Infer a DDL schema from one representative decoded event; the STRUCT<...>
-- string it returns is the schema passed to from_json when building the struct table.
SELECT schema_of_json('{"device":"iOS","ecommerce":{"purchase_revenue_in_usd":1045.0,"total_item_quantity":1,"unique_items":1},"event_name":"finalize","event_previous_timestamp":1593878151318620,"event_timestamp":1593880248917941,"geo":{"city":"Palm Coast","state":"FL"},"items":[{"item_id":"M_STAN_Q","item_name":"Standard Queen Mattress","item_revenue_in_usd":1045.0,"price_in_usd":1045.0,"quantity":1}],"traffic_source":"google","user_first_touch_timestamp":1593869826469659,"user_id":"UA000000107305675"}') AS schema;


In [0]:
-- Parse the decoded JSON string into a typed STRUCT column named `value`.
-- Every other column of the decoded table is carried over; the raw
-- decoded_value string is dropped in favour of the parsed struct.
CREATE OR REPLACE TABLE test_catalog.bronze.kafka_events_struct
AS SELECT
  * EXCEPT (decoded_value),
  from_json(decoded_value, 'STRUCT<device: STRING, ecommerce: STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name: STRING, event_previous_timestamp: BIGINT, event_timestamp: BIGINT, geo: STRUCT<city: STRING, state: STRING>, items: ARRAY<STRUCT<item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source: STRING, user_first_touch_timestamp: BIGINT, user_id: STRING>') AS value
FROM test_catalog.bronze.kafka_events_decoded;

-- Inspect a few parsed rows.
SELECT * FROM test_catalog.bronze.kafka_events_struct LIMIT 5;

In [0]:
-- One row per item: explode the items array of each parsed event.
-- explode (not explode_outer) emits no rows for an empty or NULL items array,
-- so events without items do not appear in this table.
-- The full items array is kept next to each exploded item for reference.
-- No ORDER BY here: a table's stored row order is not guaranteed on read, so
-- sorting inside the CTAS only adds work without affecting later queries.
CREATE OR REPLACE TABLE test_catalog.bronze.kafka_events_exploded AS
SELECT
  decoded_key,
  array_size(value.items) AS number_of_items,
  explode(value.items)    AS item,
  value.items
FROM
  test_catalog.bronze.kafka_events_struct;

-- Display the events with the most items first; ordering belongs on the read.
SELECT *
FROM test_catalog.bronze.kafka_events_exploded
ORDER BY number_of_items DESC
LIMIT 5;

In [0]:
-- Store each decoded event as a VARIANT so fields can be queried without
-- declaring a schema up front.
CREATE OR REPLACE TABLE test_catalog.bronze.kafka_events_variant AS
SELECT
  decoded_key,
  offset,
  partition,
  timestamp,
  topic,
  parse_json(decoded_value) AS json_value
FROM test_catalog.bronze.kafka_events_decoded;

-- Review: one row per item via variant_explode over the items array.
-- Each field is extracted and typed in one step with variant_get's type
-- argument, and every output column gets an explicit alias (including device,
-- which would otherwise receive an auto-generated name).
SELECT
  json_value,
  variant_get(json_value, '$.device', 'string')                 AS device,
  variant_get(exploded.value, '$.item_id', 'string')            AS item_id,
  variant_get(exploded.value, '$.item_name', 'string')          AS item_name,
  variant_get(exploded.value, '$.item_revenue_in_usd', 'double') AS item_revenue_in_usd,
  variant_get(exploded.value, '$.price_in_usd', 'double')       AS price_in_usd,
  variant_get(exploded.value, '$.quantity', 'bigint')           AS quantity
FROM test_catalog.bronze.kafka_events_variant,
  LATERAL variant_explode(json_value:items) AS exploded
LIMIT 5;