<a href="https://colab.research.google.com/github/syakesaba/jupyter-notebooks/blob/main/iceberg_duckdb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Secretの読み込み

In [170]:
from google.colab import userdata

S3A_ENDPOINT = userdata.get('S3A_ENDPOINT') # xxxxxxx.compat.objectstorage.{S3A_REGION}.oraclecloud.com
S3A_ACCESS_KEY = userdata.get('S3A_ACCESS_KEY') # xxxxxxxxxxxx
S3A_SECRET_KEY = userdata.get('S3A_SECRET_KEY') # xxxxxxxxxxxx
S3A_REGION = userdata.get('S3A_REGION') # ap-northeast-1

# NSRLJPデータセットの取得

In [171]:
!wget https://www.kazamiya.net/files/projects/NSRLJP_202408.7z
!7z x NSRLJP_202408.7z
!mv ./NSRLJP_202408/NSRLFile{.txt,.csv}
!mv ./NSRLJP_202408/NSRLMfg{.txt,.csv}
!mv ./NSRLJP_202408/NSRLOS{.txt,.csv}
!mv ./NSRLJP_202408/NSRLProd{.txt,.csv}

--2025-03-20 13:30:20--  https://www.kazamiya.net/files/projects/NSRLJP_202408.7z
Resolving www.kazamiya.net (www.kazamiya.net)... 203.143.110.17
Connecting to www.kazamiya.net (www.kazamiya.net)|203.143.110.17|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 319470599 (305M) [application/x-7z-compressed]
Saving to: ‘NSRLJP_202408.7z.1’


2025-03-20 13:30:48 (10.8 MB/s) - ‘NSRLJP_202408.7z.1’ saved [319470599/319470599]


7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.20GHz (406F0),ASM,AES-NI)

Scanning the drive for archives:
  0M Scan         1 file, 319470599 bytes (305 MiB)

Extracting archive: NSRLJP_202408.7z
--
Path = NSRLJP_202408.7z
Type = 7z
Physical Size = 319470599
Headers Size = 275
Method = LZMA2:24
Solid = +
Blocks = 1

  0%      0% 1 - NSRLJP_202408/NSRLFile.txt   

## データセットをduckdbにimport

In [172]:
!pip install -qq duckdb

In [173]:
#!/usr/bin/env python
# encoding: utf-8

