# Bronze と Silver の処理

## 利用するライブラリをインストール

In [0]:
%pip install python-pptx lxml -q

In [0]:
%pip install "markitdown[pdf, docx, pptx]" -q

In [0]:
dbutils.library.restartPython()

## 共通設定の読み取りとライブラリのインポート

In [0]:
%run ./00_config

In [0]:
from pyspark.sql import functions as F

In [0]:
## Bronze (File Context) の処理

In [0]:
# Directory that holds the landed File Context JSON files.
landing_dir = f"{source_dir}/landing/file_context"

In [0]:
# Preview one of the landed metadata JSON files (head of the file only).
print("-- medalion_site.metadata.json")
metadata_file_path = "/".join(
    [
        source_dir + "/landing/file_context",
        "audit__ingest_timestamp=2025-10-14T10:07:33Z",
        "medalion_site.metadata.json",
    ]
)
print(dbutils.fs.head(metadata_file_path))

In [0]:
# Read every File Context JSON file under the landing directory as a single
# string column (`value`), one row per file.
df = spark.read.format("text").option("wholetext", True).load(landing_dir)

# Parse the raw text into a VARIANT column (try_parse_json is the tolerant variant).
varinat__value_col = "varinat__value"
df = df.withColumn(varinat__value_col, F.expr("try_parse_json(value)"))


def _variant_field(json_path, target_type="string"):
    """Return a column extracting `json_path` from the variant column.

    The expression is `try_variant_get(<variant column>, '<json_path>', '<target_type>')`,
    i.e. the tolerant extraction function used throughout this cell.
    """
    return F.expr(
        f"try_variant_get({varinat__value_col}, '{json_path}', '{target_type}')"
    )


# File-level attributes shared by every item listed in the file.
file_context_cols_01 = {
    "raw_file_path": _variant_field("$.file_dir"),
    "file_sensitivity_label": _variant_field("$.sensitivity_label"),
}
df = df.withColumns(file_context_cols_01)

# Turn the `$.value` array into one row per item and re-parse each item as VARIANT.
# try_variant_get is used here as well (the original used the strict variant_get),
# so that a metadata file whose `$.value` cannot be read as an array yields no rows
# instead of aborting the whole batch.
# NOTE: explode() emits nothing for a NULL array, so such files are dropped here.
df = df.withColumn(
    varinat__value_col,
    F.explode(_variant_field("$.value", "array<string>")),
)
df = df.withColumn(varinat__value_col, F.expr(f"try_parse_json({varinat__value_col})"))


# Item-level attributes.
# NOTE(review): all expressions passed to one withColumns() call are expected to be
# resolved against the input DataFrame, so `raw_file_path` referenced inside
# `raw_file_path_by_service` is still the file-level `$.file_dir` value - confirm.
file_context_cols_02 = {
    "file_id": F.concat(
        F.lit("/driveId/"),
        _variant_field("$.parentReference.driveId"),
        F.lit("/id/"),
        _variant_field("$.id"),
    ),
    "file_version": _variant_field("$.lastModifiedDateTime"),
    "file_dir": _variant_field("$.parentReference.path"),
    "file_name": _variant_field("$.name"),
    "file_url": _variant_field("$.webUrl"),
    # Text after the last dot of the file name.
    "file_extension": F.regexp_extract(_variant_field("$.name"), r"\.([^.]+)$", 1),
    "file_mime_type": _variant_field("$.file.mimeType"),
    "size_in_bytes": _variant_field("$.size").cast("bigint"),
    # <file_dir from the metadata file> + <parent path> + "/" + <file name>
    "raw_file_path": F.concat(
        F.col("raw_file_path"),
        _variant_field("$.parentReference.path"),
        F.lit("/"),
        _variant_field("$.name"),
    ),
    # Same location, rewritten from the abfss:// prefix to the Volumes mount path.
    "raw_file_path_by_service": F.struct(
        F.concat(
            F.expr(
                f"""
                regexp_replace(
                    raw_file_path,
                    '^abfss://[^/]+/sharepoint',
                    '/Volumes/{catalog_name}/{schema_name}/{volume_name}/source_data'
                )
                """
            ),
            _variant_field("$.parentReference.path"),
            F.lit("/"),
            _variant_field("$.name"),
        ).alias("databricks_volumes_file_path")
    ),
    "file_created_timestamp": _variant_field("$.createdDateTime", "timestamp"),
    "file_update_timestamp": _variant_field("$.lastModifiedDateTime", "timestamp"),
}
df = df.withColumns(file_context_cols_02)

