# メダリオンアーキテクチャ実践


本ノートブックでは、メダリオンアーキテクチャ（Landing → Bronze → Silver → Gold）に沿ってデータを段階的に処理する方法を紹介します。


## 事前準備

In [0]:
%run ./01_config

In [0]:
# Create the working schema unless it already exists
create_schema_sql = f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}"
spark.sql(create_schema_sql)

In [0]:
# Create the Volume unless it already exists
create_volume_sql = (
    f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{volume_name}"
)
spark.sql(create_volume_sql)

# Reset the Volume by recursively removing volume_dir
dbutils.fs.rm(volume_dir, True)

## Landing（ソースデータ）の準備

In [0]:
%run ./02_medalion_data

In [0]:
# Show the first sample dataset
# (data_01 is presumably defined by ./02_medalion_data — TODO confirm)
print(data_01)

In [0]:
# Prepare an empty landing directory inside the Volume.
# Remove leftovers first and create the directory afterwards; doing it the
# other way round would delete the directory that was just created.
landing_dir = volume_dir + "/landing"
dbutils.fs.rm(landing_dir, True)
dbutils.fs.mkdirs(landing_dir)

In [0]:
# Browse the Volume through Databricks utilities (dbutils)
volume_entries = dbutils.fs.ls(volume_dir)
display(volume_entries)

In [0]:
# The Volume is also reachable with plain Python file APIs
import os

for entry_name in os.listdir(volume_dir):
    # Print the absolute path of every entry directly under volume_dir
    print(os.path.abspath(os.path.join(volume_dir, entry_name)))

In [0]:
# Build the target directory; the ingest timestamp is encoded as a
# Hive-style partition segment (key=value) in the path
file_name = "data.csv"
ingest_timestamp = "/audit__ingest_timestamp=2025-01-01 00:00:00"

file_dir = f"{landing_dir}{ingest_timestamp}"
os.makedirs(file_dir, exist_ok=True)

file_path = f"{file_dir}/{file_name}"
print(file_path)

In [0]:
# Write the first dataset to the CSV file
with open(file_path, mode="w", encoding="utf-8") as csv_file:
    csv_file.write(data_01)

In [0]:
# List the landing directory and the partition directory created above
for label, directory in (("-- landing_dir", landing_dir), ("-- file_dir", file_dir)):
    print(label)
    display(dbutils.fs.ls(directory))

In [0]:
# Show the beginning of the file that was just written
file_head = dbutils.fs.head(file_path)
print(file_head)

In [0]:
# Read the single CSV file with Spark by pointing at file_path
csv_reader = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
)
df = csv_reader.load(file_path)

df.printSchema()
df.display()

In [0]:
# Read with Spark again, this time pointing at the whole landing directory
csv_reader = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
)
df = csv_reader.load(landing_dir)

df.printSchema()
df.display()

## Bronze の処理の準備

In [0]:
# Bronze table name and its fully qualified (catalog.schema.table) name
bronze_table_name = "bronze_01"
bronze_table_full_name = f"{catalog_name}.{schema_name}.{bronze_table_name}"

In [0]:
# Checkpoint directory for the Bronze stream; any previous content is removed recursively
checkpoint_dir_01 = f"{volume_dir}/bronze_01_checkpoint"
dbutils.fs.rm(checkpoint_dir_01, True)

In [0]:
# Create the Bronze table; every delivered column is kept as STRING
# NOTE(review): later cells read `audit__ingest_timestamp` from this table,
# while this DDL declares `ingest_timestamp` — confirm which name is intended
create_bronze_sql = f"""
CREATE OR REPLACE TABLE {bronze_table_full_name} (
    n_nationkey STRING,
    n_name STRING,
    n_regionkey STRING,
    n_comment STRING,
    ingest_timestamp timestamp 
)
"""
spark.sql(create_bronze_sql)

In [0]:
# Define the Auto Loader (cloudFiles) stream that reads CSV files from the landing directory.
# The schema location points at the same directory that is later used as the checkpoint.
# NOTE(review): "inferSchema" is passed without the cloudFiles. prefix — confirm it has the intended effect
bronze_stream_reader = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", True)
    .option("inferSchema", False)
    .option("cloudFiles.schemaLocation", checkpoint_dir_01)
)
df = bronze_stream_reader.load(landing_dir)

In [0]:
# Append the stream to the Bronze Delta table and block until it has finished.
# With trigger(availableNow=True) the query processes the data that is
# currently available and then stops, so awaiting its termination guarantees
# the table is loaded before the following cells read it (a fixed sleep does not).
bronze_query = (
    df.writeStream.format("delta")
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir_01)
    .outputMode("append")
    .trigger(availableNow=True)
    .table(bronze_table_full_name)
)
bronze_query.awaitTermination()

