In [1]:
from utils import get_spark_session
import os

spark = get_spark_session("iceberg_DDL")

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9fbd5004-e8b4-4f5a-8c21-c432b1837e7f;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.95.0 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found softwa

# Spark DDL

- To use Iceberg in Spark, users first need to configure **Spark catalogs**.
- Iceberg uses **Apache Spark's DataSourceV2 API** for its data source and catalog implementations.
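
As a rough illustration of what configuring a Spark catalog involves, the sketch below builds the settings for an Iceberg `SparkCatalog` backed by Nessie. The helper names `nessie_catalog_conf` and `build_session`, the endpoint URL, and the warehouse path are assumptions made for this example. The notebook's own `get_spark_session` helper is not shown here and may be configured differently.

```python
def nessie_catalog_conf(name, uri, warehouse, ref="main"):
    """Return Spark settings that register an Iceberg catalog backed by Nessie."""
    prefix = f"spark.sql.catalog.{name}"
    return {
        # SQL extensions enable statements such as ADD PARTITION FIELD.
        "spark.sql.extensions": ",".join([
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
        ]),
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
        f"{prefix}.uri": uri,
        f"{prefix}.ref": ref,
        f"{prefix}.warehouse": warehouse,
    }


def build_session(app_name, conf):
    """Apply the settings to a SparkSession builder (requires pyspark)."""
    from pyspark.sql import SparkSession  # imported lazily on purpose

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Hypothetical endpoint and warehouse location; replace with real values.
conf = nessie_catalog_conf(
    name="nessie",
    uri="http://nessie:19120/api/v1",
    warehouse="s3a://warehouse/",
)
```

Calling `build_session("iceberg_DDL", conf)` would then make tables addressable as `nessie.<namespace>.<table>`, assuming the required jars are on the classpath.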

#### References

- https://iceberg.apache.org/docs/nightly/spark-ddl/

## 1. CREATE TABLE

To create an Iceberg table, add the `USING iceberg` clause:

```sql
CREATE TABLE nessie.dev.sample (
    id bigint NOT NULL COMMENT 'unique id',
    data string,
    category string)
USING iceberg
PARTITIONED BY (category);
```

### 1.1. PARTITIONED BY

- The `PARTITIONED BY` clause supports transform expressions to create **hidden partitions**.

```sql
CREATE TABLE nessie.dev.sample (
    id bigint NOT NULL COMMENT 'unique id',
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);
```

- Supported transforms are:
    - `year(ts)`: partition by year;
    - `month(ts)`: partition by month;
    - `day(ts)` or `date(ts)`: equivalent to dateint partitioning;
    - `hour(ts)` or `date_hour(ts)`: equivalent to dateint and hour partitioning;
    - `bucket(N, col)`: partition by hashed value mod N buckets;
    - `truncate(L, col)`: partition by value truncated to L:
        - Strings are truncated to the given length;
        - Integers and longs are truncated to bins of width L, e.g. `truncate(10, i)` produces partitions 0, 10, 20, 30, ...
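
To make the transforms above more concrete, here is a minimal pure-Python sketch of the values the time-based and `truncate` transforms derive from a column. It is an illustration only, not Iceberg's implementation: the function names are made up for this example, timestamps are assumed to be naive UTC values, and `bucket` is left out because it relies on a 32-bit Murmur3 hash.

```python
from datetime import date, datetime, timedelta

EPOCH_DATE = date(1970, 1, 1)
EPOCH_TS = datetime(1970, 1, 1)


def year_transform(ts):
    """Years elapsed since 1970."""
    return ts.year - 1970


def month_transform(ts):
    """Months elapsed since 1970-01."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def day_transform(ts):
    """Days elapsed since 1970-01-01."""
    return (ts.date() - EPOCH_DATE).days


def hour_transform(ts):
    """Whole hours elapsed since 1970-01-01 00:00."""
    return (ts - EPOCH_TS) // timedelta(hours=1)


def truncate_int(width, value):
    """Lower bound of the bin of size `width` that contains `value`."""
    return value - (value % width)


def truncate_str(length, value):
    """First `length` characters of the string."""
    return value[:length]


print(truncate_int(10, 27))        # 20
print(truncate_str(3, "iceberg"))  # ice
```

Because the partition value is computed from the column, queries can filter on `ts` or `id` directly and never need to reference a separate partition column, which is what makes the partitioning "hidden".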

### 1.2. CREATE TABLE AS SELECT

- Iceberg supports CTAS as an atomic operation when using a `SparkCatalog`.
- CTAS is supported, but is not atomic when using `SparkSessionCatalog`.

```sql
CREATE TABLE catalog.db.sample
USING ICEBERG
PARTITIONED BY (partition)
TBLPROPERTIES ('key'='value')
AS SELECT ...
```

### 1.3. DDL CREATE Statements

- Iceberg supports the `CREATE`, `REPLACE`, `CREATE OR REPLACE`, and `CREATE IF NOT EXISTS` forms of table DDL.

In [4]:
TABLE_NAME = "nessie.dev.ddl_test"

spark.sql(f"""
CREATE OR REPLACE TABLE {TABLE_NAME} (
    id BIGINT              NOT NULL COMMENT 'unique id',
    name STRING            NOT NULL COMMENT 'client name',
    age INT                NOT NULL COMMENT 'client age',
    salary DOUBLE                   COMMENT 'last salary',
    purchase_date STRING            COMMENT 'last purchase date')
    
USING ICEBERG
PARTITIONED BY (purchase_date)""")

spark.sql("SHOW TABLES IN nessie").show()
spark.table(TABLE_NAME).printSchema()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|      dev|    ddl_test|      false|
|      dev|partitioning|      false|
|      dev|     table_1|      false|
|      dev|     table_2|      false|
|      dev|   third_ice|      false|
+---------+------------+-----------+

root
 |-- id: long (nullable = false)
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- salary: double (nullable = true)
 |-- purchase_date: string (nullable = true)



## 2. DROP TABLE

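- Prior to version 0.14, `DROP TABLE` removed the table from the catalog and also deleted the table contents;
- From version 0.14 onwards, **`DROP TABLE` only removes the table from the catalog**;
- To also delete the table contents, use `DROP TABLE ... PURGE`.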

```sql
DROP TABLE catalog.db.sample;
DROP TABLE catalog.db.sample PURGE;
```

## 3. ALTER TABLE

### 3.1. ALTER TABLE without SQL Extensions
Iceberg has full ALTER TABLE support in Spark 3, including:

- Renaming table;
- Setting or removing table properties;
- Adding, deleting, and renaming columns;
- Adding, deleting, and renaming nested struct fields;
- Reordering top-level columns and nested struct fields;
- Widening the type of `int`, `float`, and `decimal` fields;
- Marking required columns optional.
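
The type-widening rule in the list above can be sketched as a small check. The promotions it accepts are `int` to `bigint`, `float` to `double`, and `decimal(P, S)` to a larger precision with the same scale. The helper names and the `measurement` column are hypothetical, and the generated statement uses the `ALTER COLUMN ... TYPE` form.

```python
import re

_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def is_allowed_widening(old_type, new_type):
    """Return True if old_type -> new_type is a safe type promotion."""
    old_type, new_type = old_type.strip().lower(), new_type.strip().lower()
    if (old_type, new_type) in {("int", "bigint"), ("float", "double")}:
        return True
    old_dec = _DECIMAL.fullmatch(old_type)
    new_dec = _DECIMAL.fullmatch(new_type)
    if old_dec and new_dec:
        old_precision, old_scale = map(int, old_dec.groups())
        new_precision, new_scale = map(int, new_dec.groups())
        # Precision may grow, but the scale has to stay the same.
        return new_scale == old_scale and new_precision > old_precision
    return False


def widen_column_statement(table, column, old_type, new_type):
    """Render the ALTER statement, refusing promotions that are not allowed."""
    if not is_allowed_widening(old_type, new_type):
        raise ValueError(f"cannot change {old_type} to {new_type}")
    return f"ALTER TABLE {table} ALTER COLUMN {column} TYPE {new_type}"


# Hypothetical column name, used only for illustration.
statement = widen_column_statement(
    "catalog.db.sample", "measurement", "float", "double"
)
```

Checking before issuing the statement gives a clearer error than waiting for the engine to reject a narrowing change such as `bigint` to `int`.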

#### 3.1.1. Renaming the table
```sql
ALTER TABLE catalog.db.sample RENAME TO catalog.db.new_sample;
```

#### 3.1.2. Changing TBLPROPERTIES
```sql
ALTER TABLE catalog.db.sample SET TBLPROPERTIES ('read.split.target-size'='268435456');
ALTER TABLE catalog.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
```

#### 3.1.3. Adding new columns
```sql
ALTER TABLE catalog.db.sample ADD COLUMNS (new_column string comment 'new column docs');
ALTER TABLE catalog.db.sample ADD COLUMN new_column bigint;
ALTER TABLE catalog.db.sample ADD COLUMN new_column bigint AFTER other_column;
```

#### 3.1.4. Renaming an existing column
```sql
ALTER TABLE catalog.db.sample RENAME COLUMN data TO payload;
```

#### 3.1.5. Dropping a NOT NULL constraint
```sql

ALTER TABLE catalog.db.sample ALTER COLUMN id DROP NOT NULL;
```

#### 3.1.6. Dropping a column or nested field
```sql
ALTER TABLE catalog.db.sample DROP COLUMN id;
ALTER TABLE catalog.db.sample DROP COLUMN point.z;

```

### 3.2. ALTER TABLE with SQL Extensions

- Iceberg supports adding new partition fields to a spec using `ADD PARTITION FIELD` and removing them using `DROP PARTITION FIELD`;
- Adding a partition field is a metadata operation and does not change any of the existing table data;
- New data will be written with the new partitioning, but existing data will remain in the old partition layout.

```sql
ALTER TABLE catalog.db.sample ADD PARTITION FIELD bucket(16, id);
ALTER TABLE catalog.db.sample ADD PARTITION FIELD truncate(4, data);
ALTER TABLE catalog.db.sample ADD PARTITION FIELD year(ts);
ALTER TABLE catalog.db.sample ADD PARTITION FIELD bucket(16, id) AS shard;

ALTER TABLE catalog.db.sample DROP PARTITION FIELD catalog;
ALTER TABLE catalog.db.sample DROP PARTITION FIELD bucket(16, id);
ALTER TABLE catalog.db.sample DROP PARTITION FIELD truncate(4, data);
ALTER TABLE catalog.db.sample DROP PARTITION FIELD year(ts);
ALTER TABLE catalog.db.sample DROP PARTITION FIELD shard;

```

### 3.3. ALTER TABLE WRITE ORDERED BY

Iceberg tables can be configured with a sort order that some engines use to automatically sort data written to the table.

```sql
ALTER TABLE catalog.db.sample WRITE ORDERED BY category, id;
ALTER TABLE catalog.db.sample WRITE ORDERED BY category ASC, id DESC;
ALTER TABLE catalog.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;

-- Sort within each task only, or unset the write order entirely
ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id;
ALTER TABLE prod.db.sample WRITE UNORDERED;
```

## 4. Branching and Tagging DDL

### 4.1. Branches

- Branches can be created at the current snapshot with the `CREATE BRANCH` statement;
- Use `IF NOT EXISTS` so the statement does not fail if the branch already exists;
- Use `CREATE OR REPLACE` to update the branch if it already exists;
- Use `AS OF VERSION` to create a branch at a specific snapshot;
- Use `RETAIN` to create a branch with a specified retention period.

```sql
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`;
ALTER TABLE prod.db.sample CREATE BRANCH IF NOT EXISTS `audit-branch`;
ALTER TABLE prod.db.sample CREATE OR REPLACE BRANCH `audit-branch`;
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch` AS OF VERSION 1234;

-- Create audit-branch at snapshot 1234 and retain the branch for 30 days.
-- On the branch, keep the latest 3 snapshots and 2 days' worth of snapshots.
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch` AS OF VERSION 1234 RETAIN 30 DAYS WITH SNAPSHOT RETENTION 3 SNAPSHOTS 2 DAYS;
```

### 4.2. Tags

# Iceberg TBLPROPERTIES

## File Formats and Compression Types 

## Column Metrics Tracking

- If a table has many columns, tracking metrics for every column can make writes expensive;
- You can tune metrics per column and turn them off for columns where they are not relevant to your query patterns.
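
One way to express this is through the `write.metadata.metrics.default` and `write.metadata.metrics.column.<name>` table properties, which accept the modes `none`, `counts`, `truncate(N)`, and `full`. The sketch below assembles such a property map. The helper name `metrics_properties` is made up for this example, and the choice of columns (`id` and `name`, as in `nessie.dev.table_2`) is only illustrative.

```python
import re

_VALID_MODE = re.compile(r"none|counts|full|truncate\(\d+\)")


def metrics_properties(default_mode, per_column=None):
    """Build table properties that control column metrics collection."""
    props = {"write.metadata.metrics.default": default_mode}
    for column, mode in (per_column or {}).items():
        props[f"write.metadata.metrics.column.{column}"] = mode
    for key, mode in props.items():
        if not _VALID_MODE.fullmatch(mode):
            raise ValueError(f"unsupported metrics mode {mode!r} for {key}")
    return props


# Turn metrics off by default and keep them only for columns used in filters.
props = metrics_properties(
    default_mode="none",
    per_column={"id": "full", "name": "truncate(16)"},
)
```

The resulting keys and values can be passed in a `TBLPROPERTIES (...)` clause at creation time or applied later with `ALTER TABLE ... SET TBLPROPERTIES`.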

## Parquet Vectorization

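- Parquet vectorization is controlled by the `read.parquet.vectorization.enabled` table property. Older Iceberg releases left it off by default, while recent releases enable it by default, so check the value for the version in use;
- It changes how data is delivered, from one row at a time to column batches, which can improve performance;
- It should generally be turned on.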
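
Whatever the default is in the Iceberg release in use, the setting can be made explicit per table. The sketch below renders an `ALTER TABLE ... SET TBLPROPERTIES` statement for the `read.parquet.vectorization.enabled` and `read.parquet.vectorization.batch-size` properties. The helper name `vectorization_statement` and the batch size are assumptions chosen for illustration.

```python
def vectorization_statement(table, enabled=True, batch_size=None):
    """Render a statement that sets Parquet vectorization for one table."""
    props = {"read.parquet.vectorization.enabled": str(enabled).lower()}
    if batch_size is not None:
        # Number of rows delivered per column batch.
        props["read.parquet.vectorization.batch-size"] = str(batch_size)
    assignments = ", ".join(f"'{key}'='{value}'" for key, value in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({assignments})"


statement = vectorization_statement(
    "nessie.dev.table_2", enabled=True, batch_size=5000
)
# In the notebook this would be executed with: spark.sql(statement)
```

`SHOW TBLPROPERTIES nessie.dev.table_2` can then be used to confirm the values stored on the table.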

In [None]:
spark.sql("""
CREATE TABLE nessie.dev.table_2 (
  id INT,
  name STRING,
  age INT,
  salary FLOAT,
  last_purchase_date DATE
) USING iceberg TBLPROPERTIES (
  'write.delete.mode'='copy-on-write',
  'write.update.mode'='copy-on-write',
  'write.merge.mode'='copy-on-write',
  'write.format.default'='parquet',
  'write.delete.format.default'='avro',
  'write.parquet.compression-codec'='zstd',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='50'
) PARTITIONED BY ( last_purchase_date )
""")
spark.sql("DESCRIBE EXTENDED nessie.dev.table_2").show(100)
spark.sql("SHOW TBLPROPERTIES nessie.dev.table_2").show(100)