# Audit columns.
audit_cols = {
    "audit__ingest_timestamp": F.try_to_timestamp("audit__ingest_timestamp"),
    "audit__update_timestamp": F.expr("current_timestamp()"),
    # 1 when the exploded item could not be parsed as JSON (variant column is NULL).
    "audit__delete_flg": F.when(
        F.col(varinat__value_col).isNotNull(), F.lit(0)
    ).otherwise(F.lit(1)),
    # 0 when `$.deleted.state` is absent/NULL, 1 otherwise.
    "audit__source_delete_flg": F.when(
        _variant_field("$.deleted.state").isNull(),
        F.lit(0),
    ).otherwise(F.lit(1)),
}
df = df.withColumns(audit_cols)

# The VARIANT helper column is no longer needed.
df = df.drop(varinat__value_col)

df.display()

In [0]:
# Persist the parsed File Context rows, replacing the table's previous contents.
df.write.saveAsTable(
    f"{catalog_name}.{schema_name}.{file_context_input_table_name}",
    mode="overwrite",
)

In [0]:
# Read the table back so that later cells work from the persisted data.
file_context_df = spark.read.table(
    f"{catalog_name}.{schema_name}.{file_context_input_table_name}"
)
display(file_context_df)

## Bronze (File Info) の処理

In [0]:
# Keep only items not flagged as deleted at the source, and project the row id
# together with the Volumes path of the raw file.
df = file_context_df.where(F.col("audit__source_delete_flg") == 0).select(
    F.col("_metadata.row_id"),
    F.col("raw_file_path_by_service.databricks_volumes_file_path").alias(
        "raw_file_path"
    ),
)
display(df)

In [0]:
from pptx import Presentation
import re
from markitdown import MarkItDown


def can_open_with_python_pptx(local_path: str) -> bool:
    """Report whether python-pptx can load the file at `local_path`.

    Returns True when `Presentation(local_path)` succeeds and False when it raises
    any `Exception`; the loaded presentation itself is discarded.
    """
    try:
        Presentation(local_path)
    except Exception:
        return False
    else:
        return True


def get_pptx_slice_contents(file_path):
    """Convert a file with MarkItDown and split the text into per-slide chunks.

    The converted text is split on `<!-- Slide number: N -->` markers.

    Returns:
        A list of `(slide_number, slide_content)` tuples in document order, where
        `slide_number` is the integer from the marker and `slide_content` is the
        stripped text that follows it. Text before the first marker is ignored;
        the list is empty when no marker is present.
    """
    converted = MarkItDown().convert(file_path)
    pieces = re.split(r"<!--\s*Slide number:\s*(\d+)\s*-->", converted.text_content)
    # With one capture group, re.split yields [preamble, number, body, number, body, ...].
    return [
        (int(number), body.strip())
        for number, body in zip(pieces[1::2], pieces[2::2])
    ]

In [0]:
# Bring the (row_id, raw_file_path) pairs to the driver and inspect each file.
raw_file_data = df.collect()

output_raw_file_data = []
for raw_d in raw_file_data:
    row_id = raw_d["row_id"]
    raw_file_path = raw_d["raw_file_path"]

    # NOTE: despite the field name, `file_corrupt` holds the result of the open check:
    # True means python-pptx could open the file, False means it could not.
    file_corrupt_check = can_open_with_python_pptx(raw_file_path)

    # Only convert files that passed the open check. Converting a file that cannot
    # be opened may raise and would abort the whole loop before the record (with
    # audit__delete_flg = 1) is written.
    if file_corrupt_check:
        page_contents = get_pptx_slice_contents(raw_file_path)
    else:
        page_contents = []

    output_result = {
        "row_id": row_id,
        "raw_file_path": raw_file_path,
        "file_check": {
            "file_corrupt": file_corrupt_check,
        },
        "file_md_content": page_contents,
    }

    # A record is flagged as deleted when any of its checks is False.
    any_false = any(v is False for v in output_result["file_check"].values())
    output_result["audit__delete_flg"] = 1 if any_false else 0
    output_raw_file_data.append(output_result)

print(output_raw_file_data)