In [0]:
import time

# Wait a couple of seconds before reading the table
# NOTE(review): a fixed delay does not by itself guarantee that a streaming
# write has completed; waiting on the query's termination is deterministic
time.sleep(2)

In [0]:
# Show the current contents of the Bronze table
df = spark.table(bronze_table_full_name)
df.display()

In [0]:
# List the contents of the checkpoint directory
checkpoint_entries = dbutils.fs.ls(checkpoint_dir_01)
display(checkpoint_entries)



In [0]:
# Query cloud_files_state for this checkpoint to see which files were ingested
ingested_files_sql = f"""
    SELECT * FROM cloud_files_state('{checkpoint_dir_01}');
    """
df = spark.sql(ingested_files_sql)
df.display()

In [0]:
# Inspect the schema file stored under _schemas in the schema location
import json
import pprint

schema_file_path = checkpoint_dir_01 + "/_schemas/0"

# Read the file once and reuse the text for both printing and parsing
text = dbutils.fs.head(schema_file_path)

print("-- _schemas")
print(text)

# The second line is parsed as JSON below; fail with a clear message instead
# of handing None to json.loads when that line is missing
lines = text.splitlines()
if len(lines) < 2:
    raise ValueError(
        f"{schema_file_path} has {len(lines)} line(s); expected JSON on the second line"
    )
second_line = lines[1]

print("")
print("-- json")
second_line_dict = json.loads(second_line)
pprint.pprint(second_line_dict)

print("")
print("-- dataSchemaJson")
# The dataSchemaJson value is itself a JSON-encoded string, so decode it once more
dataSchemaJson = json.loads(second_line_dict["dataSchemaJson"])
pprint.pprint(dataSchemaJson)

## Silver の処理

In [0]:
# Silver table name and its fully qualified (catalog.schema.table) name
silver_table_name = "silver_01"
silver_table_full_name = f"{catalog_name}.{schema_name}.{silver_table_name}"

In [0]:
# Create the Silver table; the key columns are typed as INT here
create_silver_sql = f"""
CREATE OR REPLACE TABLE {silver_table_full_name} (
    n_nationkey INT,
    n_name STRING,
    n_regionkey INT,
    n_comment STRING,
    audit__ingest_timestamp timestamp 
)
"""
spark.sql(create_silver_sql)

In [0]:
# For each primary key in the Bronze table, find the most recent ingest timestamp
slv_records = f"""
SELECT
    n_nationkey,
    MAX(audit__ingest_timestamp) AS max_ingest_timestamp
    FROM
        {bronze_table_full_name}
    GROUP BY
        n_nationkey
"""
slv_records_df = spark.sql(slv_records)

# Register the result as a temporary view so later SQL can join against it
temp_view_name = "slv_records"
slv_records_df.createOrReplaceTempView(temp_view_name)

# Inspect the result
slv_records_df.display()

In [0]:
# Select the rows to apply to the Silver table: for every key, the Bronze row
# that carries the latest ingest timestamp, with the key columns cast to INT.
# TRY_CAST yields NULL instead of raising when a value is not a valid integer.
# The view is referenced through temp_view_name (set when the view was
# registered) rather than a hard-coded copy of its name, and every column is
# qualified with its table alias.
brz_to_slv_sql = f"""
SELECT
    TRY_CAST(brz.n_nationkey AS INT) AS n_nationkey,
    brz.n_name,
    TRY_CAST(brz.n_regionkey AS INT) AS n_regionkey,
    brz.n_comment,
    brz.audit__ingest_timestamp AS audit__ingest_timestamp
    FROM
        {bronze_table_full_name} AS brz
    INNER JOIN
        {temp_view_name} AS slv
        ON
            brz.n_nationkey = slv.n_nationkey
            AND brz.audit__ingest_timestamp = slv.max_ingest_timestamp
"""
df = spark.sql(brz_to_slv_sql)

# Keep one row per primary key: uniqueness within a single delivery date is
# not guaranteed, so the join alone can still return duplicates
df = df.drop_duplicates(["n_nationkey"])

df.display()

In [0]:
# Upsert the prepared rows into the Silver table.
# The view name is a plain string: it has no placeholders, so no f-string is needed.
temp_view_name_02 = "_tmp_product2__silver"
df.createOrReplaceTempView(temp_view_name_02)

