## Exploring The Source Directory

In [0]:
-- Make hive_metastore the current catalog for the statements that follow.
USE CATALOG hive_metastore

In [0]:
%python
# List the contents of the raw orders directory and render the listing.
files = dbutils.fs.ls("dbfs:/mnt/demo-datasets/bookstore/orders-raw")
display(files)

## Auto Loader

In [0]:
%fs ls "dbfs:/mnt/demo/checkpoints/orders_raw"

In [0]:
%python
# Auto Loader: read the parquet files under orders-raw as a stream and expose
# the result to SQL cells as the temporary view orders_raw_temp.
(
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # The schema location is the same directory inspected by the %fs cells.
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/checkpoints/orders_raw")
    .load("dbfs:/mnt/demo-datasets/bookstore/orders-raw")
    .createOrReplaceTempView("orders_raw_temp")
)

In [0]:
-- Disabled alternative, kept for reference only: a Python Auto Loader stream that
-- reads the same orders-raw directory and writes directly to the orders_updates
-- table, using dbfs:/mnt/demo/orders_checkpoint for both the schema location and
-- the checkpoint location.
-- (spark.readStream
--               .format("cloudFiles")
--               .option("cloudFiles.format","parquet")
--               .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/orders_checkpoint")
--               .load(f"dbfs:/mnt/demo-datasets/bookstore/orders-raw")
--               .writeStream
--               .option("checkpointLocation", "dbfs:/mnt/demo/orders_checkpoint")
--               .table("orders_updates")
--             )

In [0]:
-- Row count of the Auto Loader temporary view.
SELECT count(*)
FROM orders_raw_temp

In [0]:
%python
import re

# Simulate the arrival of a new file: copy 01.parquet into the same directory
# under the next free two-digit number (e.g. 04.parquet after 03.parquet).
dir_path = "dbfs:/mnt/demo-datasets/bookstore/orders-raw/"
source_file = "dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet"

# 1. List the files currently in the directory.
files = dbutils.fs.ls(dir_path)

# 2. Collect the numbers of files named exactly "<digits>.parquet".
#    fullmatch (rather than search) ignores names that merely end in digits,
#    such as "part-0007.parquet", which would otherwise skew the next number.
pattern = re.compile(r"(\d+)\.parquet$")
matches = (pattern.fullmatch(entry.name) for entry in files)
numbers = [int(m.group(1)) for m in matches if m]

# 3. Compute the next file number; start at 1 when no numbered file exists.
next_number = max(numbers, default=0) + 1
next_file_name = f"{next_number:02d}.parquet"

target_file = dir_path + next_file_name

# 4. Copy the source file to the new name.
dbutils.fs.cp(source_file, target_file)

print(f"Copied to: {target_file}")


In [0]:
%fs ls "dbfs:/mnt/demo/checkpoints/orders_raw"

In [0]:
%fs ls "dbfs:/mnt/demo/checkpoints/orders_raw/_schemas/"

In [0]:
%fs head "dbfs:/mnt/demo/checkpoints/orders_raw/_schemas/0"

In [0]:
-- Show the columns and types of the Auto Loader temporary view.
DESCRIBE orders_raw_temp

## Enriching Raw Data

In [0]:
-- Add ingestion metadata to every raw order row: the current timestamp as
-- arrival_time and the originating file name as source_file.
CREATE OR REPLACE TEMPORARY VIEW orders_tmp AS (
  SELECT
    *,
    current_timestamp() AS arrival_time,
    input_file_name() AS source_file
  FROM orders_raw_temp
)

In [0]:
-- Row count of the enriched temporary view.
SELECT count(*)
FROM orders_tmp

In [0]:
-- Preview five rows of the enriched temporary view.
SELECT *
FROM orders_tmp
LIMIT 5

## Creating Bronze Table

In [0]:
%python
# Append the rows of orders_tmp to the Delta table orders_bronze as a stream.
(
    spark.table("orders_tmp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_bronze")
    .outputMode("append")
    .table("orders_bronze")
)

