# Setup

In [1]:
# h/t https://www.dremio.com/blog/getting-hands-on-with-polaris-oss-apache-iceberg-and-apache-spark/
import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
# Each value can be overridden through an environment variable of the same
# name, so real credentials never have to be saved inside the notebook.
# The fallbacks are the values this notebook originally hard-coded.
POLARIS_URI = os.environ.get('POLARIS_URI', 'http://polaris:8181/api/catalog')
POLARIS_CATALOG_NAME = os.environ.get('POLARIS_CATALOG_NAME', 'polariscatalog')
# Passed to the catalog's `credential` setting as '<client_id>:<client_secret>'.
POLARIS_CREDENTIALS = os.environ.get('POLARIS_CREDENTIALS', 'root:secret')
POLARIS_SCOPE = os.environ.get('POLARIS_SCOPE', 'PRINCIPAL_ROLE:ALL')

In [2]:
# All Spark settings are listed as (key, value) pairs and applied in order
# with a single setAll() call.
conf = pyspark.SparkConf().setAppName('lakehouse').setAll([
    # Packages: Iceberg Spark runtime and hadoop-aws
    ('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1,org.apache.hadoop:hadoop-aws:3.4.0'),
    # SQL extensions
    ('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions'),
    # Catalog named "polaris", backed by the Iceberg REST catalog implementation
    ('spark.sql.catalog.polaris', 'org.apache.iceberg.spark.SparkCatalog'),
    ('spark.sql.catalog.polaris.warehouse', POLARIS_CATALOG_NAME),
    ('spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation', 'true'),
    ('spark.sql.catalog.polaris.catalog-impl', 'org.apache.iceberg.rest.RESTCatalog'),
    ('spark.sql.catalog.polaris.uri', POLARIS_URI),
    ('spark.sql.catalog.polaris.credential', POLARIS_CREDENTIALS),
    ('spark.sql.catalog.polaris.scope', POLARIS_SCOPE),
    ('spark.sql.catalog.polaris.token-refresh-enabled', 'true'),
    # Unqualified table names resolve against the "polaris" catalog
    ('spark.sql.defaultCatalog', 'polaris'),
    # S3 configuration: map the s3:// scheme onto the S3A implementation classes
    ('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
    ('spark.hadoop.fs.AbstractFileSystem.s3.impl', 'org.apache.hadoop.fs.s3a.S3A'),
])

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

Spark Running


25/07/15 17:30:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Debug helper (left commented out): uncomment the two lines below to print
# every key/value pair of the active Spark configuration.
# NOTE(review): the configuration set above includes
# spark.sql.catalog.polaris.credential, so the printed output would presumably
# contain the catalog secret - clear the cell output before sharing.
# for item in spark.sparkContext.getConf().getAll():
#     print(f"{item[0]}: {item[1]}")

Expect the following warning the first time that you run something:

```
WARN RESTSessionCatalog: Iceberg REST client is missing the OAuth2 server URI configuration[…]
```

# Create database and table

In [3]:
# Make sure the demo namespace exists, then list what the catalog contains.
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.lakehouse")
namespace_listing = spark.sql("SHOW DATABASES IN polaris")
namespace_listing.show()



+---------+
|namespace|
+---------+
|lakehouse|
+---------+



In [4]:
%%sql
USE polaris.lakehouse;

25/07/15 17:30:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
%%sql
-- Demo table used by the examples in the rest of this notebook.
CREATE TABLE IF NOT EXISTS customers (
  customer_id BIGINT,
  first_name  VARCHAR(255),
  last_name   VARCHAR(255),
  email       VARCHAR(255)
);

# Insert a row

In [6]:
%%sql
INSERT INTO
  customers (customer_id, first_name, last_name, email)
VALUES
  (1, 'Rey', 'Skywalker', 'rey@rebelscum.org');

                                                                                

In [7]:
# Run the same single-row INSERT five times, one statement per iteration.
single_row_insert = """
        INSERT INTO customers (customer_id, first_name, last_name, email) 
        VALUES (1, 'Rey', 'Skywalker', 'rey@rebelscum.org')
    """
for insert_no in range(5):
    spark.sql(single_row_insert)

# Examine metadata

See https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables

In [8]:
%%sql

SELECT COUNT(*) FROM customers;

count(1)
6


In [9]:
%%sql

SELECT COUNT(*), SUM(record_count) FROM polaris.lakehouse.customers.files;

count(1),sum(record_count)
6,6


In [10]:
%%sql
-- Path, row count and size of each file listed in the table's `files` metadata table.
SELECT
  file_path,
  record_count,
  file_size_in_bytes
FROM
  polaris.lakehouse.customers.files;

file_path,record_count,file_size_in_bytes
s3://warehouse/lakehouse/customers/data/00000-5-a741261a-ad83-42a8-a862-7c2443459275-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00000-4-dddf2cca-64ab-4fb1-8fa6-529f984b7efb-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00000-3-4db9519f-001d-45c5-ba33-25c740ea4832-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00000-2-b9cbcc53-3771-4a91-8b2e-3b959544efda-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00000-1-4b88846f-a81b-4c91-9cc4-bf588607c59a-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00000-0-34a3b148-a850-4c33-8003-53192f171753-0-00001.parquet,1,1367


In [11]:
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

# Define schema.
# customer_id is declared BIGINT in the customers table, which corresponds to
# LongType in Spark; declaring it here avoids relying on an implicit
# int -> bigint cast when the rows are written.
schema = StructType([
    StructField("customer_id", LongType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True)
])

# Create data for 10 identical rows
data = [(1, 'Rey', 'Skywalker', 'rey@rebelscum.org')] * 10

# Create DataFrame and append it to the table.
# insertInto() matches columns by position, so the schema's field order must
# follow the table's column order.
df = spark.createDataFrame(data, schema)
df.write.mode("append").insertInto("customers")

In [12]:
%%sql
-- Multi-row insert: a single statement that adds three rows.
INSERT INTO
  customers (customer_id, first_name, last_name, email)
VALUES
  (1, 'Rey', 'Skywalker', 'rey@rebelscum.org'),
  (2, 'Hermione', 'Granger', 'hermione@hogwarts.edu'),
  (3, 'Tony', 'Stark', 'tony@starkindustries.com');

# Insert a second row

In [13]:
%%sql
INSERT INTO
  customers (customer_id, first_name, last_name, email)
VALUES
  (2, 'Hermione', 'Granger', 'leviosaaaaa@hogwarts.edu');


# Examine metadata

See https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables

In [14]:
%%sql
SELECT
  file_path,
  record_count,
  file_size_in_bytes
FROM
  polaris.lakehouse.customers.files;

file_path,record_count,file_size_in_bytes
s3://warehouse/lakehouse/customers/data/00000-30-6e4120ad-2356-413d-b2ee-feceb4042258-0-00001.parquet,1,1438
s3://warehouse/lakehouse/customers/data/00000-27-a5adea9c-f5e1-4440-be9b-8fb866203a74-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00001-28-a5adea9c-f5e1-4440-be9b-8fb866203a74-0-00001.parquet,1,1416
s3://warehouse/lakehouse/customers/data/00002-29-a5adea9c-f5e1-4440-be9b-8fb866203a74-0-00001.parquet,1,1396
s3://warehouse/lakehouse/customers/data/00001-14-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00002-15-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00004-17-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00005-18-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00006-19-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00008-21-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407


# Rewrite data files

In [15]:
%%sql
SELECT COUNT(*) AS data_file_ct, SUM(record_count) AS record_ct FROM polaris.lakehouse.customers.files;

data_file_ct,record_ct
20,20


In [None]:
%%sql
-- Rewrite the table's data files; the 'rewrite-all' option is passed as 'true'.
CALL polaris.system.rewrite_data_files(
  table   => 'lakehouse.customers',
  options => map('rewrite-all', 'true')
)



rewritten_data_files_count,added_data_files_count,rewritten_bytes_count,failed_data_files_count
6,1,7757,0


In [16]:
%%sql
SELECT
  file_path,
  record_count,
  file_size_in_bytes
FROM
  polaris.lakehouse.customers.files;

file_path,record_count,file_size_in_bytes
s3://warehouse/lakehouse/customers/data/00000-30-6e4120ad-2356-413d-b2ee-feceb4042258-0-00001.parquet,1,1438
s3://warehouse/lakehouse/customers/data/00000-27-a5adea9c-f5e1-4440-be9b-8fb866203a74-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00001-28-a5adea9c-f5e1-4440-be9b-8fb866203a74-0-00001.parquet,1,1416
s3://warehouse/lakehouse/customers/data/00002-29-a5adea9c-f5e1-4440-be9b-8fb866203a74-0-00001.parquet,1,1396
s3://warehouse/lakehouse/customers/data/00001-14-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00002-15-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00004-17-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00005-18-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00006-19-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00008-21-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407


In [17]:
%%sql
SELECT
  file_path,
  record_count,
  file_size_in_bytes
FROM
  polaris.lakehouse.customers.all_data_files;

file_path,record_count,file_size_in_bytes
s3://warehouse/lakehouse/customers/data/00000-1-4b88846f-a81b-4c91-9cc4-bf588607c59a-0-00001.parquet,1,1367
s3://warehouse/lakehouse/customers/data/00001-14-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00002-15-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00004-17-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00005-18-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00006-19-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00008-21-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00009-22-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00011-24-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407
s3://warehouse/lakehouse/customers/data/00012-25-1406b9fb-a6d7-4b6c-8492-15c832d3bf3f-0-00001.parquet,1,1407


## Expire snapshots

In [None]:
%%sql

SELECT committed_at, snapshot_id, parent_id, manifest_list FROM polaris.lakehouse.customers.snapshots;

committed_at,snapshot_id,parent_id,manifest_list
2025-07-14 14:37:31.531000,2403840741999442414,,s3://warehouse/rmoff/customers/metadata/snap-2403840741999442414-1-007b3a78-a267-43e5-aee1-1d8f3e5a147d.avro
2025-07-14 14:37:38.926000,3830932525036690208,2.4038407419994424e+18,s3://warehouse/rmoff/customers/metadata/snap-3830932525036690208-1-8dab0845-7f6b-4ec4-b417-26c17d993add.avro
2025-07-14 14:37:39.916000,6409867327989167022,3.8309325250366894e+18,s3://warehouse/rmoff/customers/metadata/snap-6409867327989167022-1-7b5585b8-fad2-450b-b08f-fa9f277662b6.avro
2025-07-14 14:37:40.921000,325546929694535411,6.409867327989167e+18,s3://warehouse/rmoff/customers/metadata/snap-325546929694535411-1-3a105c0c-dd60-4c91-9042-2d0f7925bbcd.avro
2025-07-14 14:37:41.808000,269825382665437490,3.255469296945354e+17,s3://warehouse/rmoff/customers/metadata/snap-269825382665437490-1-d3f8a73b-df91-4b8f-b585-e6f5237c9601.avro
2025-07-14 14:37:42.757000,129306070246549703,2.6982538266543747e+17,s3://warehouse/rmoff/customers/metadata/snap-129306070246549703-1-5f52fc1a-ac1b-457b-b52f-fa6bed1a233d.avro
2025-07-14 14:40:11.290000,8861050211953882166,1.293060702465497e+17,s3://warehouse/rmoff/customers/metadata/snap-8861050211953882166-1-ad6e5d20-590f-40ad-b73a-fca12f412274.avro
2025-07-14 14:42:07.828000,2371922233042001406,8.861050211953881e+18,s3://warehouse/rmoff/customers/metadata/snap-2371922233042001406-1-523344c2-1dbe-4c37-be8c-c56981c0e536.avro
2025-07-14 14:42:08.031000,5882833294520864762,2.3719222330420014e+18,s3://warehouse/rmoff/customers/metadata/snap-5882833294520864762-1-1a7f61bc-4c4f-48bd-8017-591b84723e39.avro
2025-07-14 14:42:08.230000,2961764211154500616,5.882833294520865e+18,s3://warehouse/rmoff/customers/metadata/snap-2961764211154500616-1-f971d471-9325-4650-842b-52831cd4dab6.avro


In [None]:
%%sql

-- Expire snapshots older than the given timestamp.
-- NOTE(review): the cut-off is hard-coded; compare it with the committed_at
-- values returned by the snapshots query before running.
CALL polaris.system.expire_snapshots(
  table      => 'lakehouse.customers',
  older_than => TIMESTAMP '2025-07-11 12:40:00.000'
)

In [None]:
%%sql

SELECT committed_at, snapshot_id, parent_id, manifest_list FROM polaris.lakehouse.customers.snapshots;

## Remove orphan files

- https://github.com/apache/polaris/issues/214
- https://github.com/pavibhai/polaris/commit/09de4c8fefd6894cca14298df389459999cdc58a

In [None]:
%%sql
-- dry_run => true is passed, so this call is a dry run of orphan-file removal.
CALL polaris.system.remove_orphan_files(
  table   => 'lakehouse.customers',
  dry_run => true
)

```sql
trino:lakehouse> SET SESSION iceberg.remove_orphan_files_min_retention = '10s';
SET SESSION
trino:lakehouse> ALTER TABLE customers EXECUTE remove_orphan_files(retention_threshold => '1s');

Query 20250711_135921_00014_4hdju, FAILED, 1 node
Splits: 1 total, 0 done (0.00%)
0.07 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20250711_135921_00014_4hdju failed: Retention specified (1.00s) is shorter than the minimum retention configured in the system (10.00s). Minimum retention can be chan
ged with iceberg.remove-orphan-files.min-retention configuration property or iceberg.remove_orphan_files_min_retention session property

trino:lakehouse> ALTER TABLE customers EXECUTE remove_orphan_files(retention_threshold => '15s');
ALTER TABLE EXECUTE
 rows
------
(0 rows)

Query 20250711_135924_00015_4hdju, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.10 [0 rows, 0B] [0 rows/s, 0B/s]

trino:lakehouse>
```

# Time travel

In [None]:
%%sql

SELECT * FROM customers 

In [None]:
%%sql

SELECT * FROM customers VERSION AS OF 8348506506333219028

In [None]:
%%sql

SELECT * from polaris.lakehouse.customers.manifests;

In [None]:
%%sql

SELECT * FROM polaris.lakehouse.customers.manifests TIMESTAMP AS OF (NOW() - INTERVAL 30 SECONDS);


## Manifests 

In [None]:
%%sql

SELECT * from polaris.lakehouse.customers.manifests;

## Snapshots

In [None]:
%%sql

SELECT * from polaris.lakehouse.customers.snapshots;

In [None]:
%%sql

SELECT snapshot_id, manifest_list from polaris.lakehouse.customers.snapshots 
    WHERE committed_at > NOW() - INTERVAL 30 MINUTES 
    ORDER BY committed_at ASC LIMIT 1

# Metadata

In [None]:
%%sql

SELECT * from polaris.lakehouse.customers.metadata_log_entries;

In [None]:
%%sql

-- Table history joined to the snapshots table to show each entry's operation,
-- ordered by the time the snapshot was made current.
SELECT
  h.made_current_at,
  s.operation,
  h.snapshot_id,
  h.is_current_ancestor
FROM polaris.lakehouse.customers.history AS h
INNER JOIN polaris.lakehouse.customers.snapshots AS s
  ON h.snapshot_id = s.snapshot_id
ORDER BY h.made_current_at;

made_current_at,operation,snapshot_id,is_current_ancestor
2025-07-14 14:37:31.531000,append,2403840741999442414,True
2025-07-14 14:37:38.926000,append,3830932525036690208,True
2025-07-14 14:37:39.916000,append,6409867327989167022,True
2025-07-14 14:37:40.921000,append,325546929694535411,True
2025-07-14 14:37:41.808000,append,269825382665437490,True
2025-07-14 14:37:42.757000,append,129306070246549703,True
2025-07-14 14:40:11.290000,replace,8861050211953882166,True
2025-07-14 14:42:07.828000,append,2371922233042001406,True
2025-07-14 14:42:08.031000,append,5882833294520864762,True
2025-07-14 14:42:08.230000,append,2961764211154500616,True


# Delete

In [None]:
%%sql

DELETE FROM customers WHERE customer_id=1;

In [None]:
%%sql
-- Same listing as before, plus the `content` column.
-- Per the Iceberg docs: content 0 = data file, 1 = position deletes, 2 = equality deletes.
SELECT
  content,
  file_path,
  record_count,
  file_size_in_bytes
FROM
  polaris.lakehouse.customers.files;

In [None]:
%%sql
-- Same columns, read from the `all_data_files` metadata table instead of `files`.
SELECT
  content,
  file_path,
  record_count,
  file_size_in_bytes
FROM
  polaris.lakehouse.customers.all_data_files;

# Update

In [None]:
%%sql

SELECT * FROM customers

In [None]:
%%sql

UPDATE customers SET first_name = 'Bob' where first_name = 'Hermione';

# Add column

In [None]:
%%sql
-- Add the phone column; the INSERT in the next cell supplies it as the fifth value.
ALTER TABLE customers ADD COLUMN phone STRING

In [None]:
%%sql
INSERT INTO customers VALUES (3,'FRED','BLOGGS','fred@hotspot.com','0000 1234 1234');

In [None]:
%%sql
SELECT * FROM customers;

In [None]:
%%sql
SELECT * FROM customers TIMESTAMP AS OF (NOW() - INTERVAL 30 MINUTES);

In [None]:
%%sql
SELECT * FROM customers VERSION AS OF 7334416300196800603;

In [None]:
%%sql
SELECT * FROM customers;

In [None]:
%%sql
INSERT INTO customers (customer_id, first_name, last_name, email) 
VALUES        (3, 'Tony', 'Stark', 'tony@starkindustries.com');

In [None]:
%%sql
DELETE FROM customers;

In [None]:
%%sql
-- Roll the table back to the given snapshot.
-- NOTE(review): the snapshot id is hard-coded; take a current value from the
-- snapshots metadata table before running.
CALL polaris.system.rollback_to_snapshot(
  table       => 'lakehouse.customers',
  snapshot_id => 707189909035517389
)