# S3Tablesを試してみるための手順
---

## 1 Terrafromでインフラ準備

### 1.1 Terraformの設定ファイルの準備
`env.tfvars.sample` ファイルを参考に `env.tfvars` を作成します。

regionに関してはS3Tablesが使えるリージョンを設定します。

```toml
profile = "yourprofile"
region  = "us-east-1"
bucket  = "yourbucket-in-your-region"
```

### 1.2 Terraformの実行
DevContainer上から以下のコマンドを実行します。


```sh
$ make tf-apply
```

実行が完了した際に設定が出力されます。こちらは後続の手順で利用するのでメモしておきます。

![](../images/001.png)

## 2 SparkによるS3Tablesの操作
SparkからS3Tablesを操作していきます.

### 2.1 Spark環境設定
変数に環境設定を行なっていきます。

---
まずは以下のCellでTerraformの出力を参照して、変数の設定をします。

In [3]:
AWS_ACCOUNT_ID = "xxxxxxxxxxx"
AWS_REGION = "us-east-1"
AWS_PROFILE = "xxxxxxxx"
BUCKET_NAME = "s3-tables-sample-xxxxxxxxxxx"
NAMESPACE = "s3_tables_sample"

---
その他の設定も変数に入れておきます。ここでpysparkから利用する環境変数も設定します。

In [4]:
import os

s3tables_bucket_arn = f"arn:aws:s3tables:{AWS_REGION}:{AWS_ACCOUNT_ID}:bucket/{BUCKET_NAME}"

# pysparkの実行時に読み込まれるAWS接続設定
os.environ["AWS_PROFILE"] = AWS_PROFILE
os.environ["AWS_REGION"] = AWS_REGION


# 以下は任意に変更可能
app_name = "S3TablesSample"
catalog_name = "s3tablesbucket"
table_name = "sample_table"

---
pysparkの準備を行ないます

In [5]:
from pyspark.sql import SparkSession

# Sparkで利用するパッケージを指定します
spark_packages = [
    # Amazon S3 Tables Catalog for Apache Iceberg
    # refs: https://github.com/awslabs/s3-tables-catalog
    "software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3",

    # Pyspark: 3.5用
    # Scala: 2.12用
    # refs: https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.5
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1",

    # その他必要になるAWS SDK
    "software.amazon.awssdk:s3:2.20.68",
    "software.amazon.awssdk:sts:2.20.68",
    "software.amazon.awssdk:glue:2.20.68",
    "software.amazon.awssdk:dynamodb:2.20.68",
    "software.amazon.awssdk:kms:2.20.68",
]


# Spark Sessionを生成する関数
def get_spark_session(
    app_name: str = app_name,
    catalog_name: str = catalog_name,
    packages: list[str] = spark_packages,
    aws_region: str = AWS_REGION,
    s3tables_bucket_arn: str = s3tables_bucket_arn,
) -> SparkSession:
    # SparkSessionの作成
    spark = (
        SparkSession.builder.appName(app_name)
        .config(
            "spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.profile.ProfileCredentialsProvider",
        )
        .config("spark.jars.packages", ",".join(packages))
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        .config(
            f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog"
        )
        .config(
            f"spark.sql.catalog.{catalog_name}.catalog-impl",
            "software.amazon.s3tables.iceberg.S3TablesCatalog",
        )
        .config(f"spark.sql.catalog.{catalog_name}.warehouse", s3tables_bucket_arn)
        .config(f"spark.sql.catalog.{catalog_name}.s3.region", aws_region)
        .config("spark.sql.catalog.s3tablesbucket.allow-delete-purge", "true")
        .getOrCreate()
    )
    return spark



### 2.2 S3 Tablesにテーブル作成と確認
ここから[AWSのブログ](https://aws.amazon.com/jp/blogs/news/new-amazon-s3-tables-storage-optimized-for-analytics-workloads/)に合せて作業してみます。


---
まずは一度SparkSessionを作成します。 この際に必要なパッケージもダウンロードされます。（ある程度時間かかります)

In [6]:
spark = get_spark_session()
spark.stop()

:: loading settings :: url = jar:file:/workspaces/s3-tables-sample/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vscode/.ivy2/cache
The jars for the packages stored in: /home/vscode/.ivy2/jars
software.amazon.s3tables#s3-tables-catalog-for-iceberg-runtime added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
software.amazon.awssdk#s3 added as a dependency
software.amazon.awssdk#sts added as a dependency
software.amazon.awssdk#glue added as a dependency
software.amazon.awssdk#dynamodb added as a dependency
software.amazon.awssdk#kms added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dd1df947-0bfa-4455-8d2c-cf278a8d8594;1.0
	confs: [default]
	found software.amazon.s3tables#s3-tables-catalog-for-iceberg-runtime;0.1.3 in central
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 in central
	found software.amazon.awssdk#s3;2.20.68 in central
	found software.amazon.awssdk#aws-xml-protocol;2.20.68 in central
	found software.amazon.awssdk#aws-query-protocol;2.20.68 in central
	found so

---
Terraformで作成したNamespaceが存在することが確認できます.