In [0]:
# DDL schema of the inspection results built in the previous cell.
file_info_schema = """
row_id LONG,
raw_file_path STRING,
file_check STRUCT<
    file_corrupt BOOLEAN
>,
file_md_content ARRAY<STRUCT<
    slide_number INT,
    slide_content STRING
>>,
audit__delete_flg INT
"""

# Build the DataFrame, stamp it with the current time and replace the File Info table.
file_info_df = spark.createDataFrame(
    output_raw_file_data, schema=file_info_schema
).withColumn("audit__update_timestamp", F.current_timestamp())

file_info_df.write.saveAsTable(
    f"{catalog_name}.{schema_name}.{file_info_table_name}",
    mode="overwrite",
)

In [0]:
# Read the File Info table back and show it.
file_info_df = spark.read.table(f"{catalog_name}.{schema_name}.{file_info_table_name}")
display(file_info_df)

## Bronze (File Context Output と File Context Error) の処理

In [0]:
# Reload the File Context input table, expose `_metadata.row_id` as a regular
# `row_id` column (used as the join key below) and register it as a temp view.
file_context_df = spark.table(
    f"{catalog_name}.{schema_name}.{file_context_input_table_name}"
)
file_context_df = file_context_df.withColumn("row_id", F.col("_metadata.row_id"))
file_context_df.createOrReplaceTempView("_file_context")

# file_info may contain several records per row_id: keep only the record(s) with the
# latest audit__update_timestamp for each row_id.
file_info_df = spark.table(f"{catalog_name}.{schema_name}.{file_info_table_name}")
max_file_info_df = file_info_df.groupBy("row_id").agg(
    F.max("audit__update_timestamp").alias("max_audit__update_timestamp")
)
latest_file_info_df = (
    file_info_df.alias("d")
    .join(
        max_file_info_df.alias("m"),
        (F.col("d.row_id") == F.col("m.row_id"))
        & (
            F.col("d.audit__update_timestamp") == F.col("m.max_audit__update_timestamp")
        ),
        "inner",
    )
    .select("d.*")  # keep only the original file_info columns
)
# Ties on the latest timestamp are reduced to a single row per row_id.
latest_file_info_df = latest_file_info_df.dropDuplicates(["row_id"])
latest_file_info_df.createOrReplaceTempView("_file_info")

# Left-join File Context with the latest File Info and derive the enriched-file paths.
# The output keeps every File Context column except row_id and the two audit columns
# that are recomputed here; audit__delete_flg is 1 when either side is flagged.
# NOTE(review): nullif(fc.file_sensitivity_label, 'none') inside concat() presumably
# makes file_path / file_path_by_service NULL when the label is 'none' (or NULL) -
# confirm that this is intended, since later cells use that path as a copy target.
file_context_output_sql = f"""
SELECT
  fc.* EXCEPT (
    fc.row_id,
    fc.audit__delete_flg,
    fc.audit__update_timestamp
  ),
  concat(
    'abfss://unstureddata@medalion.dfs.core.windows.net/sharepoint/enriched_files/',
    nullif(fc.file_sensitivity_label, 'none'),
    '/',
    sha2(fc.file_id, 256),
    '.',
    fc.file_extension
  ) AS file_path,
  if.file_check,
  if.file_md_content,
  named_struct(
    'databricks_volumes_file_path',
    concat(
      '/Volumes/{catalog_name}/{schema_name}/{volume_name}/enriched_files/',
      nullif(fc.file_sensitivity_label, 'none'),
      '/',
      sha2(fc.file_id, 256),
      '.',
      fc.file_extension
    )
  ) AS file_path_by_service,
  CASE
    WHEN fc.audit__delete_flg = 1 THEN 1
    WHEN if.audit__delete_flg = 1 THEN 1
    ELSE 0
  END AS audit__delete_flg,
  current_timestamp() AS audit__update_timestamp
  FROM
    _file_context fc
  LEFT OUTER JOIN _file_info if
  ON
    fc.row_id = if.row_id
"""
file_context_output_src_df = spark.sql(file_context_output_sql)
display(file_context_output_src_df)

In [0]:
# Append the joined rows to the File Context Output table (existing rows are kept).
file_context_output_src_df.write.saveAsTable(
    f"{catalog_name}.{schema_name}.{file_context_output_table_name}",
    mode="append",
)