import duckdb
db = duckdb.connect("duck.db")
db.sql("""
-- 1. メーカーマスタテーブルの作成
DROP TABLE IF EXISTS NSRLMfg;
CREATE TABLE IF NOT EXISTS NSRLMfg (
    MfgCode TEXT PRIMARY KEY,
    MfgName TEXT NOT NULL
);

-- 2. OSマスタテーブルの作成
DROP TABLE IF EXISTS NSRLOS;
CREATE TABLE IF NOT EXISTS NSRLOS (
    OpSystemCode TEXT PRIMARY KEY,
    OpSystemName TEXT NOT NULL,
    OpSystemVersion TEXT,
    MfgCode TEXT,
);

-- 3. 製品マスタテーブルの作成
DROP TABLE IF EXISTS NSRLProd;
CREATE TABLE IF NOT EXISTS NSRLProd (
    ProductCode TEXT PRIMARY KEY,
    ProductName TEXT NOT NULL,
    ProductVersion TEXT,
    OpSystemCode TEXT,
    MfgCode TEXT,
    Language TEXT,
    ApplicationType TEXT,
);

-- 4. NSRLファイル情報テーブルの作成
DROP TABLE IF EXISTS NSRLFile;
CREATE TABLE IF NOT EXISTS NSRLFile (
    SHA1 TEXT PRIMARY KEY,
    MD5 TEXT NOT NULL,
    CRC32 TEXT NOT NULL,
    FileName TEXT NOT NULL,
    FileSize TEXT NOT NULL,
    ProductCode TEXT,
    OpSystemCode TEXT,
    SpecialCode TEXT,
);

-- 5. インデックスの作成
CREATE INDEX IF NOT EXISTS idx_md5 ON NSRLFile (MD5);

-- 6. データのインポート
COPY NSRLMfg FROM './NSRLJP_202408/NSRLMfg.csv';
COPY NSRLProd FROM './NSRLJP_202408/NSRLProd.csv';
COPY NSRLOS FROM './NSRLJP_202408/NSRLOS.csv';
COPY NSRLFile FROM './NSRLJP_202408/NSRLFile.csv';
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

## データが入っているか確認

In [174]:
db.sql("""
SELECT
    *
FROM
    NSRLFile
LEFT JOIN
    NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
    NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
    NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
    SHA1 = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
AND
    NSRLFile.OpSystemCode = '1010'
""")

┌──────────────────────────────────────────┬──────────────────────────────────┬──────────┬─────────────┬──────────┬─────────────┬──────────────┬─────────────┬─────────────┬──────────────────┬────────────────┬──────────────┬─────────┬────────────┬────────────────────┬──────────────┬──────────────┬─────────────────┬─────────┬─────────┬───────────┐
│                   SHA1                   │               MD5                │  CRC32   │  FileName   │ FileSize │ ProductCode │ OpSystemCode │ SpecialCode │ ProductCode │   ProductName    │ ProductVersion │ OpSystemCode │ MfgCode │  Language  │  ApplicationType   │ OpSystemCode │ OpSystemName │ OpSystemVersion │ MfgCode │ MfgCode │  MfgName  │
│                 varchar                  │             varchar              │ varchar  │   varchar   │ varchar  │   varchar   │   varchar    │   varchar   │   varchar   │     varchar      │    varchar     │   varchar    │ varchar │  varchar   │      varchar       │   varchar    │   varchar    │     va

# ApacheSpark3.5.5, Iceberg 1.8.1の利用

## OpenJDK 17 (Java7)のインストール

In [175]:
# Java https://jdk.java.net/archive/
!wget https://download.java.net/java/GA/jdk17/0d483333a00540d886896bac774ff48b/35/GPL/openjdk-17_linux-x64_bin.tar.gz
!tar xzf openjdk-17_linux-x64_bin.tar.gz
%env JAVA_HOME=/content/jdk-17

--2025-03-20 13:33:01--  https://download.java.net/java/GA/jdk17/0d483333a00540d886896bac774ff48b/35/GPL/openjdk-17_linux-x64_bin.tar.gz
Resolving download.java.net (download.java.net)... 23.193.24.98
Connecting to download.java.net (download.java.net)|23.193.24.98|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 186661523 (178M) [application/x-gzip]
Saving to: ‘openjdk-17_linux-x64_bin.tar.gz.1’


2025-03-20 13:33:03 (105 MB/s) - ‘openjdk-17_linux-x64_bin.tar.gz.1’ saved [186661523/186661523]

env: JAVA_HOME=/content/jdk-17


## PySpark3.5.5のインストール

In [176]:
!pip install -qq pyspark==3.5.5

## SparkSessionの作成、icebergテーブルの作成とデータのアップロード

In [177]:
#!/usr/bin/env python
# encoding: utf-8

import os
from pyspark.sql import SparkSession

CATALOG_NAME = "catalog_iceberg"
WAREHOUSE_URL = "s3a://datalakehouse/iceberg"

spark: SparkSession = (
    SparkSession.builder.master("local[*,3]")
    .appName("app_nsrljp")
    # .config("spark.driver.memory", "2g")
    # .config("spark.executor.memory", "2g")
    # .config("spark.log.level", "DEBUG")
    .config(
        "spark.jars.packages",
        f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1"
        f",org.apache.hadoop:hadoop-aws:3.3.4",
    )
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog"
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop"
    )  # https://iceberg.apache.org/docs/1.5.0/spark-configuration/
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", f"{WAREHOUSE_URL}")
    .config(f"spark.sql.defaultCatalog", f"{CATALOG_NAME}")
    .config("spark.hadoop.fs.s3a.endpoint.region", S3A_REGION or "")
    .config("spark.hadoop.fs.s3a.endpoint", S3A_ENDPOINT or "")
    .config("spark.hadoop.fs.s3a.access.key", S3A_ACCESS_KEY or "")
    .config("spark.hadoop.fs.s3a.secret.key", S3A_SECRET_KEY or "")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .getOrCreate()
)

DATABASE_NAME = "NSRLJP"

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg (
    MfgCode STRING,
    MfgName STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLMfg/'
"""
)

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS (
    OpSystemCode STRING,
    OpSystemName STRING,
    OpSystemVersion STRING,
    MfgCode STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLOS/'
"""
)

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd (
    ProductCode STRING,
    ProductName STRING,
    ProductVersion STRING,
    OpSystemCode STRING,
    MfgCode STRING,
    Language STRING,
    ApplicationType STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLProd/'
"""
)

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile (
    `SHA-1` STRING,
    MD5 STRING,
    CRC32 STRING,
    FileName STRING,
    FileSize STRING,
    ProductCode STRING,
    OpSystemCode STRING,
    SpecialCode STRING
) USING ICEBERG
PARTITIONED BY (OpSystemCode)
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLFile/'
"""
)

spark.read.option("inferSchema", True).option("header", True).csv(
    "./NSRLJP_202408/NSRLMfg.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg").overwritePartitions()

spark.read.option("inferSchema", True).option("header", True).csv(
    "./NSRLJP_202408/NSRLOS.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLOS").overwritePartitions()

spark.read.option("inferSchema", True).option("header", True).csv(
    "./NSRLJP_202408/NSRLProd.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLProd").overwritePartitions()

spark.read.option("inferSchema", True).option("header", True).csv(
    "./NSRLJP_202408/NSRLFile.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile").overwritePartitions()

`You are using approximately 326.79 MiB of the 20 GiB limit of free combined Object Storage and Archive Storage. Upgrade to use unlimited storage.`

## SQLの試験的な実行

### PySparkによるクエリ

In [178]:
# 通常のクエリ
spark.sql(
    f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.ProductCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd.ProductCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.OpSystemCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.MfgCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg.MfgCode
WHERE
    `SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
"""
).show()

