### ABOUT NOTEBOOKS
For the Feature Friday Iceberg labs, we've decided to leverage a Jupyter notebook due to the ability to provide an interative experience blending formatted text (markdown) and executable code in a single document.

<u>General usage notes</u>
- A notebook is constructed of a set of ordered cells that are either markdown text or code
- Code cannot be executed in a markdown cell, but the output of a code cell can be markdown text
- If you encounter an error during code execution due to an undefined variable, ensure that the code block that defines the variable (located in a cell above) is executed to set its value
  - NOTE: All variables defined in this notebook are in **uppercase**

### SRR RESOURCES
<u>Training Materials</u>
- Engineering
  - [Iceberg Tables Training for Support](https://snowflakecomputing.atlassian.net/wiki/spaces/CustomerSupport/pages/3028713756/Iceberg+Tables+Training+for+Support)
- Support
  - [Intro to Iceberg Tables - Key Concepts training](https://snowflake.zoom.us/rec/share/997cOWJYC6rBIuyzs2P02oD8f28M7dzWj0ygDG9NuIhw3uti2EUqq8_h1m9VUQ8l.dpBvyZ9p_YEQVkOx)
    - Passcode: <code>AAPQ&W5$</code>    

<u>Runbook(s)</u>
  - [Iceberg Support Runbook](https://docs.google.com/document/d/18MjH6n3ypi4VEbs5_wa_Vpxgi0XtVWCLW4_EEe7DBLQ/edit#heading=h.1bu2wjpos1k9)

<u>Product documentation</u>
  - [Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg)

### WHAT IS APACHE ICEBERG?

[Apache Iceberg](https://iceberg.apache.org/docs/latest/) is an open-source table format specification developed for huge analytic datasets. 

### WHAT ARE ICEBERG TABLES?

Iceberg tables are a new table type designed to support the [Apache Iceberg table specification](https://iceberg.apache.org/spec/) to represent a large collection of slowly-changing files on a distributed file system (AWS S3, Azure Blob, Google Cloud Storage) with performance close to that of native Snowflake tables.

The architecture of an Apache Iceberg table is defined as three distinct layers:
- [Iceberg Catalog](https://iceberg.apache.org/concepts/catalog/#iceberg-catalogs) - Used to manage a collection of tables</li>
- [Metadata](https://iceberg.apache.org/spec/#specification) - Used to manage table states in a catalog through a combination of metadata files (JSON), manifest lists (Avro), and manifest files (Avro)
- Data - Collection of files that represent the data for all tables in the catalog

<div>
<img src="https://iceberg.apache.org/assets/images/iceberg-metadata.png" width="50%"/>
</div>


Snowflake supports Iceberg tables with externally managed catalogs (<strong>unmanaged</strong>) and natively managed catalogs (<strong>managed</strong>).

The following catalog types are supported for unmanaged iceberg tables:
- [AWS Glue data catalog](https://docs.aws.amazon.com/glue/latest/dg/catalog-and-crawler.html)
- Object storage
    - Iceberg
    - Loose parquet (PrPr)

To create an unmanaged Iceberg table, you will need to configure:
- [EXTERNAL VOLUME](https://docs.snowflake.com/sql-reference/sql/create-external-volume) - Defines the location(s) for the external catalog, metadata, and data
- [CATALOG INTEGRATION](https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration) - Defines the details of the external catalog

### ALWAYS START HERE
**<u>IMPORTANT</u>**: You **must** execute the cell below to initialize the notebook for code execution

In [None]:
import os

SNOW_LOCATOR = os.getenv("SNOW_LOCATOR", None)
SNOW_USER = os.getenv("SNOW_USER", None)
SNOW_PASSWD = os.getenv("SNOW_PASSWD", None)
SNOW_DB = os.getenv("SNOW_DB", None)
SNOW_ROLE = os.getenv("SNOW_ROLE", None)

if (SNOW_LOCATOR is None) or (SNOW_USER is None) or (SNOW_PASSWD is None) or (SNOW_DB is None) or (SNOW_ROLE is None):
    raise Exception("SNOW_LOCATOR, SNOW_USER, SNOW_PASSWD, SNOW_DB, and SNOW_ROLE must be defined in snowflake.env to continue")

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("jupyter").getOrCreate()

%reload_ext sparksql_magic
%reload_ext sql
%sql snowflake://{SNOW_USER}:{SNOW_PASSWD}@{SNOW_LOCATOR}/{SNOW_DB}?role={SNOW_ROLE}

### Exercise 0: (Optional) Cleanup objects created the previous notebook
Execute the cell below to drop the Snowflake objects created in this exercise.

In [None]:
%%sql

use role support_rl;

drop database if exists {SNOW_USER}_db;
drop database if exists {SNOW_USER}_iceberg_db;
drop integration if exists {SNOW_USER}_iceberg_os_catalog_int;
drop integration if exists {SNOW_USER}_os_iceberg_cat_int;
drop integration if exists {SNOW_USER}_glue_cat_int;
drop external volume if exists {SNOW_USER}_ext_vol;

#### Step 1: Create a warehouse and database/schema workspace to use throughout the lab

In [None]:
%%sql

use role support_rl;

create warehouse if not exists {SNOW_USER}_wh warehouse_size='xsmall';
create database if not exists {SNOW_USER}_db;
create schema if not exists {SNOW_USER}_db.iceberg;

NOTE: Objects created outside of Snowflake need to be removed manually.

### Exercise 1: Create an unmanaged Snowflake Iceberg table (type=OBJECT_STORE)
<u>Prerequisites</u>
1. You must have a personal AWS S3 bucket in <code>us-west-2</code>
2. You must have an AWS IAM policy that allows read/write access to your personal AWS S3 bucket
3. You must have an AWS IAM role that is assigned the AWS IAM policy (2)

In [None]:
# Replace <MY_ICEBERG_LAB_S3_BUCKET> with the name of your bucket in us-west-2
MY_ICEBERG_LAB_S3_BUCKET='<MY_ICEBERG_LAB_S3_BUCKET>'
# Replace <MY_ICEBERG_LAB_S3_IAM_ROLE_ARN> with your IAM AWS S3 access role ARN
MY_ICEBERG_LAB_S3_IAM_ROLE_ARN='<MY_ICEBERG_LAB_S3_IAM_ROLE_ARN>'

#### Step 1: Create an AWS Iceberg table using AWS Athena
NOTE: Run the cell below to generate instructions

In [None]:
from IPython.display import Markdown as md

SOURCE_TABLE='kterada_db.kt_glue_iceberg_tpcds_sf10tcl_web_sales'
TARGET_DB_NAME=SNOW_USER + '_db'
TARGET_TABLE_NAME='iceberg_tpcds_sf10tcl_web_sales_t'

here = """
<u>Instructions</u>:
1. Login to the AWS CE-Sandbox Console via [SnowBiz Okta](https://snowbiz.okta.com/)
2. Navigate to the [AWS Athena query editor](https://us-west-2.console.aws.amazon.com/athena/home?region=us-west-2#/query-editor) in <code>us-west-2</code>
3. Execute the following SQL to create an AWS Iceberg table:
```
create table {db_name}.{table_name} with (table_type='iceberg', location='s3://{my_os_s3_bucket}/iceberg_feature_lab/tpcds_sf10tcl/web_sales/', is_external=false)
as (select * from {source_table} limit 1000);
```
""".format(my_os_s3_bucket=MY_ICEBERG_LAB_S3_BUCKET, username=SNOW_USER, db_name=TARGET_DB_NAME, table_name=TARGET_TABLE_NAME, source_table=SOURCE_TABLE)

md(here)

#### Step 2: Create an external volume
NOTE: This object will be reused in other exercises in the lab.

<u>References</u>
- Snowflake Documentation:
  - [Configure an external volume for Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume)
  - [CREATE EXTERNAL VOLUME](https://docs.snowflake.com/en/sql-reference/sql/create-external-volume)

In [None]:
%%sql

use role support_rl;

create or replace external volume {SNOW_USER}_iceberg_lab_ext_vol
    storage_locations = (
        (
            name = 's3_iceberg_lab'
            storage_provider = 's3'
            storage_base_url = 's3://{MY_ICEBERG_LAB_S3_BUCKET}/'
            storage_aws_role_arn = '{MY_ICEBERG_LAB_S3_IAM_ROLE_ARN}'
            encryption = (type='aws_sse_s3')
        )
    )
;

desc external volume {SNOW_USER}_iceberg_lab_ext_vol;

NOTE: After creating the external volume, you must update trust relationships in your AWS IAM role based on the `DESCRIBE` output above

#### Step 3: Create a catalog integration
NOTE: This object will be reused in other exercises in the lab.

<u>References</u>
- Snowflake Documentation:
  - [Configure a catalog integration for Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration)
  - [CREATE CATALOG INTEGRATION](https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration)

In [None]:
%%sql

use role support_rl;

create or replace catalog integration {SNOW_USER}_os_cat_int
    catalog_source = object_store
    table_format = iceberg
    enabled = true
;

desc integration {SNOW_USER}_os_cat_int;

#### Step 4: Create the Snowflake Iceberg table (unmanaged)
<u>References</u>
- Snowflake Documentation:
  - [Create an Iceberg table](https://docs.snowflake.com/user-guide/tables-iceberg-create)
  - [CREATE ICEBERG TABLE](https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table)

In [None]:
# Replace <MY_OS_ICEBERG_TABLE_METADATA_FILE> with the name of AWS Iceberg table metadata file in the 
# iceberg_feature_lab/tpcds_sf10tcl/web_sales/ path from your us-west-2 bucket
MY_OS_ICEBERG_TABLE_METADATA_FILE='<MY_OS_ICEBERG_TABLE_METADATA_FILE>'

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

create or replace iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t
    external_volume = {SNOW_USER}_iceberg_lab_ext_vol
    catalog = {SNOW_USER}_os_cat_int
    metadata_file_path = 'iceberg_feature_lab/tpcds_sf10tcl/web_sales/metadata/{MY_OS_ICEBERG_TABLE_METADATA_FILE}'
;

#### Step 5: Verify the Snowflake Iceberg table contains data

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    count(*)
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t
where true
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    *
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t
where true
limit 25
;

#### Step 6: Update the AWS Iceberg table
NOTE: Run the cell below to generate instructions

In [None]:
from IPython.display import Markdown as md

here = """
<u>Instructions</u>
1. In the AWS CE-Sandbox Console, navigate to the [AWS Athena query editor](https://us-west-2.console.aws.amazon.com/athena/home?region=us-west-2#/query-editor) in <code>us-west-2</code>
2. Execute the following SQL to insert additional records into the AWS Iceberg table:
```
insert into {db_name}.{table_name} (select * from {source_table} except select * from {db_name}.{table_name} limit 1000);
```
""".format(db_name=TARGET_DB_NAME, table_name=TARGET_TABLE_NAME, source_table=SOURCE_TABLE)

md(here)

#### Step 7: Manually refresh the unmanaged Snowflake Iceberg table
<u>References</u>
- Snowflake Documentation
  - [Manage an Iceberg table](https://docs.snowflake.com/user-guide/tables-iceberg-manage)

In [None]:
# Replace <MY_LATEST_OS_ICEBERG_TABLE_METADATA_FILE> with the name of latest AWS Iceberg table metadata file in the 
# iceberg_feature_lab/tpcds_sf10tcl/web_sales/ path from your us-west-2 bucket (HINT: Filter for the most current timestamp)
MY_LATEST_OS_ICEBERG_TABLE_METADATA_FILE='<MY_LATEST_OS_ICEBERG_TABLE_METADATA_FILE>'

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

alter iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t refresh 'iceberg_feature_lab/tpcds_sf10tcl/web_sales/metadata/{MY_LATEST_OS_ICEBERG_TABLE_METADATA_FILE}';

#### Step 8: Verify the Snowflake Iceberg table contains more data

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    count(*)
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t
where true
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    *
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t
where true
limit 25
;

#### Step 9: Inspect the Snowflake Iceberg table DDL

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

select get_ddl('table', '{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t');

#### Step 10: Execute DDL on the unmanaged Snowflake Iceberg table

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

truncate table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t;

#### Step 11: Inspect SYSTEM\$GET_ICEBERG_TABLE_INFORMATION details (unmanaged)
<u>References</u>
- Snowflake Documentation:
  - [SYSTEM$GET_ICEBERG_TABLE_INFORMATION](https://docs.snowflake.com/en/sql-reference/functions/system_get_iceberg_table_information)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select system$get_iceberg_table_information('{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t');

#### Step 12: Convert the Snowflake Iceberg table from unmanaged to managed
<u>References</u>
- Snowflake Documentation:
  - [Convert an Iceberg table](https://docs.snowflake.com/en/user-guide/tables-iceberg-conversion)
  - [ALTER ICEBERG TABLE](https://docs.snowflake.com/en/sql-reference/sql/alter-iceberg-table)

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

alter iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t convert to managed base_location='snowflake/web_sales_t';

#### Step 13: Rename the Snowflake Iceberg table
<u>References</u>
- Snowflake Documentation:
  - [ALTER ICEBERG TABLE](https://docs.snowflake.com/en/sql-reference/sql/alter-iceberg-table)

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

alter iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_os_web_sales_t rename to {SNOW_USER}_managed_os_web_sales_t;

#### Step 14: Inspect the Snowflake Iceberg table DDL

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select get_ddl('table', '{SNOW_USER}_db.iceberg.{SNOW_USER}_managed_os_web_sales_t'); 

#### Step 15: Execute DDL on the managed Snowflake Iceberg table

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

truncate table {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_os_web_sales_t;

#### Step 16: Inspect SYSTEM\$GET_ICEBERG_TABLE_INFORMATION details (managed)
<u>References</u>
- Snowflake Documentation:
  - [SYSTEM$GET_ICEBERG_TABLE_INFORMATION](https://docs.snowflake.com/en/sql-reference/functions/system_get_iceberg_table_information)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select system$get_iceberg_table_information('{SNOW_USER}_db.iceberg.{SNOW_USER}_managed_os_web_sales_t');

#### Step 17: (Optional) Cleanup
Execute the cell below to drop the Snowflake objects created in this exercise.

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

drop iceberg table if exists {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_os_web_sales_t;
/* 
uncomment the lines after all labs are completed
*/
--drop integration if exists {SNOW_USER}_os_cat_int;
--drop external volume if exists {SNOW_USER}_iceberg_lab_ext_vol;

NOTE: Objects created outside of Snowflake need to be removed manually.

### Exercise 2: Create an unmanaged Snowflake Iceberg Table (type=GLUE)
NOTE: This exercise will use a pre-existing AWS Iceberg table created by AWS Glue

#### Step 1: Create an IAM role with AWS S3 permissions to the pre-existing AWS Iceberg table
NOTE: Run the cell below to generate instructions

In [None]:
from IPython.display import Markdown as md

here = """
<u>Instructions</u>
1. In the AWS CE-Sandbox Console, navigate to **Identity and Access Management (IAM)** > [**Roles**](https://us-east-1.console.aws.amazon.com/iam/home#/roles)
2. Click the **Create role** button
3. On the **Specify permissions page**, click the **JSON** button to toggle to the JSON editor
4. Select **Trusted entity type** > **AWS account**
5. Click the **Next** button
6. On the **Add permissions** page, lookup and click the checkbox for **kterada_custom_s3_policy**
7. Click the **Next** button
8. Name the role **{username}_iceberg_lab_s3_role**
9. Click the **Create role** button
10. Replace `<MY_GLUE_IAM_ROLE_ARN>` with the IAM role ARN in the cell below and execute
""".format(username=SNOW_USER, db_name=TARGET_DB_NAME, table_name=TARGET_TABLE_NAME, source_table=SOURCE_TABLE)

md(here)

In [None]:
# Replace <MY_GLUE_IAM_ROLE_ARN> with your IAM AWS Glue access role ARN
MY_GLUE_IAM_ROLE_ARN='<MY_GLUE_IAM_ROLE_ARN>'

#### Step 2: Create an external volume
<u>References</u>
- Snowflake Documentation:
  - [Configure an external volume for Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume)
  - [CREATE EXTERNAL VOLUME](https://docs.snowflake.com/en/sql-reference/sql/create-external-volume)

In [None]:
%%sql

use role support_rl;

create or replace external volume {SNOW_USER}_glue_ext_vol
    storage_locations = (
        (
            name = 's3_iceberg_lab_glue'
            storage_provider = 's3'
            storage_base_url = 's3://kt-s3-us-west-2/'
            storage_aws_role_arn = '{MY_GLUE_IAM_ROLE_ARN}'
            encryption = (type='aws_sse_s3')
        )
    )
;

desc external volume {SNOW_USER}_glue_ext_vol;

NOTE: After creating the external volume, you must update trust relationships in your AWS IAM role based on the `DESCRIBE` output above

#### Step 3: Create an IAM role with AWS Glue service permissions
NOTE: Run the cell below to generate instructions

In [None]:
from IPython.display import Markdown as md

here = """
<u>Instructions</u>:
1. In the AWS CE-Sandbox Console, navigate to **Identity and Access Management (IAM)** > [**Roles**](https://us-east-1.console.aws.amazon.com/iam/home#/roles)
2. Click the **Create role** button
3. On the **Specify permissions page**, click the **JSON** button to toggle to the JSON editor
4. Select **Trusted entity type** > **AWS account**
5. Click the **Next** button
6. On the **Add permissions** page, lookup and click the checkbox for **kterada_custom_glue_policy**
7. Click the **Next** button
8. Name the role `{username}_glue_role`
9. Click the **Create role** button
10. Replace `<MY_GLUE_SERVICE_IAM_ROLE_ARN>` with the IAM role ARN in the cell below and execute
""".format(username=SNOW_USER, db_name=TARGET_DB_NAME, table_name=TARGET_TABLE_NAME, source_table=SOURCE_TABLE)

md(here)

In [None]:
# Replace <MY_GLUE_SERVICE_IAM_ROLE_ARN> with your IAM AWS Glue service access role ARN
MY_GLUE_SERVICE_IAM_ROLE_ARN='<MY_GLUE_SERVICE_IAM_ROLE_ARN>'

#### Step 4: Create a catalog integration
<u>References</u>
- Snowflake Documentation:
  - [Configure a catalog integration for Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration)
  - [CREATE CATALOG INTEGRATION](https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration)

In [None]:
%%sql

use role support_rl;

create or replace catalog integration {SNOW_USER}_glue_cat_int
    catalog_source = glue
    catalog_namespace = 'kterada_db'
    table_format = iceberg
    glue_aws_role_arn = '{MY_GLUE_SERVICE_IAM_ROLE_ARN}'
    glue_catalog_id = '094271313368'
    glue_region = 'us-west-2'
    enabled = true
;

desc integration {SNOW_USER}_glue_cat_int;

NOTE: After creating the catalog integration, you must update trust relationships in your AWS IAM role based on the `DESCRIBE` output above

#### Step 5: Create the Snowflake Iceberg table (unmanaged)
<u>References</u>
- Snowflake Documentation:
  - [Create an Iceberg table](https://docs.snowflake.com/user-guide/tables-iceberg-create)
  - [CREATE ICEBERG TABLE](https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table)

In [None]:
%%sql

use role support_rl;

create or replace iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_glue_web_sales_t
    external_volume = {SNOW_USER}_glue_ext_vol
    catalog = {SNOW_USER}_glue_cat_int
    catalog_namespace = 'kterada_db'
    catalog_table_name = 'kt_glue_iceberg_tpcds_sf10tcl_web_sales'
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    count(*)
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_glue_web_sales_t
where true
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    *
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_glue_web_sales_t
where true
limit 25
;

#### Step 6: Inspect the Snowflake Iceberg table DDL

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

select get_ddl('table', '{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_glue_web_sales_t'); 

#### Step 7: Inspect SYSTEM\$GET_ICEBERG_TABLE_INFORMATION details
<u>References</u>
- Snowflake Documentation:
  - [SYSTEM$GET_ICEBERG_TABLE_INFORMATION](https://docs.snowflake.com/en/sql-reference/functions/system_get_iceberg_table_information)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select system$get_iceberg_table_information('{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_glue_web_sales_t');

#### Step 8: (Optional) Cleanup
Execute the cell below to drop the Snowflake objects created in this exercise.

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

drop iceberg table if exists {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_glue_web_sales_t;
drop catalog integration if exists {SNOW_USER}_glue_cat_int;
drop external volume if exists {SNOW_USER}_glue_ext_vol;

NOTE: Objects created outside of Snowflake need to be removed manually.

### Exercise 3: Create an unmanaged parquet direct (loose parquet) Snowflake Iceberg table

#### Step 1: Upload parquet source data
NOTE: Run the cell below to generate instructions

In [None]:
from IPython.display import Markdown as md

here = """
<u>Instructions</u>
1. In the AWS CE-Sandbox Console, navigate to **S3** > [**Buckets**](https://s3.console.aws.amazon.com/s3/home?region=us-west-2) in `us-west-2`
2. Navigate to the `iceberg_feature_lab` folder in your `{bucket_name}` bucket
3. Click the **Create folder** button
4. Enter `nyc_tlc_data` for the **Folder name** and click the **Create folder** button
5. Click into the `nyc_tlc_data` folder
6. Click the **Create folder** button
7. Enter `green` for the **Folder name** and click the **Create folder** button
8. Click into the `green` folder
9. Download one of the available Green Taxi Trip Records data files from NYC Taxi & Limousine Commission's [trip data page](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) (e.g., [November 2023 Green Taxi Trip Records](https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-11.parquet))
10. Click the **Upload** button, then the **Add Files** button
11. Nagivate and select the parquet file downloaded in Step 9
12. Click the **Upload** button to upload the file
""".format(bucket_name=MY_ICEBERG_LAB_S3_BUCKET)


md(here)

#### Step 2: Create an external volume
IMPORTANT: The external volume used for parquet direct Snowflake Iceberg tables must explicitly disable encryption.

<u>References</u>
- Snowflake Documentation:
  - [Configure an external volume for Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume)
  - [CREATE EXTERNAL VOLUME](https://docs.snowflake.com/en/sql-reference/sql/create-external-volume)

In [None]:
%%sql

use role support_rl;

create or replace external volume {SNOW_USER}_iceberg_lab_no_enc_ext_vol
    storage_locations = (
        (
            name = 's3_iceberg_lab'
            storage_provider = 's3'
            storage_base_url = 's3://{MY_ICEBERG_LAB_S3_BUCKET}/'
            storage_aws_role_arn = '{MY_ICEBERG_LAB_S3_IAM_ROLE_ARN}'
            encryption = (type='none')
        )
    )
;

desc external volume {SNOW_USER}_iceberg_lab_no_enc_ext_vol;

#### Step 3: Create a catalog integration
<u>References</u>
- Snowflake Documentation:
  - [Configure a catalog integration for Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration)
  - [CREATE CATALOG INTEGRATION](https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration)

In [None]:
%%sql

use role support_rl;

create or replace catalog integration {SNOW_USER}_pd_cat_int
    catalog_source = object_store
    table_format = none
    enabled = true
;

desc integration {SNOW_USER}_pd_cat_int;

#### Step 4: Create the parquet direct Iceberg Snowflake tables (unmanaged)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

/*
table schema detail: https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf
*/

create or replace iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_pd_nyc_tlc_green_trip_record_t (
    vendor_id              string,
    llpep_pickup_datetime  datetime,
    llpep_dropoff_datetime datetime,
    passenger_count        bigint,
    trip_distance          double,
    pu_location_id         string,
    do_location_id         string,
    rate_code_id           bigint,
    store_and_fwd_flag     string,
    payment_type           bigint,
    fare_amount            double,
    extra                  double,
    mta_tax                double,
    improvement_surcharge  double,
    tip_amount             double,
    tolls_amount           double,
    total_amount           double,
    trip_type              bigint
    )
    external_volume = {SNOW_USER}_iceberg_lab_no_enc_ext_vol
    catalog = {SNOW_USER}_pd_cat_int
    base_location = 'iceberg_feature_lab/nyc_tlc_data/green/'
;

#### Step 5: Verify the Snowflake Iceberg tables contain data

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    count(*)
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_pd_nyc_tlc_green_trip_record_t
where true
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    *
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_pd_nyc_tlc_green_trip_record_t
where true
limit 25
;

#### Step 6: Inspect the Snowflake Iceberg table DDL

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

select get_ddl('table', '{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_pd_nyc_tlc_green_trip_record_t'); 

#### Step 7: Inspect SYSTEM\$GET_ICEBERG_TABLE_INFORMATION details
<u>References</u>
- Snowflake Documentation:
  - [SYSTEM$GET_ICEBERG_TABLE_INFORMATION](https://docs.snowflake.com/en/sql-reference/functions/system_get_iceberg_table_information)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select system$get_iceberg_table_information('{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_pd_nyc_tlc_green_trip_record_t');

#### Step 8: (Optional) Cleanup
Execute the cell below to drop the Snowflake objects created in this exercise.

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

drop iceberg table if exists {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_pd_nyc_tlc_green_trip_record_t;
drop stage if exists {SNOW_USER}_db.iceberg.{SNOW_USER}_pd_cat_int;
drop integration if exists {SNOW_USER}_iceberg_lab_no_enc_ext_vol;

### Exercise 4: Read Spark Iceberg tables in Snowflake

#### Step 1: Create a Spark database
<u>References</u>
- Spark Documentation:
  - [CREATE DATABASE](https://spark.apache.org/docs/3.4.2/sql-ref-syntax-ddl-create-database.html#content)

In [None]:
%%sparksql

create database if not exists demo;

#### Step 2: Create the Spark Iceberg tables
<u>References</u>
- Iceberg Documentation:
  - [CREATE TABLE](https://iceberg.apache.org/docs/latest/spark-ddl/#create-table)

SPARK ICEBERG TABLE (SIMPLE DATA TYPES)

In [None]:
%%sparksql

CREATE TABLE IF NOT EXISTS demo.nyc.taxi_simple_types (
    VendorID              bigint,
    tpep_pickup_datetime  timestamp,
    tpep_dropoff_datetime timestamp,
    passenger_count       double,
    trip_distance         double,
    RatecodeID            double,
    store_and_fwd_flag    string,
    PULocationID          bigint,
    DOLocationID          bigint,
    payment_type          bigint,
    fare_amount           double,
    extra                 double,
    mta_tax               double,
    tip_amount            double,
    tolls_amount          double,
    improvement_surcharge double,
    total_amount          double,
    congestion_surcharge  double,
    airport_fee           double
)
USING iceberg
PARTITIONED BY (days(tpep_pickup_datetime))
;

SPARK ICEBERG TABLE (COMPLEX DATA TYPES)

In [None]:
%%sparksql

CREATE TABLE IF NOT EXISTS demo.nyc.taxi_complex_types (
    tpep_pickup_datetime timestamp,
    tpep_dropoff_datetime timestamp,
    data struct<
        VendorID                       : bigint,
        tpep_pickup_datetime_as_epoch  : bigint,
        tpep_dropoff_datetime_as_epoch : bigint,
        passenger_count                : double,
        trip_distance                  : double,
        RatecodeID                     : double,
        store_and_fwd_flag             : string,
        PULocationID                   : bigint,
        DOLocationID                   : bigint,
        payment_type                   : bigint,
        fare_amount                    : double,
        extra                          : double,
        mta_tax                        : double,
        tip_amount                     : double,
        tolls_amount                   : double,
        improvement_surcharge          : double,
        total_amount                   : double,
        congestion_surcharge           : double,
        airport_fee                    : double>
)
USING iceberg
PARTITIONED BY (days(tpep_pickup_datetime))
;

#### Step 3: Load the Spark Iceberg tables
<u>References<u>
- Spark Documentation:
  - [pyspark.sql.DataFrameReader.parquet](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html) 
  - [pyspark.sql.DataFrameWriter.saveAsTable](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.saveAsTable.html)

SPARK ICEBERG TABLE (SIMPLE DATA TYPES)

In [None]:
for filename in [
    "yellow_tripdata_2023-11.parquet",
    "yellow_tripdata_2023-10.parquet",
    "yellow_tripdata_2023-09.parquet"
]:
    df = spark.read.parquet(f"/home/spark/data/{filename}")
    df.write.mode("append").saveAsTable("demo.nyc.taxi_simple_types")

SPARK ICEBERG TABLE (COMPLEX DATA TYPES)

In [None]:
from pyspark.sql.functions import struct, unix_timestamp

for filename in [
    "yellow_tripdata_2023-11.parquet",
    "yellow_tripdata_2023-10.parquet",
    "yellow_tripdata_2023-09.parquet",
]:
    df = spark.read.parquet(f"/home/spark/data/{filename}")
    df = df.withColumn("tpep_pickup_datetime_as_epoch", unix_timestamp("tpep_pickup_datetime"))
    df = df.withColumn("tpep_dropoff_datetime_as_epoch", unix_timestamp("tpep_dropoff_datetime"))
    df = df.withColumn("data", struct(
                                    "VendorID"
                                    , "tpep_pickup_datetime_as_epoch"
                                    , "tpep_dropoff_datetime_as_epoch"
                                    , "passenger_count"
                                    , "trip_distance"
                                    , "RatecodeID"
                                    , "store_and_fwd_flag"
                                    , "PULocationID"
                                    , "DOLocationID"
                                    , "payment_type"
                                    , "fare_amount"
                                    , "extra"
                                    , "mta_tax"
                                    , "tip_amount"
                                    , "tolls_amount"
                                    , "improvement_surcharge"
                                    , "total_amount"
                                    , "congestion_surcharge"
                                    , "airport_fee"))
    df = df.select("tpep_pickup_datetime", "tpep_dropoff_datetime", "data")
    df.write.mode("append").saveAsTable("demo.nyc.taxi_complex_types")

#### Step 4: Verify the Spark Iceberg tables contain data

SPARK ICEBERG TABLE (SIMPLE DATA TYPES)

In [None]:
%%sparksql

select count(*) from demo.nyc.taxi_simple_types;

In [None]:
%%sparksql

select * from demo.nyc.taxi_simple_types limit 25;

SPARK ICEBERG TABLE (COMPLEX DATA TYPES)

In [None]:
%%sparksql

select count(*) from demo.nyc.taxi_complex_types;

In [None]:
%%sparksql

select * from demo.nyc.taxi_complex_types limit 25;

#### Step 5: Create Snowflake Iceberg tables on the Spark Iceberg tables (unmanaged)

SPARK ICEBERG TABLE (SIMPLE TYPES)

In [None]:
# Replace <SPARK_ICEBERG_TAXI_SIMPLE_TYPES_TABLE_METADATA_FILE> with the name of latest Spark Iceberg table metadata file in the 
# applicable path from your us-west-2 bucket (HINT: Filter for the most current timestamp)
SPARK_ICEBERG_TAXI_SIMPLE_TYPES_TABLE_METADATA_FILE='<SPARK_ICEBERG_TAXI_SIMPLE_TYPES_TABLE_METADATA_FILE>'

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

create or replace iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_simple_types
    external_volume = {SNOW_USER}_iceberg_lab_ext_vol
    catalog = {SNOW_USER}_os_cat_int
    metadata_file_path = 'spark/warehouse/demo/nyc/taxi_simple_types/metadata/{SPARK_ICEBERG_TAXI_SIMPLE_TYPES_TABLE_METADATA_FILE}'
;

SPARK ICEBERG TABLE (COMPLEX TYPES)

In [None]:
# Replace <SPARK_ICEBERG_TAXI_COMPLEX_TYPES_TABLE_METADATA_FILE> with the name of latest Spark Iceberg table metadata file in the 
# applicable path from your us-west-2 bucket (HINT: Filter for the most current timestamp)
SPARK_ICEBERG_TAXI_COMPLEX_TYPES_TABLE_METADATA_FILE='<SPARK_ICEBERG_TAXI_COMPLEX_TYPES_TABLE_METADATA_FILE>'

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

create or replace iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_complex_types
    external_volume = {SNOW_USER}_iceberg_lab_ext_vol
    catalog = {SNOW_USER}_os_cat_int
    metadata_file_path = 'spark/warehouse/demo/nyc/taxi_complex_types/metadata/{SPARK_ICEBERG_TAXI_COMPLEX_TYPES_TABLE_METADATA_FILE}'
;

#### Step 6: Verify the Snowflake Iceberg tables contain data

SPARK ICEBERG TABLE (SIMPLE TYPES)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    count(*)
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_simple_types
where true
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    *
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_simple_types
where true
limit 25
;

SPARK ICEBERG TABLE (COMPLEX TYPES)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    count(*)
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_complex_types
where true
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    *
from {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_complex_types
where true
limit 25
;

#### Step 7: Inspect the Snowflake Iceberg table DDL

SPARK ICEBERG TABLE (SIMPLE TYPES)

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

select get_ddl('table', '{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_simple_types');

SPARK ICEBERG TABLE (COMPLEX TYPES)

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

select get_ddl('table', '{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_complex_types');

#### Step 8: Inspect SYSTEM\$GET_ICEBERG_TABLE_INFORMATION details
<u>References</u>
- Snowflake Documentation:
  - [SYSTEM$GET_ICEBERG_TABLE_INFORMATION](https://docs.snowflake.com/en/sql-reference/functions/system_get_iceberg_table_information)

SPARK ICEBERG TABLE (SIMLE TYPES)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select system$get_iceberg_table_information('{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_simple_types');

SPARK ICEBERG TABLE (COMPLEX TYPES)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select system$get_iceberg_table_information('{SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_complex_types');

#### Step 9: Cleanup (Optional)

Execute the cells below to drop the Snowflake objects created in this exercise.

In [None]:
%%sparksql

drop table demo.nyc.taxi_simple_types;

In [None]:
%%sparksql

drop table demo.nyc.taxi_complex_types;

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

drop iceberg table if exists {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_simple_types;
drop iceberg table if exists {SNOW_USER}_db.iceberg.{SNOW_USER}_unmanaged_spark_taxi_complex_types;

### Exercise 5: Read a managed Snowflake Iceberg table in Spark

#### Step 1: Create the Snowflake Iceberg table (managed)
<u>References</u>
- Snowflake Documentation:
  - [CREATE ICEBERG TABLE](https://docs.snowflake.com/sql-reference/sql/create-iceberg-table)

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

create or replace iceberg table {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t (
    c_custkey integer,
    c_name string,
    c_address string,
    c_nationkey integer,
    c_phone string,
    c_acctbal integer,
    c_mktsegment string,
    c_comment string
)
catalog = snowflake
external_volume = {SNOW_USER}_iceberg_lab_ext_vol
base_location = 'iceberg_feature_lab/managed/tpch_sf10/customer/data/'
;

#### Step 2: Load the Snowflake Iceberg table

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

insert into {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t
    select * from snowflake_sample_data.tpch_sf10.customer
;

#### Step 3: Verify the Snowflake Iceberg table contains data

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    count(*)
from {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t
where true
;

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select
    *
from {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t
where true
limit 25
;

#### Step 4: Inspect the Snowflake Iceberg table DDL

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

select get_ddl('table', '{SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t');

#### Step 5: Inspect SYSTEM\$GET_ICEBERG_TABLE_INFORMATION details
<u>References</u>
- Snowflake Documentation:
  - [SYSTEM$GET_ICEBERG_TABLE_INFORMATION](https://docs.snowflake.com/en/sql-reference/functions/system_get_iceberg_table_information)

In [None]:
%%sql

use role support_rl;
use warehouse {SNOW_USER}_wh;
use {SNOW_USER}_db.iceberg;

select system$get_iceberg_table_information('{SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t');

#### Step 6: Read the managed Snowflake Iceberg table from Spark

In [None]:
spark.catalog.setCurrentCatalog("snowflake_catalog")
spark.sql(f"select * from {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t").limit(25).show()

#### Step 7: (Optional) Cleanup
Execute the cell below to drop the Snowflake objects created in this exercise.

In [None]:
%%sql

use role support_rl;
use {SNOW_USER}_db.iceberg;

drop iceberg table if exists {SNOW_USER}_db.iceberg.{SNOW_USER}_managed_customer_t;