In [0]:
# Read the File Context Output table back and show it.
file_context_output_df = spark.read.table(
    f"{catalog_name}.{schema_name}.{file_context_output_table_name}"
)
display(file_context_output_df)

## Silver (Enriched Files) の処理

In [0]:
# For items not deleted at the source, pair the raw file's Volumes path with the
# enriched target path; one row per file_id is collected to the driver.
enriched_files_src_df = (
    file_context_output_df.where(F.col("audit__source_delete_flg") == 0)
    .select(
        F.col("file_id"),
        F.col("raw_file_path_by_service.databricks_volumes_file_path").alias(
            "src_dbx_volume_path"
        ),
        F.col("file_path_by_service.databricks_volumes_file_path").alias(
            "tgt_dbx_volume_path"
        ),
    )
    .dropDuplicates(["file_id"])
)
src_files = enriched_files_src_df.collect()

display(enriched_files_src_df)

In [0]:
import shutil
from pathlib import Path

# Rows without a source or target path cannot be copied. The target path is NULL
# when the upstream SQL's nullif(file_sensitivity_label, 'none') yields NULL, and
# Path(None) would raise TypeError and abort the whole loop. Skip and report them.
copyable_rows = [
    row
    for row in src_files
    if row["src_dbx_volume_path"] and row["tgt_dbx_volume_path"]
]
skipped_file_ids = [
    row["file_id"]
    for row in src_files
    if not (row["src_dbx_volume_path"] and row["tgt_dbx_volume_path"])
]
if skipped_file_ids:
    print(f"skipped {len(skipped_file_ids)} file(s) without a path: {skipped_file_ids}")

# Copy each raw file to its enriched location, creating parent directories as needed.
for row in copyable_rows:
    src_path = row["src_dbx_volume_path"]
    tgt_path = row["tgt_dbx_volume_path"]
    Path(tgt_path).parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(src_path, tgt_path)

In [0]:
# Confirm that the last copied file exists at its target path.
# The path is derived from `src_files` instead of relying on the `tgt_path` loop
# variable leaking from the copy cell, which is undefined (or stale) when nothing
# was copied.
copied_target_paths = [
    row["tgt_dbx_volume_path"]
    for row in src_files
    if row["src_dbx_volume_path"] and row["tgt_dbx_volume_path"]
]
if copied_target_paths:
    display(dbutils.fs.ls(copied_target_paths[-1]))
else:
    print("no files were copied")

## Silver (Enriched) の処理

In [0]:
# The File Context Output table is written in append mode, so it can hold several
# versions of the same file_id. MERGE needs exactly one source row per file_id, and
# dropDuplicates() would keep an arbitrary version. Keep the newest record instead:
# latest ingest first, then latest update.
_latest_first = F.expr(
    """
    row_number() OVER (
      PARTITION BY file_id
      ORDER BY audit__ingest_timestamp DESC NULLS LAST, audit__update_timestamp DESC
    )
    """
)
src_enriched_df = (
    file_context_output_df.withColumn("_version_rank", _latest_first)
    .where(F.col("_version_rank") == 1)
    .drop("_version_rank")
    # Stamp the rows with the time of this Silver load (after ranking on the old value).
    .withColumn("audit__update_timestamp", F.current_timestamp())
)
src_enriched_df.createOrReplaceTempView("_enriched_src")

src_enriched_df.display()

In [0]:
# Upsert the de-duplicated source view into the enriched table, keyed on file_id:
# matched rows are fully updated, unmatched rows are inserted.
enriched_merge_sql = f"""
MERGE INTO {catalog_name}.{schema_name}.{enriched_table_name} AS tgt
USING _enriched_src AS src
ON tgt.file_id = src.file_id
  
WHEN MATCHED
  THEN UPDATE SET *

WHEN NOT MATCHED
  -- audit__ingest_timestamp を string にしてしまったので、テーブル修正後になおそう 
  -- AND tgt.audit__ingest_timestamp < src.audit__ingest_timestamp
  THEN INSERT *
"""
res_df = spark.sql(enriched_merge_sql)
display(res_df)

In [0]:
# Show the enriched table after the merge.
enriched_table_df = spark.read.table(
    f"{catalog_name}.{schema_name}.{enriched_table_name}"
)
display(enriched_table_df)

In [0]:
# end