# Iceberg Workshop

## Environment setup

#### Configure the session

Replace the value of `spark.sql.catalog.glue_catalog.warehouse` with an S3 path in a bucket you created yourself.

In [1]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.glue_catalog":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.glue_catalog.warehouse":"s3://myemr-bucket-01/data/iceberg-folder/"
    }
}
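
The warehouse path is the only value in this cell you need to change. The sketch below generates the same `%%configure` JSON body from a bucket path and rejects anything that is not an `s3://` URI. The helper name `build_configure_body` and the bucket `my-bucket` are hypothetical placeholders, not part of any AWS or Iceberg API.

```python
import json


def build_configure_body(warehouse: str) -> str:
    """Build the JSON body for `%%configure -f` with a Glue-backed Iceberg catalog."""
    # Fail early on the most common mistake: a local path or a bare bucket name.
    if not warehouse.startswith("s3://"):
        raise ValueError(f"warehouse must be an s3:// URI, got {warehouse!r}")
    conf = {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.glue_catalog.warehouse": warehouse,
    }
    return json.dumps({"conf": conf}, indent=4)


# "my-bucket" is a placeholder; use the bucket you created.
print(build_configure_body("s3://my-bucket/data/iceberg-folder/"))
```

Paste the printed JSON below `%%configure -f` in place of the hand-edited version.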

#### Create the database

In [4]:
%%sql
create database iceberg_db;


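### Create the table
Create a MOR (merge-on-read) table with the following properties: <br>
`'write.metadata.delete-after-commit.enabled' = 'true'` whether to delete the oldest tracked metadata files after each commit <br>
`'write.metadata.previous-versions-max' = '5'` maximum number of previous metadata files to keep; older ones are deleted after a commit when the property above is enabled <br>
`'history.expire.max-snapshot-age-ms' = '86400'` default maximum age of snapshots to keep on the table and all of its branches when expiring snapshots; default 432000000 (5 days). The value is in milliseconds, so 86400 is only 86.4 seconds (one day would be 86400000) <br>
`'history.expire.min-snapshots-to-keep' = '1'` default minimum number of snapshots to keep on the table and all of its branches when expiring snapshots; default 1 <br>
`'write.update.mode' = 'merge-on-read'` write mode for UPDATE (default copy-on-write) <br>
`'write.delete.mode' = 'merge-on-read'` write mode for DELETE (default copy-on-write) <br>
`'write.merge.mode' = 'merge-on-read'` write mode for MERGE INTO (default copy-on-write) <br>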
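
To see what the two metadata-retention properties do together, here is a simplified model, assuming CREATE TABLE writes metadata version 0 and every later commit writes the next version. Only the current `metadata.json` plus at most `previous-versions-max` earlier ones stay in the metadata log; files that drop out of the log are deleted only when `delete-after-commit` is enabled. The function name is hypothetical and this is a sketch, not Iceberg's implementation. With a limit of 5 and eight commits, versions 3 to 8 remain, which is consistent with the six files `00003` to `00008` in the `catalog_sales` metadata listing later in this notebook, assuming that table uses the same settings.

```python
def surviving_metadata_versions(num_commits: int,
                                previous_versions_max: int,
                                delete_after_commit: bool = True) -> list:
    """Return the metadata.json version numbers still on storage after num_commits commits.

    Simplified model: version 0 comes from CREATE TABLE and each commit adds one version.
    """
    versions = list(range(num_commits + 1))
    if not delete_after_commit:
        return versions  # nothing is removed automatically
    previous, current = versions[:-1], versions[-1]
    # Keep only the newest `previous_versions_max` previous versions.
    start = max(0, len(previous) - previous_versions_max)
    return previous[start:] + [current]


print(surviving_metadata_versions(8, 5))  # [3, 4, 5, 6, 7, 8]
```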

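**Note: in Spark SQL, Iceberg tables must be read and written with the catalog name configured above (here `glue_catalog`) as a prefix.**

For other properties, see the Iceberg community documentation:
https://iceberg.apache.org/docs/latest/spark-configuration/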

In [17]:
%%sql
CREATE TABLE glue_catalog.iceberg_db.sample_table (
id int,
a string,
b int
)
USING iceberg
LOCATION 's3://myemr-bucket-01/data/iceberg-folder/iceberg_db.db/sample_table/'
TBLPROPERTIES (
    'format' = 'iceberg/parquet',
    'format-version' = '2',
    'write.metadata.delete-after-commit.enabled' = 'true',
    'write.metadata.previous-versions-max' = '5',
    'history.expire.max-snapshot-age-ms' = '86400',
    'history.expire.min-snapshots-to-keep' = '1',
    'write.update.mode' = 'merge-on-read',
    'write.delete.mode' = 'merge-on-read',
    'write.merge.mode' = 'merge-on-read'
)


### Insert data

In [18]:
%%sql
insert into glue_catalog.iceberg_db.sample_table values 
(1,'test01',10),
(2,'test02',20),
(3,'test03',30);


#### View the inserted data

In [19]:
%%sql
select * from glue_catalog.iceberg_db.sample_table;


#### Update data

In [20]:
%%sql
update glue_catalog.iceberg_db.sample_table set a = 'test01_update' where id = 1;


#### View the updated row

In [21]:
%%sql
select * from glue_catalog.iceberg_db.sample_table where id = 1;


### Inspect the written files
1. List the files with Iceberg's built-in `files` metadata table

In [34]:
%%sql
select file_path,file_format,file_size_in_bytes from glue_catalog.iceberg_db.sample_table.files


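2. List the files on S3 <br>
The table's `metadata/` directory on S3 now contains the files written by the CREATE TABLE, INSERT and UPDATE commits

In [26]:
%%sh
aws s3 ls s3://myemr-bucket-01/data/iceberg-folder/iceberg_db.db/sample_table/metadata/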

2024-01-26 13:25:10       1433 00000-2b226b11-4a8d-4d64-9bed-15c0e7c89a47.metadata.json
2024-01-26 13:25:48       2575 00001-99a6103e-d04c-4f83-9953-680ea7774214.metadata.json
2024-01-26 13:27:56       3779 00002-b2dc8357-17cf-4d6f-bb74-4b84a6023808.metadata.json
2024-01-26 13:25:48       6795 97ce98ba-e01b-46fa-81ed-980654d11064-m0.avro
2024-01-26 13:27:56       6755 edc8a54a-fffd-46de-a005-5319e6f01ba3-m0.avro
2024-01-26 13:27:56       6797 edc8a54a-fffd-46de-a005-5319e6f01ba3-m1.avro
2024-01-26 13:27:56       4370 snap-7375920400988091942-1-edc8a54a-fffd-46de-a005-5319e6f01ba3.avro
2024-01-26 13:25:48       4281 snap-9013261095390007612-1-97ce98ba-e01b-46fa-81ed-980654d11064.avro


3. Delete a record and look again

In [27]:
%%sql
delete from glue_catalog.iceberg_db.sample_table where id = 3


In [29]:
%%sql
select * from glue_catalog.iceberg_db.sample_table


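Because the table uses merge-on-read, the delete is written as a position delete file (Iceberg's Spark writer gives these a `-deletes.parquet` suffix) instead of rewriting the existing data file. List the data directory again; note that without `--recursive`, `aws s3 ls` shows nested prefixes only as `PRE` entries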

In [35]:
%%sh
aws s3 ls s3://myemr-bucket-01/data/iceberg-folder/iceberg_db.db/sample_table/data/

                           PRE /
2024-01-26 13:25:47        863 00000-0-65694c46-1036-4114-8a83-908c0f7318de-00001.parquet
2024-01-26 13:27:56        924 00000-4-db86d939-5b1d-4e79-b9f6-e89306f906e1-00001.parquet
2024-01-26 13:25:47        854 00001-1-65694c46-1036-4114-8a83-908c0f7318de-00001.parquet
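
The read side of merge-on-read can be sketched in a few lines: an UPDATE becomes a position delete for the old row plus a new data file, a DELETE becomes a position delete only, and readers skip every `(file, position)` pair that a delete file lists. The file names and the way rows are split across files below are hypothetical. They mirror the INSERT, UPDATE and DELETE steps above but are not the actual files in the listing, and the code is a simplified model, not Iceberg's reader.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PositionDelete:
    file_path: str  # data file that contains the deleted row
    pos: int        # 0-based row position inside that data file


def merge_on_read(data_files: dict[str, list[tuple]],
                  deletes: list[PositionDelete]) -> list[tuple]:
    """Return the live rows: every data-file row whose (file, position) is not deleted."""
    deleted = {(d.file_path, d.pos) for d in deletes}
    live = []
    for path, rows in data_files.items():
        for pos, row in enumerate(rows):
            if (path, pos) not in deleted:
                live.append(row)
    return live


# Hypothetical file layout for the INSERT / UPDATE / DELETE steps above.
data_files = {
    "data/insert-a.parquet": [(1, "test01", 10), (2, "test02", 20)],
    "data/insert-b.parquet": [(3, "test03", 30)],
    "data/update.parquet": [(1, "test01_update", 10)],  # new version of id = 1
}
deletes = [
    PositionDelete("data/insert-a.parquet", 0),  # UPDATE: removes the old version of id = 1
    PositionDelete("data/insert-b.parquet", 0),  # DELETE: removes id = 3
]
print(merge_on_read(data_files, deletes))
# [(2, 'test02', 20), (1, 'test01_update', 10)]
```

The write stays cheap because no existing Parquet file is rewritten. The cost moves to reads, which is why the compaction procedures later in this notebook matter.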


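The metadata directory now holds four versions of the table metadata (`*.metadata.json`), four manifest files (`*-mN.avro`) and three manifest lists (`snap-*.avro`): CREATE TABLE wrote the first metadata file, and each of the three later commits (insert, update, delete) added one metadata file and one manifest list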

In [40]:
%%sh
aws s3 ls s3://myemr-bucket-01/data/iceberg-folder/iceberg_db.db/sample_table/metadata/

2024-01-26 13:25:10       1433 00000-2b226b11-4a8d-4d64-9bed-15c0e7c89a47.metadata.json
2024-01-26 13:25:48       2575 00001-99a6103e-d04c-4f83-9953-680ea7774214.metadata.json
2024-01-26 13:27:56       3779 00002-b2dc8357-17cf-4d6f-bb74-4b84a6023808.metadata.json
2024-01-26 13:31:20       4922 00003-771ee2fe-909d-4476-9da9-937445f0436a.metadata.json
2024-01-26 13:25:48       6795 97ce98ba-e01b-46fa-81ed-980654d11064-m0.avro
2024-01-26 13:31:20       6796 cde2727e-9990-498f-8469-753d630541ec-m0.avro
2024-01-26 13:27:56       6755 edc8a54a-fffd-46de-a005-5319e6f01ba3-m0.avro
2024-01-26 13:27:56       6797 edc8a54a-fffd-46de-a005-5319e6f01ba3-m1.avro
2024-01-26 13:31:20       4415 snap-3579094904354232443-1-cde2727e-9990-498f-8469-753d630541ec.avro
2024-01-26 13:27:56       4370 snap-7375920400988091942-1-edc8a54a-fffd-46de-a005-5319e6f01ba3.avro
2024-01-26 13:25:48       4281 snap-9013261095390007612-1-97ce98ba-e01b-46fa-81ed-980654d11064.avro
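
The three kinds of metadata files can be told apart by name. The sketch below classifies the listing above using the naming patterns visible in it: `.metadata.json` for table metadata, a `snap-` prefix for manifest lists, and an `-mN.avro` suffix for manifests. These patterns are inferred from this listing, so treat the function as an assumption-based helper rather than a documented Iceberg rule.

```python
import re
from collections import Counter

# Metadata listing copied from the cell above (columns: date, time, size, name).
LISTING = """\
2024-01-26 13:25:10       1433 00000-2b226b11-4a8d-4d64-9bed-15c0e7c89a47.metadata.json
2024-01-26 13:25:48       2575 00001-99a6103e-d04c-4f83-9953-680ea7774214.metadata.json
2024-01-26 13:27:56       3779 00002-b2dc8357-17cf-4d6f-bb74-4b84a6023808.metadata.json
2024-01-26 13:31:20       4922 00003-771ee2fe-909d-4476-9da9-937445f0436a.metadata.json
2024-01-26 13:25:48       6795 97ce98ba-e01b-46fa-81ed-980654d11064-m0.avro
2024-01-26 13:31:20       6796 cde2727e-9990-498f-8469-753d630541ec-m0.avro
2024-01-26 13:27:56       6755 edc8a54a-fffd-46de-a005-5319e6f01ba3-m0.avro
2024-01-26 13:27:56       6797 edc8a54a-fffd-46de-a005-5319e6f01ba3-m1.avro
2024-01-26 13:31:20       4415 snap-3579094904354232443-1-cde2727e-9990-498f-8469-753d630541ec.avro
2024-01-26 13:27:56       4370 snap-7375920400988091942-1-edc8a54a-fffd-46de-a005-5319e6f01ba3.avro
2024-01-26 13:25:48       4281 snap-9013261095390007612-1-97ce98ba-e01b-46fa-81ed-980654d11064.avro
"""


def classify(name: str) -> str:
    """Map a metadata file name to its role, based on the naming seen above."""
    if name.endswith(".metadata.json"):
        return "table metadata"
    if name.startswith("snap-") and name.endswith(".avro"):
        return "manifest list"
    if re.search(r"-m\d+\.avro$", name):
        return "manifest"
    return "other"


def summarize(listing: str) -> Counter:
    """Count files per role; the file name is the last column of `aws s3 ls` output."""
    return Counter(classify(line.split()[-1]) for line in listing.splitlines() if line.strip())


print(summarize(LISTING))
```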


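### Iceberg file & metadata management

The following shows how to use Spark to maintain Iceberg tables, for example compacting small files and cleaning up expired files. These examples run against a separate, existing table, `icebergdb.catalog_sales`, which is not created in this notebook; substitute your own table.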

### expire_snapshots

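Clean up expired snapshots<br>

Every write/update/delete/upsert/compaction in Iceberg creates a new snapshot and keeps the old data and metadata, which enables snapshot isolation and time travel. The `expire_snapshots` procedure removes old snapshots that are no longer needed, together with their files.<br>

The procedure removes the old snapshots and the data files that only those old snapshots need. This means `expire_snapshots` never removes files that a non-expired snapshot still requires.<br>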
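
The rule "remove only files that no surviving snapshot needs" amounts to a set difference. The simplified sketch below models it, with `older_than` and `retain_last` playing the role of the procedure arguments of the same names. It ignores branches, tags and manifest-level bookkeeping. The snapshot IDs, timestamps and file names are made up for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at: datetime
    files: frozenset  # data/delete files readable from this snapshot


def expire_snapshots(snapshots: list[Snapshot], older_than: datetime,
                     retain_last: int = 1) -> tuple[list[int], set[str]]:
    """Return (expired snapshot ids, files that can be physically deleted)."""
    ordered = sorted(snapshots, key=lambda s: s.committed_at)
    # The newest `retain_last` snapshots survive even if they are older than the cutoff.
    protected = {s.snapshot_id for s in ordered[max(0, len(ordered) - retain_last):]}
    expired = [s for s in ordered
               if s.committed_at < older_than and s.snapshot_id not in protected]
    expired_ids = {s.snapshot_id for s in expired}
    kept = [s for s in ordered if s.snapshot_id not in expired_ids]
    # A file may be deleted only if no surviving snapshot still references it.
    still_needed = set().union(*(s.files for s in kept))
    deletable = set().union(*(s.files for s in expired)) - still_needed
    return [s.snapshot_id for s in expired], deletable


snapshots = [
    Snapshot(1, datetime(2024, 1, 11, 3, 0), frozenset({"d1.parquet", "d2.parquet"})),
    Snapshot(2, datetime(2024, 1, 11, 3, 5), frozenset({"d1.parquet"})),
    Snapshot(3, datetime(2024, 1, 11, 3, 20), frozenset({"d1.parquet", "d3.parquet"})),
]
ids, files = expire_snapshots(snapshots, older_than=datetime(2024, 1, 11, 3, 10))
print(ids, sorted(files))  # [1, 2] ['d2.parquet']
```

`d1.parquet` belongs to both expired snapshots but survives, because snapshot 3 still reads it.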

In [46]:
%%sql
CALL glue_catalog.system.expire_snapshots(table => 'icebergdb.catalog_sales',older_than => TIMESTAMP '2024-01-11 03:10:00.000')


After expiring the snapshots, list the current data files again

In [50]:
%%sh
aws s3 ls s3://myemr-bucket-01/data/iceberg-folder/icebergdb.db/catalog_sales/data/cs_sold_date_sk=2450815/

2024-01-11 01:50:19   82793375 00000-1013-fa88059a-14ae-4ca6-ae6c-3e652694edec-00001.parquet
2024-01-11 03:06:40       3335 00000-1032-3664e1cc-cc6c-468f-8183-d9d9ace18fcd-00001-deletes.parquet


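### rewrite_data_files

Compacts small data files into larger ones (the default strategy is `binpack`). Like any other write, the compaction commits a new snapshot.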

In [49]:
%%sql
CALL glue_catalog.system.rewrite_data_files(table => 'icebergdb.catalog_sales')
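
The idea behind bin-packing compaction can be shown with first-fit decreasing. Files already at the target size are left alone. Smaller files are packed into groups whose total stays under the target, and each group would be rewritten as one file. This is a deliberately simplified sketch, not Iceberg's planner. The function name, the MB units and the `min_input_files` threshold are hypothetical.

```python
from __future__ import annotations


def plan_bin_pack(file_sizes_mb: list[int], target_mb: int,
                  min_input_files: int = 2) -> list[list[int]]:
    """Group files smaller than target_mb so each group's total stays <= target_mb.

    First-fit decreasing; groups with fewer than min_input_files files are
    skipped, since rewriting one small file on its own does not reduce the file count.
    """
    candidates = sorted((s for s in file_sizes_mb if s < target_mb), reverse=True)
    groups = []
    for size in candidates:
        for group in groups:
            if sum(group) + size <= target_mb:
                group.append(size)
                break
        else:  # no existing group has room: open a new one
            groups.append([size])
    return [g for g in groups if len(g) >= min_input_files]


print(plan_bin_pack([100, 200, 300, 50, 600], target_mb=512))
# [[300, 200], [100, 50]]
```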


### rewrite_position_delete_files
Iceberg can rewrite position delete files, which serves two purposes:

* Minor Compaction: Compact small position delete files into larger ones. This reduces the size of metadata stored in manifest files and overhead of opening small delete files.
* Remove Dangling Deletes: Filter out position delete records that refer to data files that are no longer live. After rewrite_data_files, position delete records pointing to the rewritten data files are not always marked for removal, and can remain tracked by the table’s live snapshot metadata. This is known as the ‘dangling delete’ problem.
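
Both purposes can be modelled on plain `(file_path, pos)` records. Several small delete files are merged into one sorted, de-duplicated list, and records whose data file is no longer live are dropped as dangling. The file names are hypothetical. The real procedure works per partition on Parquet delete files, so treat this as a conceptual sketch.

```python
from __future__ import annotations


def rewrite_position_deletes(delete_files: list[list[tuple[str, int]]],
                             live_data_files: set[str]) -> tuple[list[tuple[str, int]], int]:
    """Merge small position-delete files into one sorted list and drop dangling records.

    Returns (merged records, number of dangling records removed).
    """
    merged = {record for records in delete_files for record in records}  # also de-duplicates
    kept = sorted(r for r in merged if r[0] in live_data_files)
    return kept, len(merged) - len(kept)


delete_files = [
    [("data/d1.parquet", 0)],
    [("data/d1.parquet", 4), ("data/d0.parquet", 2)],
]
# Hypothetical: d0.parquet was replaced by rewrite_data_files, so it is no longer live.
records, dangling = rewrite_position_deletes(delete_files, {"data/d1.parquet"})
print(records, dangling)
# [('data/d1.parquet', 0), ('data/d1.parquet', 4)] 1
```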

In [37]:
%%sql
CALL glue_catalog.system.rewrite_position_delete_files('icebergdb.catalog_sales')


##### Check the file cleanup

Replace the S3 paths below with your own

In [42]:
%%sh
aws s3 ls s3://myemr-bucket-01/data/iceberg-folder/icebergdb.db/catalog_sales/data/cs_sold_date_sk=2450815/

2024-01-11 01:50:19   82793375 00000-1013-fa88059a-14ae-4ca6-ae6c-3e652694edec-00001.parquet
2024-01-11 03:06:40       3335 00000-1032-3664e1cc-cc6c-468f-8183-d9d9ace18fcd-00001-deletes.parquet


In [39]:
%%sh
aws s3 ls s3://myemr-bucket-01/data/iceberg-folder/icebergdb.db/catalog_sales/metadata/

2024-01-11 03:01:37       8620 00003-9ff496b4-9507-454c-847d-ec7743deac99.metadata.json
2024-01-11 03:01:58       9777 00004-49ed5cdf-3ec4-4c4f-bbb5-4f2012da42c0.metadata.json
2024-01-11 03:02:08      10934 00005-ec251382-55f3-45c9-b5c5-be8ae1f5cf11.metadata.json
2024-01-11 03:02:18      11890 00006-2cd9ea38-8764-4bba-9c3d-60b89e0a460b.metadata.json
2024-01-11 03:03:02      12847 00007-3f0df5f9-9ff6-4565-b528-8c675ea636b0.metadata.json
2024-01-11 03:06:40      13908 00008-ecb46631-1717-43a8-903e-533767d8b381.metadata.json
2024-01-11 03:06:40       9283 2d24b1ec-a660-4491-97a0-7c609b075959-m0.avro
2024-01-11 03:06:40       9285 2d24b1ec-a660-4491-97a0-7c609b075959-m1.avro
2024-01-11 03:06:40       9286 2d24b1ec-a660-4491-97a0-7c609b075959-m2.avro
2024-01-11 03:06:40       9283 2d24b1ec-a660-4491-97a0-7c609b075959-m3.avro
2024-01-11 03:06:40       9285 2d24b1ec-a660-4491-97a0-7c609b075959-m4.avro
2024-01-11 03:06:40       9282 2d24b1ec-a660-4491-97a0-7c609b075959-m5.avro
2024-01-11 03:06

## Running with PySpark

### Create the table and write the data in one step

In [4]:
# sample dataset
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

# write to iceberg table
inputDF.writeTo("glue_catalog.icebergdb.sample_py_spark") \
    .tableProperty("format", "iceberg/parquet") \
    .tableProperty("format-version", "2") \
    .tableProperty("write.metadata.delete-after-commit.enabled", "true") \
    .tableProperty("history.expire.max-snapshot-age-ms", "86400") \
    .tableProperty('write.metadata.previous-versions-max','5') \
    .tableProperty('history.expire.min-snapshots-to-keep','1') \
    .tableProperty('write.update.mode', 'merge-on-read') \
    .tableProperty('write.delete.mode', 'merge-on-read') \
    .tableProperty('write.merge.mode', 'merge-on-read') \
    .create()
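
The nine chained `.tableProperty(...)` calls repeat the SQL `TBLPROPERTIES` one by one. A small helper can apply them from a single dict, which keeps the SQL and PySpark examples in sync. `DataFrameWriterV2.tableProperty(key, value)` returns the builder, so the loop simply reassigns it. `TABLE_PROPERTIES`, `with_table_properties` and the `RecordingWriter` stand-in are hypothetical names added for illustration.

```python
# Same properties as the chained .tableProperty(...) calls above.
TABLE_PROPERTIES = {
    "format": "iceberg/parquet",
    "format-version": "2",
    "write.metadata.delete-after-commit.enabled": "true",
    "history.expire.max-snapshot-age-ms": "86400",
    "write.metadata.previous-versions-max": "5",
    "history.expire.min-snapshots-to-keep": "1",
    "write.update.mode": "merge-on-read",
    "write.delete.mode": "merge-on-read",
    "write.merge.mode": "merge-on-read",
}


def with_table_properties(writer, properties: dict):
    """Apply every property via writer.tableProperty(key, value) and return the builder."""
    for key, value in properties.items():
        writer = writer.tableProperty(key, value)
    return writer


class RecordingWriter:
    """Stand-in for DataFrameWriterV2 so the helper can be tried without Spark."""

    def __init__(self):
        self.properties = {}

    def tableProperty(self, key, value):
        self.properties[key] = value
        return self


print(with_table_properties(RecordingWriter(), TABLE_PROPERTIES).properties["write.update.mode"])

# In the notebook (requires the Spark session and inputDF):
# with_table_properties(inputDF.writeTo("glue_catalog.icebergdb.sample_py_spark"),
#                       TABLE_PROPERTIES).create()
```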


### View the written data

In [5]:
spark.sql("select * from glue_catalog.icebergdb.sample_py_spark").show()


+---+-------------+--------------------+
| id|creation_date|    last_update_time|
+---+-------------+--------------------+
|100|   2015-01-01|2015-01-01T13:51:...|
|101|   2015-01-01|2015-01-01T12:14:...|
|102|   2015-01-01|2015-01-01T13:51:...|
|103|   2015-01-01|2015-01-01T13:51:...|
|104|   2015-01-02|2015-01-01T12:15:...|
|105|   2015-01-02|2015-01-01T13:51:...|
+---+-------------+--------------------+