In [7]:
spark = get_spark_session()
sql = f"SHOW NAMESPACES IN `{catalog_name}`"
print(sql)
spark.sql(sql).show()
spark.stop()

SHOW NAMESPACES IN `s3tablesbucket`
+-------------------+
|          namespace|
+-------------------+
|s3_tables_sample_ns|
+-------------------+



---
現時点ではNamespaceの中にテーブルは存在しません.

In [8]:
spark = get_spark_session()
sql = f"SHOW TABLES IN `{catalog_name}`.`{NAMESPACE}`"
print(sql)
spark.sql(sql).show()
spark.stop()

SHOW TABLES IN `s3tablesbucket`.`s3_tables_sample_ns`
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



### 2.3 テーブルを作成
---
[ブログ](https://aws.amazon.com/jp/blogs/news/new-amazon-s3-tables-storage-optimized-for-analytics-workloads/)の記事に合せてテーブルを作成します。

In [9]:
spark = get_spark_session()
sql = f"""CREATE TABLE IF NOT EXISTS `{catalog_name}`.`{NAMESPACE}`.`{table_name}`
 (id INT,
  name STRING,
  value INT)
  USING iceberg
"""
print(sql)
spark.sql(sql)
spark.stop()

CREATE TABLE IF NOT EXISTS `s3tablesbucket`.`s3_tables_sample_ns`.`sample_table`
 (id INT,
  name STRING,
  value INT)
  USING iceberg



---
テーブルにデータを挿入します.

In [10]:
spark = get_spark_session()
sql = f"""INSERT INTO `{catalog_name}`.`{NAMESPACE}`.`{table_name}`
  VALUES
  (1, 'Jeff', 100),
  (2, 'Carmen', 200),
  (3, 'Stephen', 300),
  (4, 'Andy', 400),
  (5, 'Tina', 500),
  (6, 'Bianca', 600),
  (7, 'Grace', 700)
"""
print(sql)
spark.sql(sql)
spark.stop()

INSERT INTO `s3tablesbucket`.`s3_tables_sample_ns`.`sample_table`
  VALUES
  (1, 'Jeff', 100),
  (2, 'Carmen', 200),
  (3, 'Stephen', 300),
  (4, 'Andy', 400),
  (5, 'Tina', 500),
  (6, 'Bianca', 600),
  (7, 'Grace', 700)



                                                                                

### 2.4 テーブルの内容を確認
---
テーブルが作成されて、SparkからもデータをSELECTできるようになります。

In [11]:
spark = get_spark_session()

sql = f"SHOW NAMESPACES IN `{catalog_name}`"
print(sql)
spark.sql(sql).show()

sql = f"SELECT * FROM `{catalog_name}`.`{NAMESPACE}`.`{table_name}`"
print(sql)
spark.sql(sql).show()

spark.stop()

SHOW NAMESPACES IN `s3tablesbucket`
+-------------------+
|          namespace|
+-------------------+
|s3_tables_sample_ns|
+-------------------+

SELECT * FROM `s3tablesbucket`.`s3_tables_sample_ns`.`sample_table`


[Stage 1:>                                                          (0 + 1) / 1]

+---+-------+-----+
| id|   name|value|
+---+-------+-----+
|  1|   Jeff|  100|
|  2| Carmen|  200|
|  3|Stephen|  300|
|  4|   Andy|  400|
|  5|   Tina|  500|
|  6| Bianca|  600|
|  7|  Grace|  700|
+---+-------+-----+



                                                                                

## 3. Athenaからデータを確認

### 3.1 Lakeformationの権限付与
Athenaからデータを確認するためにはLakeformation経由での権限を付与する必要があります。

---
以下のboto3関数を実行します。ここでは実行ユーザに対してSELECT権限を割り当てています。

In [None]:
from boto3 import Session
session = Session(profile_name=AWS_PROFILE,region_name=AWS_REGION)
sts = session.client("sts")
lakeformation = session.client("lakeformation")
principal=sts.get_caller_identity()["Arn"]
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": principal},
    Permissions=["SELECT"],
    Resource={
        "Table": {
            "CatalogId": f"s3tablescatalog/{BUCKET_NAME}",
            "DatabaseName": NAMESPACE,
            "Name": table_name,
        }
    }
)

---
AWSコンソールからクエリを実行してみると以下のようにデータが取得できます。

画面左のペインに入っている値を確認すると理解が深まります。

```sql
SELECT * FROM "sample_table" limit 10;
```

![](../images/002.png)

## 4. かたづけ
作った環境を片付けていきます。

## 4.1 Tableの削除
BucketやNamespaceはTableが削除されていないと、削除できません。

---
まずはS3TablesのTableを削除します。こちらはboto3の関数で実施します。

In [None]:
from boto3 import Session
session = Session(profile_name=AWS_PROFILE,region_name=AWS_REGION)
s3tables = session.client("s3tables")
s3tables.delete_table(
    tableBucketARN=s3tables_bucket_arn,
    namespace=NAMESPACE,
    name=table_name,
)

### 4.2 Terraformの削除
---
DevContainer上から以下のコマンドを実行します。
これでTerraformで作成したS3TablesのバケットやNamespaceも削除されます。


```sh
$ make tf-destroy
```