In [0]:
-- Disabled copy, kept for reference only: Python Auto Loader read of the
-- orders-raw directory that registers the temporary view orders_raw_temp.
-- (spark.readStream
--             .format("cloudFiles")
--             .option("cloudFiles.format", "parquet")
--             .option("cloudFiles.schemaLocation","dbfs:/mnt/demo/checkpoints/orders_raw")
--             .load("dbfs:/mnt/demo-datasets/bookstore/orders-raw")
--             .createOrReplaceTempView("orders_raw_temp"))

In [0]:
-- Row count of the bronze table.
SELECT count(*)
FROM orders_bronze

## Creating Static Lookup Table

In [0]:
-- Query the customer JSON files directly by path.
SELECT *
FROM json.`dbfs:/mnt/demo-datasets/bookstore/customers-json`

In [0]:
%python
# Batch (non-streaming) read of the customer JSON files, exposed to SQL cells
# as the temporary view customers_lookup.
(
    spark.read
    .format("json")
    .load("dbfs:/mnt/demo-datasets/bookstore/customers-json")
    .createOrReplaceTempView("customers_lookup")
)

In [0]:
-- Preview three rows of the customer lookup view.
SELECT *
FROM customers_lookup
LIMIT 3

## Creating Silver Table

In [0]:
%python
# Read the bronze table as a stream and expose it to SQL cells as the
# temporary view orders_bronze_tmp.
(
    spark.readStream
    .table("orders_bronze")
    .createOrReplaceTempView("orders_bronze_tmp")
)

In [0]:
-- Preview three rows of the bronze streaming view.
SELECT *
FROM orders_bronze_tmp
LIMIT 3

In [0]:
-- Join bronze orders to the customer lookup on customer_id, keep only orders
-- with a positive quantity, pull first/last name out of the customer profile,
-- and convert order_timestamp from Unix seconds to a TIMESTAMP.
CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS (
  SELECT
    order_id,
    quantity,
    o.customer_id,
    c.profile:first_name AS f_name,
    c.profile:last_name AS l_name,
    CAST(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AS order_timestamp,
    books
  FROM orders_bronze_tmp AS o
  INNER JOIN customers_lookup AS c
    ON o.customer_id = c.customer_id
  WHERE quantity > 0
)

In [0]:
%python
# Append the rows of orders_enriched_tmp to the Delta table orders_silver as a stream.
(
    spark.table("orders_enriched_tmp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_silver")
    .outputMode("append")
    .table("orders_silver")
)

In [0]:
-- Preview five rows of the silver table.
SELECT *
FROM orders_silver
LIMIT 5

In [0]:
-- Row count of the silver table.
SELECT count(*)
FROM orders_silver;

## Creating Gold Table

In [0]:
%python
# Read the silver table as a stream and expose it to SQL cells as the
# temporary view orders_silver_tmp.
(
    spark.readStream
    .table("orders_silver")
    .createOrReplaceTempView("orders_silver_tmp")
)

In [0]:
-- Preview three rows of the silver streaming view.
SELECT *
FROM orders_silver_tmp
LIMIT 3

In [0]:
-- Total quantity ordered per customer per day: order_timestamp is truncated
-- to the day and quantity is summed within each (customer, day) group.
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
  SELECT
    customer_id,
    f_name,
    l_name,
    date_trunc('DD', order_timestamp) AS order_date,
    sum(quantity) AS books_counts
  FROM orders_silver_tmp
  GROUP BY
    customer_id,
    f_name,
    l_name,
    date_trunc('DD', order_timestamp)
)

In [0]:
%python
# Compute the aggregate over everything currently available in the temporary
# view, store the result in the Delta table daily_customer_books, then stop.
(
    spark.table("daily_customer_books_tmp")
    .writeStream                 # write with the streaming API, even though this run finishes on its own
    .format("delta")             # store as a Delta table
    .outputMode("complete")      # rewrite the full aggregated result on each trigger
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/daily_customer_books")  # progress bookkeeping
    .trigger(availableNow=True)  # process all data available now, then terminate the query
    .table("daily_customer_books")
    # Block until the run has finished so that later cells reading
    # daily_customer_books do not race against a query that is still writing.
    .awaitTermination()
)

In [0]:
-- Show the contents of the gold table.
SELECT *
FROM daily_customer_books

In [0]:
for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()