# Existing keys are updated only when the incoming row is newer; keys that are
# not in the target yet are inserted.
# NOTE(review): rows whose n_nationkey is NULL never satisfy the ON condition
# and would be inserted again on every run — confirm keys are always valid.
returned_df = spark.sql(f"""
MERGE INTO {silver_table_full_name} AS tgt
  USING {temp_view_name_02} AS src

  ON tgt.n_nationkey = src.n_nationkey

  WHEN MATCHED
  AND tgt.audit__ingest_timestamp < src.audit__ingest_timestamp
    THEN UPDATE SET *
  WHEN NOT MATCHED
    THEN INSERT *
""")
returned_df.display()

In [0]:
# Show the current contents of the Silver table
df = spark.table(silver_table_full_name)
df.display()

## Gold の処理

In [0]:
# Gold table name and its fully qualified (catalog.schema.table) name
gold_table_name = "gold_01"
gold_table_full_name = f"{catalog_name}.{schema_name}.{gold_table_name}"

## Gold の処理

In [0]:
# Gold table name and its fully qualified (catalog.schema.table) name
# NOTE(review): this cell repeats the preceding cell verbatim and can likely be removed
gold_table_name = "gold_01"
gold_table_full_name = f"{catalog_name}.{schema_name}.{gold_table_name}"

In [0]:
# Create (or replace) the Gold table: the number of n_nationkey values per n_regionkey in Silver
# NOTE(review): the alias count_of_sales holds a count of n_nationkey values — confirm the name is intended
res_df = spark.sql(f"""
CREATE OR REPLACE TABLE {gold_table_full_name}
AS SELECT
  n_regionkey AS region_key,
  count(n_nationkey) AS count_of_sales
FROM
  {silver_table_full_name}
GROUP BY
  n_regionkey;
""")
res_df.display()

In [0]:
# Create (or replace) the Gold table: the number of n_nationkey values per n_regionkey in Silver
# NOTE(review): this cell repeats the preceding Gold table cell verbatim;
# CREATE OR REPLACE makes the rerun harmless, but the duplicate can likely be removed
res_df = spark.sql(f"""
CREATE OR REPLACE TABLE {gold_table_full_name}
AS SELECT
  n_regionkey AS region_key,
  count(n_nationkey) AS count_of_sales
FROM
  {silver_table_full_name}
GROUP BY
  n_regionkey;
""")
res_df.display()

In [0]:
# Check the contents of the Gold table
df = spark.table(gold_table_full_name)
df.display()

In [0]:
# Example of consuming the Gold table: join it with the region master
# (samples.tpch.region) to show the region name next to each count
df = spark.sql(
    f"""
    SELECT
        dm.region_key,
        rg.r_name,
        dm.count_of_sales
        FROM {gold_table_full_name} dm

        INNER JOIN samples.tpch.region rg
            ON dm.region_key = rg.r_regionkey

    """
)
df.display()

## Bronze -> Silver -> Gold の一連の処理

### Landing（ソースデータ）の準備

In [0]:
%run ./02_medalion_data

In [0]:
# Print the new dataset followed by the previous one, separated by a blank line
print("-- 新規データ", data_02, "", "-- 前回データ", data_01, sep="\n")

In [0]:
import os

# Landing directory inside the Volume
landing_dir = f"{volume_dir}/landing"

# Directory for this delivery; the ingest timestamp is encoded as a
# Hive-style partition segment (key=value) in the path
file_name = "data.csv"
ingest_timestamp = "/audit__ingest_timestamp=2025-02-01 00:00:00"

file_dir = f"{landing_dir}{ingest_timestamp}"
os.makedirs(file_dir, exist_ok=True)

file_path = f"{file_dir}/{file_name}"
print("-- csv ファイルのディレクトリ")
print(file_path)

# Write the new dataset to the CSV file
with open(file_path, mode="w", encoding="utf-8") as csv_file:
    csv_file.write(data_02)

# Show the beginning of the file that was just written
print()
print("-- csv ファイル内")
file_head = dbutils.fs.head(file_path)
print(file_head)


# Read the whole landing directory with Spark
csv_reader = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
)
df = csv_reader.load(landing_dir)

print()
print("-- Spark Dataframe")
df.printSchema()
df.display()

### Bronze の処理

In [0]:
# Bronze table name and its fully qualified (catalog.schema.table) name
bronze_table_name = "bronze_01"
bronze_table_full_name = f"{catalog_name}.{schema_name}.{bronze_table_name}"

# Checkpoint directory for the Bronze stream; it is not cleared in this cell,
# so state already stored there is reused
checkpoint_dir_01 = f"{volume_dir}/bronze_01_checkpoint"


# Define the Auto Loader (cloudFiles) stream over the landing directory
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", True)
    .option("inferSchema", False)
    .option("cloudFiles.schemaLocation", checkpoint_dir_01)
    .load(landing_dir)
)