+--------------------+--------------------+--------+-----------+--------+-----------+------------+-----------+-----------+--------------+--------------+------------+-------+--------+----------------+------------+------------+---------------+-------+-------+---------+
|               SHA-1|                 MD5|   CRC32|   FileName|FileSize|ProductCode|OpSystemCode|SpecialCode|ProductCode|   ProductName|ProductVersion|OpSystemCode|MfgCode|Language| ApplicationType|OpSystemCode|OpSystemName|OpSystemVersion|MfgCode|MfgCode|  MfgName|
+--------------------+--------------------+--------+-----------+--------+-----------+------------+-----------+-----------+--------------+--------------+------------+-------+--------+----------------+------------+------------+---------------+-------+-------+---------+
|003f1b9a60927b2c4...|220a7215a63faa737...|5E9769BF|Desktop.ini|     798|      50047|        1010|       NULL|      50047|Windows 10 x86|         2015-|        1010|   5001|Japanese|Operating Syst

In [179]:
# パーティション（フォルダ）による最適化が走るクエリ
spark.sql(
    f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.ProductCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd.ProductCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.OpSystemCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.MfgCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg.MfgCode
WHERE
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.`SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
AND
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = '1010'
"""
).show()

+--------------------+--------------------+--------+-----------+--------+-----------+------------+-----------+-----------+--------------+--------------+------------+-------+--------+----------------+------------+------------+---------------+-------+-------+---------+
|               SHA-1|                 MD5|   CRC32|   FileName|FileSize|ProductCode|OpSystemCode|SpecialCode|ProductCode|   ProductName|ProductVersion|OpSystemCode|MfgCode|Language| ApplicationType|OpSystemCode|OpSystemName|OpSystemVersion|MfgCode|MfgCode|  MfgName|
+--------------------+--------------------+--------+-----------+--------+-----------+------------+-----------+-----------+--------------+--------------+------------+-------+--------+----------------+------------+------------+---------------+-------+-------+---------+
|003f1b9a60927b2c4...|220a7215a63faa737...|5E9769BF|Desktop.ini|     798|      50047|        1010|       NULL|      50047|Windows 10 x86|         2015-|        1010|   5001|Japanese|Operating Syst

### DuckDBによるクエリ

In [180]:
# Extensionsをロード
import duckdb
duckdb.sql("""
INSTALL httpfs;
INSTALL iceberg;
LOAD iceberg;
LOAD httpfs;
""")

# S3用クレデンシャルを作成

duckdb.sql(f"""
DROP SECRET IF EXISTS oci_storage;
CREATE SECRET IF NOT EXISTS  oci_storage (
    TYPE s3,
    REGION '{S3A_REGION}',
    ENDPOINT '{S3A_ENDPOINT}',
    KEY_ID '{S3A_ACCESS_KEY}',
    SECRET '{S3A_SECRET_KEY}',
    URL_STYLE 'path'
);
""")

┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ true    │
└─────────┘

In [181]:
# 通常のクエリ
duckdb.sql("""
WITH NSRLFile AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
            allow_moved_paths = true
        )
), NSRLProd AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLProd',
            allow_moved_paths = true
        )
), NSRLOS AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLOS',
            allow_moved_paths = true
        )
), NSRLMfg AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLMfg',
            allow_moved_paths = true
        )
)
SELECT
    *
FROM
    NSRLFile
LEFT JOIN
    NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
    NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
    NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
    NSRLFile."SHA-1" = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌──────────────────────────────────────────┬──────────────────────────────────┬──────────┬─────────────┬──────────┬─────────────┬──────────────┬─────────────┬─────────────┬────────────────┬────────────────┬──────────────┬─────────┬──────────┬──────────────────┬──────────────┬──────────────┬─────────────────┬─────────┬─────────┬───────────┐
│                  SHA-1                   │               MD5                │  CRC32   │  FileName   │ FileSize │ ProductCode │ OpSystemCode │ SpecialCode │ ProductCode │  ProductName   │ ProductVersion │ OpSystemCode │ MfgCode │ Language │ ApplicationType  │ OpSystemCode │ OpSystemName │ OpSystemVersion │ MfgCode │ MfgCode │  MfgName  │
│                 varchar                  │             varchar              │ varchar  │   varchar   │ varchar  │   varchar   │   varchar    │   varchar   │   varchar   │    varchar     │    varchar     │   varchar    │ varchar │ varchar  │     varchar      │   varchar    │   varchar    │     varchar     │ varcha

In [182]:
# パーティション（フォルダ）による最適化が走るクエリ
duckdb.sql("""
WITH NSRLFile AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
            allow_moved_paths = true
        )
), NSRLProd AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLProd',
            allow_moved_paths = true
        )
), NSRLOS AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLOS',
            allow_moved_paths = true
        )
), NSRLMfg AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLMfg',
            allow_moved_paths = true
        )
)
SELECT
    *
FROM
    NSRLFile
LEFT JOIN
    NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
    NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
    NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
    NSRLFile."SHA-1" = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
AND
    NSRLFile.OpSystemCode = '1010'
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌──────────────────────────────────────────┬──────────────────────────────────┬──────────┬─────────────┬──────────┬─────────────┬──────────────┬─────────────┬─────────────┬────────────────┬────────────────┬──────────────┬─────────┬──────────┬──────────────────┬──────────────┬──────────────┬─────────────────┬─────────┬─────────┬───────────┐
│                  SHA-1                   │               MD5                │  CRC32   │  FileName   │ FileSize │ ProductCode │ OpSystemCode │ SpecialCode │ ProductCode │  ProductName   │ ProductVersion │ OpSystemCode │ MfgCode │ Language │ ApplicationType  │ OpSystemCode │ OpSystemName │ OpSystemVersion │ MfgCode │ MfgCode │  MfgName  │
│                 varchar                  │             varchar              │ varchar  │   varchar   │ varchar  │   varchar   │   varchar    │   varchar   │   varchar   │    varchar     │    varchar     │   varchar    │ varchar │ varchar  │     varchar      │   varchar    │   varchar    │     varchar     │ varcha

# Iceberg1.8.1の機能

## Sparkセッションを再作成

In [183]:
###### sparkセッションを念の為再度作成
import os
from pyspark.sql import SparkSession
from google.colab import userdata

S3A_ENDPOINT = userdata.get('S3A_ENDPOINT') # xxxxxxx.compat.objectstorage.{S3A_REGION}.oraclecloud.com
S3A_ACCESS_KEY = userdata.get('S3A_ACCESS_KEY') # xxxxxxxxxxxx
S3A_SECRET_KEY = userdata.get('S3A_SECRET_KEY') # xxxxxxxxxxxx
S3A_REGION = userdata.get('S3A_REGION') # ap-northeast-1
CATALOG_NAME = "catalog_iceberg"
DATABASE_NAME = "NSRLJP"
WAREHOUSE_URL = "s3a://datalakehouse/iceberg"
PATH_NSRLFILE = f'{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLFile/'

spark: SparkSession = (
    SparkSession.builder.master("local[*,3]")
    .appName("app_nsrljp")
    # .config("spark.driver.memory", "2g")
    # .config("spark.executor.memory", "2g")
    # .config("spark.log.level", "DEBUG")
    .config(
        "spark.jars.packages",
        f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1"
        f",org.apache.hadoop:hadoop-aws:3.3.4",
    )
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog"
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop"
    )  # https://iceberg.apache.org/docs/1.5.0/spark-configuration/
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", f"{WAREHOUSE_URL}")
    .config(f"spark.sql.defaultCatalog", f"{CATALOG_NAME}")
    .config("spark.hadoop.fs.s3a.endpoint.region", S3A_REGION or "")
    .config("spark.hadoop.fs.s3a.endpoint", S3A_ENDPOINT or "")
    .config("spark.hadoop.fs.s3a.access.key", S3A_ACCESS_KEY or "")
    .config("spark.hadoop.fs.s3a.secret.key", S3A_SECRET_KEY or "")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .getOrCreate()
)

###### duckdbセッションも念の為再度作成
import duckdb
# Extensionsをロード
duckdb.sql("""
INSTALL httpfs;
INSTALL iceberg;
LOAD iceberg;
LOAD httpfs;
""")

# S3用クレデンシャルを作成
duckdb.sql(f"""
DROP SECRET IF EXISTS oci_storage;
CREATE SECRET IF NOT EXISTS  oci_storage (
    TYPE s3,
    REGION '{S3A_REGION}',
    ENDPOINT '{S3A_ENDPOINT}',
    KEY_ID '{S3A_ACCESS_KEY}',
    SECRET '{S3A_SECRET_KEY}',
    URL_STYLE 'path'
);
""")


┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ true    │
└─────────┘

### 確認の為boto3ライブラリをインストール

### デフォルトではCopy on Write

In [184]:
# Copy on Writeであることを確認するには　SHOW TBLPROPERTIES　を使う。デフォルトがCoWである。
spark.sql(f'SHOW TBLPROPERTIES {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile').show(truncate=False)

+-------------------------------+-------------------+
|key                            |value              |
+-------------------------------+-------------------+
|current-snapshot-id            |2667036507751922659|
|format                         |iceberg/parquet    |
|format-version                 |2                  |
|write.parquet.compression-codec|zstd               |
+-------------------------------+-------------------+



In [185]:
!pip install -qq boto3

#### s3a://datalakehouse/iceberg/NSRLJP/NSRLFile の中身を確認

In [186]:
import json
import boto3
S3A_BUCKET = "datalakehouse"
S3A_URL = "https://" + S3A_ENDPOINT
s3 = boto3.client(service_name="s3",endpoint_url=S3A_URL, region_name=S3A_REGION, aws_access_key_id=S3A_ACCESS_KEY, aws_secret_access_key=S3A_SECRET_KEY)
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"])

iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1001/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00013.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1002/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00012.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1003/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00011.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1004/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00003.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1005/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00007.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1006/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00005.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1007/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00006.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1008/00000-80-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00001.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1008/00001-81-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00001.parquet
iceberg/NSRLJP/NSRL

#### Copy-on-Writeの挙動の確認

##### データの追加

In [187]:
# Copy on Writeでは削除も更新も追加も関係ない。全てのレコードを新しいファイルにとして別ファイルとしてコピーする。
spark.sql(f"""
INSERT INTO {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
VALUES ('test', 'test', 'test', 'test', 'test', 'test', 'test', 'test')
""")

DataFrame[]

In [188]:
# OpSystemCode=testのパーティションが作成されている
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"])

iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1001/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00013.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1002/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00012.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1003/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00011.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1004/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00003.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1005/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00007.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1006/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00005.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1007/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00006.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1008/00000-80-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00001.parquet
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1008/00001-81-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00001.parquet
iceberg/NSRLJP/NSRL

##### データの削除

In [189]:
# 先ほど追加したデータを削除してみる
spark.sql(f"""
DELETE FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
WHERE `SHA-1` = 'test'
""")

DataFrame[]

In [190]:
# OpSystemCode=testのレコードを削除したにもかかわらず、パーティションが残っている
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"],obj["LastModified"])

iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1001/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00013.parquet 2025-03-20 13:35:15+00:00
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1002/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00012.parquet 2025-03-20 13:35:15+00:00
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1003/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00011.parquet 2025-03-20 13:35:14+00:00
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1004/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00003.parquet 2025-03-20 13:35:13+00:00
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1005/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00007.parquet 2025-03-20 13:35:13+00:00
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1006/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00005.parquet 2025-03-20 13:35:13+00:00
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1007/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00006.parquet 2025-03-20 13:35:12+00:00
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1008/00000-80

##### データの参照

###### PySparkによるデータの参照

In [191]:
# スナップショットを確認する
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.snapshots;
""").show(truncate=False)
# 上から、committed_atとoperationに注目。
# overwriteは最初のデータ一括登録の時のスナップショット
# appendはtestデータを追加したときのスナップショット
# deleteはtestデータを削除したときのスナップショット

+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                            |summary                                                                                       

In [192]:
# 先ほど削除したデータをクエリしてみると、削除されているように見える。
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
WHERE `SHA-1` = 'test'
""").show()

+-----+---+-----+--------+--------+-----------+------------+-----------+
|SHA-1|MD5|CRC32|FileName|FileSize|ProductCode|OpSystemCode|SpecialCode|
+-----+---+-----+--------+--------+-----------+------------+-----------+
+-----+---+-----+--------+--------+-----------+------------+-----------+



###### PySparkによるデータの参照（タイムトラベル）

In [194]:
# 先ほど削除したデータを一個前のスナップショットに遡ってクエリしてみると、復活してる。
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile VERSION AS OF 2320022768625959900
WHERE `SHA-1` = 'test'
""").show()

+-----+----+-----+--------+--------+-----------+------------+-----------+
|SHA-1| MD5|CRC32|FileName|FileSize|ProductCode|OpSystemCode|SpecialCode|
+-----+----+-----+--------+--------+-----------+------------+-----------+
| test|test| test|    test|    test|       test|        test|       test|
+-----+----+-----+--------+--------+-----------+------------+-----------+



###### DuckDBによるデータの参照

In [195]:
# 先ほど削除したデータをクエリしてみると、削除されているように見える。
duckdb.sql(f"""
WITH NSRLFile AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
            allow_moved_paths = true
        )
)
SELECT
    *
FROM
    NSRLFile
WHERE
    "SHA-1" = 'test';
""").show()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌─────────┬─────────┬─────────┬──────────┬──────────┬─────────────┬──────────────┬─────────────┐
│  SHA-1  │   MD5   │  CRC32  │ FileName │ FileSize │ ProductCode │ OpSystemCode │ SpecialCode │
│ varchar │ varchar │ varchar │ varchar  │ varchar  │   varchar   │   varchar    │   varchar   │
├─────────┴─────────┴─────────┴──────────┴──────────┴─────────────┴──────────────┴─────────────┤
│                                            0 rows                                            │
└──────────────────────────────────────────────────────────────────────────────────────────────┘



###### DuckDBによるデータの参照（タイムトラベル）

In [196]:
# DuckDBでタイムスタンプとsnapshot-idを探す。
# ここで、ハマリポイントとして、sequence_numberとversionは紐づいていない点に注意する。
# 例えば、timestamp_msとmetadataはファイルの最終更新日のタイムスタンプを照合するとv3.metadata.jsonが該当する。
duckdb.sql("""
WITH NSRLFileSnapshots AS (
    SELECT
        *
    FROM
        iceberg_snapshots(
            's3://datalakehouse/iceberg/NSRLJP/NSRLFile'
        )
)
SELECT
    *
FROM
    NSRLFileSnapshots
""").show()

┌─────────────────┬─────────────────────┬─────────────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ sequence_number │     snapshot_id     │      timestamp_ms       │                                                       manifest_list                                                       │
│     uint64      │       uint64        │        timestamp        │                                                          varchar                                                          │
├─────────────────┼─────────────────────┼─────────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│               1 │ 2667036507751922659 │ 2025-03-20 13:35:18.834 │ s3a://datalakehouse/iceberg/NSRLJP/NSRLFile/metadata/snap-2667036507751922659-1-0c0fc988-9655-4483-9808-533c60bf0e20.avro │
│               2 │ 2320022768625959900 

In [197]:
# 先ほど削除したデータを一個前のスナップショットに遡ってクエリしてみると、復活してる。
# ハマリポイントとして、duckdb SQLではカラムをブラケットで包む際は`ではなく"を使う。
duckdb.sql(f"""
WITH NSRLFile AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
            version = 3,
            allow_moved_paths = true
        )
)
SELECT
    *
FROM
    NSRLFile
WHERE
    "SHA-1" = 'test';
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌─────────┬─────────┬─────────┬──────────┬──────────┬─────────────┬──────────────┬─────────────┐
│  SHA-1  │   MD5   │  CRC32  │ FileName │ FileSize │ ProductCode │ OpSystemCode │ SpecialCode │
│ varchar │ varchar │ varchar │ varchar  │ varchar  │   varchar   │   varchar    │   varchar   │
├─────────┼─────────┼─────────┼──────────┼──────────┼─────────────┼──────────────┼─────────────┤
│ test    │ test    │ test    │ test     │ test     │ test        │ test         │ test        │
└─────────┴─────────┴─────────┴──────────┴──────────┴─────────────┴──────────────┴─────────────┘

##### データの更新

データの更新とは、ここでは、特定のカラムの値の更新のことを指す。  
GDPR対応などで値をマスキングする際などに使用する。

In [198]:
# 通常のUPDATE文を発行できる
# UPDATE (表名) SET (カラム名1) = (値1) WHERE (条件);
spark.sql(f"""
UPDATE {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
SET FileName = 'REDACTED'
WHERE `SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
""")

DataFrame[]

In [199]:
# スナップショットを確認する
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.snapshots;
""").show(truncate=False)
# 上から、committed_atとoperationに注目。
# overwriteは最初のデータ一括登録の時のスナップショット
# appendはtestデータを追加したときのスナップショット
# deleteはtestデータを削除したときのスナップショット
# 最後のoverwriteはデータを更新したときのスナップショット

+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                            |summary                                   

In [203]:
# OpSystemCode=testのレコードを更新したので、OpSystemCode=1010のparquetが追加されている。
# Copy on Writeなので、ほぼ同一のparquetが差分更新された状態で保存されているのが分かる。
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"],obj["LastModified"],obj["Size"])

iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1001/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00013.parquet 2025-03-20 13:35:15+00:00 4288624
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1002/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00012.parquet 2025-03-20 13:35:15+00:00 3203324
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1003/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00011.parquet 2025-03-20 13:35:14+00:00 7039101
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1004/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00003.parquet 2025-03-20 13:35:13+00:00 1746465
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1005/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00007.parquet 2025-03-20 13:35:13+00:00 2854162
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1006/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00005.parquet 2025-03-20 13:35:13+00:00 6749538
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1007/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00006.parquet 2025-03-20 13:35:12+00:00 1828949

In [207]:
# 最後のMetadataを見てみる。
# "sequence-number" : 4のスナップショットでは以下のような記録がされており
# .avroファイルの中身を見なくとも、更新処理がどのように行われたか推測できる。
#   "added-data-files" : "1",
#   "deleted-data-files" : "1",
#   "added-records" : "694619",
#   "deleted-records" : "694619",
#   "added-files-size" : "44661715",
#   "removed-files-size" : "44661711",
last_metadata = s3.get_object(Bucket=S3A_BUCKET, Key="iceberg/NSRLJP/NSRLFile/metadata/v5.metadata.json").get("Body").read().decode()
print(last_metadata)

{
  "format-version" : 2,
  "table-uuid" : "6c65519e-81ee-4a05-b02c-0ad694466bc9",
  "location" : "s3a://datalakehouse/iceberg/NSRLJP/NSRLFile",
  "last-sequence-number" : 4,
  "last-updated-ms" : 1742478227246,
  "last-column-id" : 8,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "SHA-1",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "MD5",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "CRC32",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 4,
      "name" : "FileName",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 5,
      "name" : "FileSize",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 6,
      "name" : "ProductCode",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 7,
      "name" : "OpSystemCode",
     

In [208]:
# 更新されたレコードを参照する
spark.sql(f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
WHERE
    `SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
""").show(truncate=False)

+----------------------------------------+--------------------------------+--------+--------+--------+-----------+------------+-----------+
|SHA-1                                   |MD5                             |CRC32   |FileName|FileSize|ProductCode|OpSystemCode|SpecialCode|
+----------------------------------------+--------------------------------+--------+--------+--------+-----------+------------+-----------+
|003f1b9a60927b2c429412cb4698907e0a0c68ce|220a7215a63faa737f49f2d5e6a6f8ca|5E9769BF|REDACTED|798     |50047      |1010        |NULL       |
+----------------------------------------+--------------------------------+--------+--------+--------+-----------+------------+-----------+



##### データのロールバック

In [210]:
# ここまでの処理をロールバックしたい時はタイムトラベルの際にも使用したスナップショットを使う。
# まずはどこまで戻したいかを調べる為、スナップショットを確認する。
# 今回は最初のデータに戻したいと思うので、最初のoverwriteのsnapshot_idをメモ。
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.snapshots;
""").show(truncate=False)

+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                            |summary                                   

In [213]:
# ロールバック自体は非常に簡単
# ハマリポイントは、最後の;が要らないという点。
spark.sql(f"""
CALL {CATALOG_NAME}.system.rollback_to_snapshot('{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile', 2667036507751922659)
""")

DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

In [216]:
# 更新したレコードが元に戻っているか確認
spark.sql(f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
WHERE
    `SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
""").show(truncate=False)

+----------------------------------------+--------------------------------+--------+-----------+--------+-----------+------------+-----------+
|SHA-1                                   |MD5                             |CRC32   |FileName   |FileSize|ProductCode|OpSystemCode|SpecialCode|
+----------------------------------------+--------------------------------+--------+-----------+--------+-----------+------------+-----------+
|003f1b9a60927b2c429412cb4698907e0a0c68ce|220a7215a63faa737f49f2d5e6a6f8ca|5E9769BF|Desktop.ini|798     |50047      |1010        |NULL       |
+----------------------------------------+--------------------------------+--------+-----------+--------+-----------+------------+-----------+



In [217]:
# 追加したレコードが無くなったか確認
spark.sql(f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
WHERE
    `SHA-1` = 'test'
""").show(truncate=False)

+-----+---+-----+--------+--------+-----------+------------+-----------+
|SHA-1|MD5|CRC32|FileName|FileSize|ProductCode|OpSystemCode|SpecialCode|
+-----+---+-----+--------+--------+-----------+------------+-----------+
+-----+---+-----+--------+--------+-----------+------------+-----------+



In [215]:
# ロールバックした後もスナップショットは保持される。
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.snapshots;
""").show(truncate=False)
# ロールバックした後もデータは残る。
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"],obj["LastModified"],obj["Size"])

+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                            |summary                                   

In [220]:
# ロールバックのロールバックはset_current_snapshotを使用する。
# この場合、未来のsnapshotが適用されるので、それまでの新しい変更を破壊的に変更する。
spark.sql(f"""
CALL {CATALOG_NAME}.system.set_current_snapshot('{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile', 1867896334512928496)
""")

DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

In [221]:
# 更新したレコードを確認
spark.sql(f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
WHERE
    `SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
""").show(truncate=False)

+----------------------------------------+--------------------------------+--------+--------+--------+-----------+------------+-----------+
|SHA-1                                   |MD5                             |CRC32   |FileName|FileSize|ProductCode|OpSystemCode|SpecialCode|
+----------------------------------------+--------------------------------+--------+--------+--------+-----------+------------+-----------+
|003f1b9a60927b2c429412cb4698907e0a0c68ce|220a7215a63faa737f49f2d5e6a6f8ca|5E9769BF|REDACTED|798     |50047      |1010        |NULL       |
+----------------------------------------+--------------------------------+--------+--------+--------+-----------+------------+-----------+



In [222]:
# ロールバックした後もスナップショットは保持される。
spark.sql(f"""
SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.snapshots;
""").show(truncate=False)
# ロールバックした後もデータは残る。
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"],obj["LastModified"],obj["Size"])

+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                            |summary                                   

##### データのコンパクト化(コンパクション)

In [225]:
# OpSystemCode=1008は2つのファイルに分割されているが、サイズの偏りが激しい
# iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1008/00000-80-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00001.parquet 2025-03-20 13:35:12+00:00 167274974
# iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1008/00001-81-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00001.parquet 2025-03-20 13:34:51+00:00 53543417
# 元のサイズが大したことないが・・・167MBと53MBなので、最大ファイルサイズを50MBにして均等配分してみる。
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"],obj["LastModified"],obj["Size"])

iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1001/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00013.parquet 2025-03-20 13:35:15+00:00 4288624
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1002/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00012.parquet 2025-03-20 13:35:15+00:00 3203324
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1003/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00011.parquet 2025-03-20 13:35:14+00:00 7039101
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1004/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00003.parquet 2025-03-20 13:35:13+00:00 1746465
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1005/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00007.parquet 2025-03-20 13:35:13+00:00 2854162
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1006/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00005.parquet 2025-03-20 13:35:13+00:00 6749538
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1007/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00006.parquet 2025-03-20 13:35:12+00:00 1828949

In [228]:
# 個々のparquetファイルのサイズを最大50MBにするには、max-file-size-bytesオプションを使用する
FSIZE_MAX = 52428800 # 50MB
spark.sql(f"""
CALL {CATALOG_NAME}.system.rewrite_data_files(table => '{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile', options => map('max-file-size-bytes','{str(FSIZE_MAX)}'))
""")

IllegalArgumentException: 'target-file-size-bytes' (536870912) must be < 'max-file-size-bytes' (52428800), all new files will be larger than the max threshold

In [227]:
# ちゃんと分割されているか見てみる
objects = s3.list_objects(Bucket=S3A_BUCKET, Prefix="iceberg/NSRLJP/NSRLFile")
for obj in objects["Contents"]:
    print(obj["Key"],obj["LastModified"],obj["Size"])

iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1001/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00013.parquet 2025-03-20 13:35:15+00:00 4288624
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1002/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00012.parquet 2025-03-20 13:35:15+00:00 3203324
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1003/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00011.parquet 2025-03-20 13:35:14+00:00 7039101
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1004/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00003.parquet 2025-03-20 13:35:13+00:00 1746465
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1005/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00007.parquet 2025-03-20 13:35:13+00:00 2854162
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1006/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00005.parquet 2025-03-20 13:35:13+00:00 6749538
iceberg/NSRLJP/NSRLFile/data/OpSystemCode=1007/00002-82-ec6cf2b1-556d-498e-b6f0-5db33c6ee7dc-0-00006.parquet 2025-03-20 13:35:12+00:00 1828949