# Append the stream to the Bronze Delta table and block until it has finished.
# With trigger(availableNow=True) the query processes the data that is
# currently available and then stops, so awaiting its termination guarantees
# the table is loaded before the following cells read it (a fixed sleep does not).
bronze_query = (
    df.writeStream.format("delta")
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir_01)
    .outputMode("append")
    .trigger(availableNow=True)
    .table(bronze_table_full_name)
)
bronze_query.awaitTermination()

In [0]:
import time

# Wait a couple of seconds before reading the table
# NOTE(review): a fixed delay does not by itself guarantee that a streaming
# write has completed; waiting on the query's termination is deterministic
time.sleep(2)

In [0]:
# Show the Bronze table ordered by key
# NOTE(review): the Bronze DDL declares n_nationkey as STRING, so this ordering
# is presumably lexicographic rather than numeric — confirm that is acceptable
df = spark.table(bronze_table_full_name).orderBy("n_nationkey")
df.display()

### Silver の処理

In [0]:
# For each primary key in the Bronze table, find the most recent ingest timestamp
slv_records = f"""
SELECT
    n_nationkey,
    MAX(audit__ingest_timestamp) AS max_ingest_timestamp
    FROM
        {bronze_table_full_name}
    GROUP BY
        n_nationkey
"""
slv_records_df = spark.sql(slv_records)

# Register the result as a temporary view so later SQL can join against it
temp_view_name = "slv_records"
slv_records_df.createOrReplaceTempView(temp_view_name)

# Inspect the result
slv_records_df.display()

In [0]:
# Select the rows to apply to the Silver table: for every key, the Bronze row
# that carries the latest ingest timestamp, with the key columns cast to INT.
# TRY_CAST yields NULL instead of raising when a value is not a valid integer.
# The view is referenced through temp_view_name (set when the view was
# registered) rather than a hard-coded copy of its name, and every column is
# qualified with its table alias.
brz_to_slv_sql = f"""
SELECT
    TRY_CAST(brz.n_nationkey AS INT) AS n_nationkey,
    brz.n_name,
    TRY_CAST(brz.n_regionkey AS INT) AS n_regionkey,
    brz.n_comment,
    brz.audit__ingest_timestamp AS audit__ingest_timestamp
    FROM
        {bronze_table_full_name} AS brz
    INNER JOIN
        {temp_view_name} AS slv
        ON
            brz.n_nationkey = slv.n_nationkey
            AND brz.audit__ingest_timestamp = slv.max_ingest_timestamp
"""
df = spark.sql(brz_to_slv_sql)

# Keep one row per primary key: uniqueness within a single delivery date is
# not guaranteed, so the join alone can still return duplicates
df = df.drop_duplicates(["n_nationkey"])

df.display()

In [0]:
# Upsert the prepared rows into the Silver table.
# The view name is a plain string: it has no placeholders, so no f-string is needed.
temp_view_name_02 = "_tmp_product2__silver"
df.createOrReplaceTempView(temp_view_name_02)

# Existing keys are updated only when the incoming row is newer; keys that are
# not in the target yet are inserted.
# NOTE(review): rows whose n_nationkey is NULL never satisfy the ON condition
# and would be inserted again on every run — confirm keys are always valid.
returned_df = spark.sql(f"""
MERGE INTO {silver_table_full_name} AS tgt
  USING {temp_view_name_02} AS src

  ON tgt.n_nationkey = src.n_nationkey

  WHEN MATCHED
  AND tgt.audit__ingest_timestamp < src.audit__ingest_timestamp
    THEN UPDATE SET *
  WHEN NOT MATCHED
    THEN INSERT *
""")
returned_df.display()

In [0]:
# Show the Silver table ordered by key
df = spark.table(silver_table_full_name).orderBy("n_nationkey")
df.display()


### Gold の処理

In [0]:
# テーブル名を指定
gold_table_name = "gold_01"
gold_table_full_name = f"{catalog_name}.{schema_name}.{gold_table_name}"

In [0]:
# Rebuild the Gold table from Silver: the number of n_nationkey values per n_regionkey
# NOTE(review): the alias count_of_sales holds a count of n_nationkey values — confirm the name is intended
res_df = spark.sql(f"""
CREATE OR REPLACE TABLE {gold_table_full_name}
AS SELECT
  n_regionkey AS region_key,
  count(n_nationkey) AS count_of_sales
FROM
  {silver_table_full_name}
GROUP BY
  n_regionkey;
""")
res_df.display()

In [0]:
# Check the contents of the Gold table
df = spark.table(gold_table_full_name)
df.display()

In [0]:
# end