# Unity Catalog and Iceberg Demo with Databricks Connect

This notebook demonstrates how to use Databricks Connect to work with Apache Iceberg tables in Unity Catalog. We will use the `samples.nyctaxi.trips` dataset.

**Steps:**
1.  Initialize Databricks Connect.
2.  Set up the catalog and schema.
3.  Read data from the source table (`samples.nyctaxi.trips`).
4.  Create a new managed Iceberg table.
5.  Perform schema evolution.
6.  Use time travel to query historical data.
7.  Clean up the demo resources.

## 1. Initialize Databricks Connect

This assumes you have configured your environment for Databricks Connect. Typically, this involves:
- Running `databricks configure` and providing your host (`https://e2-demo-field-eng.cloud.databricks.com/`) and a token.
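- To run on a classic cluster, set the `DATABRICKS_CLUSTER_ID` environment variable to the cluster ID. To use serverless compute instead, set `DATABRICKS_SERVERLESS_COMPUTE_ID=auto`. Databricks Connect runs against clusters or serverless compute, not SQL warehouses.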
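If you would rather pick the compute target from Python than from the shell, here is a minimal sketch. It assumes the `DATABRICKS_CLUSTER_ID` and `DATABRICKS_SERVERLESS_COMPUTE_ID` environment variables documented for Databricks Connect. `configure_compute` is a hypothetical helper and is not part of the `databricks-connect` package. It has to run before `DatabricksSession.builder.getOrCreate()` creates the session.

```python
import os
from typing import Dict, Optional

# Environment variables Databricks Connect reads to choose compute
# (names per the Databricks Connect docs; verify against your installed version).
CLUSTER_VAR = "DATABRICKS_CLUSTER_ID"
SERVERLESS_VAR = "DATABRICKS_SERVERLESS_COMPUTE_ID"


def configure_compute(cluster_id: Optional[str] = None) -> Dict[str, str]:
    """Target a classic cluster if cluster_id is given, else serverless compute.

    Only one of the two variables is left set, so the choice is unambiguous.
    Returns the compute-related variables now present in the environment.
    """
    if cluster_id:
        os.environ[CLUSTER_VAR] = cluster_id
        os.environ.pop(SERVERLESS_VAR, None)
    else:
        os.environ[SERVERLESS_VAR] = "auto"
        os.environ.pop(CLUSTER_VAR, None)
    return {k: os.environ[k] for k in (CLUSTER_VAR, SERVERLESS_VAR) if k in os.environ}


print(configure_compute())                        # → {'DATABRICKS_SERVERLESS_COMPUTE_ID': 'auto'}
print(configure_compute("0123-456789-abcdefgh"))  # hypothetical cluster ID
```

The helper clears whichever variable it is not using. This keeps the choice unambiguous if the notebook is re-run with a different setting.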

In [1]:
from databricks.connect import DatabricksSession
from pyspark.sql.functions import col, current_timestamp

# Initialize the DatabricksSession
spark = DatabricksSession.builder.getOrCreate()

print("DatabricksSession initialized successfully.")
print("Spark version:", spark.version)

DatabricksSession initialized successfully.
Spark version: 3.5.2


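## 2. Set Up Catalog and Schema

Let's define a catalog and schema for our demo assets. Creating a catalog requires the `CREATE CATALOG` privilege on the metastore. If you don't have it, set `catalog_name` to an existing catalog you can use.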

In [2]:
catalog_name = "rohitb_dais2025_demo"
schema_name = "iceberg_demo"

spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
spark.sql(f"USE SCHEMA {schema_name}")

print(f"Using catalog '{catalog_name}' and schema '{schema_name}'.")

Using catalog 'rohitb_dais2025_demo' and schema 'iceberg_demo'.
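The f-strings in this cell interpolate names directly into SQL. An unquoted name containing a hyphen or another special character would therefore fail to parse. The minimal sketch below backtick-quotes each identifier before building the same four statements. `quote_ident` and `setup_statements` are hypothetical helpers, and the doubled-backtick escape follows Spark SQL's quoted-identifier rules.

```python
from typing import List


def quote_ident(name: str) -> str:
    """Backtick-quote a Spark SQL identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def setup_statements(catalog: str, schema: str) -> List[str]:
    """The catalog/schema setup SQL, with identifiers quoted so unusual names stay valid."""
    c, s = quote_ident(catalog), quote_ident(schema)
    return [
        f"CREATE CATALOG IF NOT EXISTS {c}",
        f"USE CATALOG {c}",
        f"CREATE SCHEMA IF NOT EXISTS {s}",
        f"USE SCHEMA {s}",
    ]


for stmt in setup_statements("rohitb_dais2025_demo", "iceberg_demo"):
    print(stmt)  # in the notebook you would run spark.sql(stmt) instead
```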


## 3. Read from Source Table

We'll read from the `samples.nyctaxi.trips` table provided by Databricks.

In [3]:
source_table = "samples.nyctaxi.trips"
source_df = spark.read.table(source_table)

# Display a few rows to inspect the data
display(source_df.limit(5))

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,pickup_zip,dropoff_zip
2016-02-13 21:47:53,2016-02-13 21:57:15,1.4,8.0,10103,10110
2016-02-13 18:29:09,2016-02-13 18:37:23,1.31,7.5,10023,10023
2016-02-06 19:40:58,2016-02-06 19:52:32,1.8,9.5,10001,10018
2016-02-12 19:06:43,2016-02-12 19:20:54,2.3,11.5,10044,10111
2016-02-23 10:27:56,2016-02-23 10:58:33,2.6,18.5,10199,10022


## 4. Create and Load a Managed Iceberg Table

Now, we will create a new Iceberg table from the source table. The new table keeps all of the source columns, but we limit the data to 10,000 rows.

In [4]:
iceberg_table_name = "nyc_taxi_trips_iceberg"

# Keep all source columns but only 10,000 rows (limit() without orderBy() does not pick any particular rows)
taxi_df = source_df.limit(10000)

# Write the data to a new managed Iceberg table
taxi_df.write.format("iceberg").mode("overwrite").saveAsTable(iceberg_table_name)

print(f"Created and loaded Iceberg table: {catalog_name}.{schema_name}.{iceberg_table_name}")

Created and loaded Iceberg table: rohitb_dais2025_demo.iceberg_demo.nyc_taxi_trips_iceberg
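The table history in step 6 records this DataFrame write as a `CREATE OR REPLACE TABLE AS SELECT` commit. For readers who prefer SQL, the sketch below builds a roughly equivalent statement. It assumes managed Iceberg tables are enabled in the workspace, so that `USING ICEBERG` is accepted. `iceberg_ctas` is a hypothetical helper that only builds the string, and it assumes the table names are simple, trusted identifiers.

```python
def iceberg_ctas(target: str, source: str, limit: int) -> str:
    """Build a CTAS statement that creates a managed Iceberg table from a source table.

    Assumption: the workspace supports managed Iceberg tables via `USING ICEBERG`.
    """
    if limit <= 0:
        raise ValueError("limit must be positive")
    return (
        f"CREATE OR REPLACE TABLE {target} USING ICEBERG "
        f"AS SELECT * FROM {source} LIMIT {limit}"
    )


sql = iceberg_ctas("nyc_taxi_trips_iceberg", "samples.nyctaxi.trips", 10000)
print(sql)  # in the notebook you would run spark.sql(sql)
```

Like `.limit(10000)`, a `LIMIT` without `ORDER BY` does not guarantee which rows end up in the table.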


In [5]:
# Query the new table to verify
display(spark.table(iceberg_table_name).limit(10))

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,pickup_zip,dropoff_zip
2016-02-13 21:47:53,2016-02-13 21:57:15,1.4,8.0,10103,10110
2016-02-13 18:29:09,2016-02-13 18:37:23,1.31,7.5,10023,10023
2016-02-06 19:40:58,2016-02-06 19:52:32,1.8,9.5,10001,10018
2016-02-12 19:06:43,2016-02-12 19:20:54,2.3,11.5,10044,10111
2016-02-23 10:27:56,2016-02-23 10:58:33,2.6,18.5,10199,10022
2016-02-13 00:41:43,2016-02-13 00:46:52,1.4,6.5,10023,10069
2016-02-18 23:49:53,2016-02-19 00:12:53,10.4,31.0,11371,10003
2016-02-18 20:21:45,2016-02-18 20:38:23,10.15,28.5,11371,11201
2016-02-03 10:47:50,2016-02-03 11:07:06,3.27,15.0,10014,10023
2016-02-19 01:26:39,2016-02-19 01:40:01,4.42,15.0,10003,11222


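In [9]:
# This cell ran last (execution count 9), after the ALTER TABLE in step 5,
# so its output below already includes last_updated_ts.
display(spark.sql(f"DESCRIBE TABLE EXTENDED {iceberg_table_name}"))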

col_name,data_type,comment
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
trip_distance,double,
fare_amount,double,
pickup_zip,int,
dropoff_zip,int,
last_updated_ts,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"col-4, col-1, col-5, col-2, col-6, col-3",


## 5. Schema Evolution

Let's add a new column, `last_updated_ts`, to track when each record was last modified. The column is nullable, so existing rows read it as `NULL` until it is populated.

In [6]:
spark.sql(f"ALTER TABLE {iceberg_table_name} ADD COLUMN last_updated_ts TIMESTAMP")

print("Schema evolved. Added column 'last_updated_ts'.")

# Verify the new schema
display(spark.sql(f"DESCRIBE TABLE {iceberg_table_name}"))

Schema evolved. Added column 'last_updated_ts'.


col_name,data_type,comment
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
trip_distance,double,
fare_amount,double,
pickup_zip,int,
dropoff_zip,int,
last_updated_ts,timestamp,
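To confirm exactly what the `ALTER TABLE` changed, compare the column lists before and after it. The minimal sketch below assumes `(col_name, data_type)` pairs copied from the `DESCRIBE TABLE` output above, with the "before" list being that output minus the new column. `schema_diff` is a hypothetical helper. In a live session, the pairs could come from `[(r.col_name, r.data_type) for r in spark.sql(f"DESCRIBE TABLE {iceberg_table_name}").collect()]`.

```python
from typing import Dict, List, Tuple

Column = Tuple[str, str]  # (col_name, data_type) as shown by DESCRIBE TABLE


def schema_diff(before: List[Column], after: List[Column]) -> Dict[str, List[Column]]:
    """Compare two column lists by name: added, dropped, and retyped columns."""
    old, new = dict(before), dict(after)
    return {
        "added": [(n, t) for n, t in after if n not in old],
        "dropped": [(n, t) for n, t in before if n not in new],
        "retyped": [(n, new[n]) for n, t in before if n in new and new[n] != t],
    }


base = [
    ("tpep_pickup_datetime", "timestamp"),
    ("tpep_dropoff_datetime", "timestamp"),
    ("trip_distance", "double"),
    ("fare_amount", "double"),
    ("pickup_zip", "int"),
    ("dropoff_zip", "int"),
]
evolved = base + [("last_updated_ts", "timestamp")]
print(schema_diff(base, evolved))
# → {'added': [('last_updated_ts', 'timestamp')], 'dropped': [], 'retyped': []}
```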


## 6. Time Travel

We can query the table's history and view the data as it was before the schema change.

In [7]:
# View the table history
display(spark.sql(f"DESCRIBE HISTORY {iceberg_table_name}"))

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-06-21 21:25:19.692,3124585922042689,rohit.bhagwat@databricks.com,ADD COLUMNS,"{'columns': '[{""column"":{""name"":""last_updated_ts"",""type"":""timestamp"",""nullable"":true,""metadata"":{}}}]'}",,,0621-212228-40wysa0e-v2n,0.0,WriteSerializable,True,{},,Databricks-Runtime/16.4.x-photon-scala2.12
0,2025-06-21 21:24:58.004,3124585922042689,rohit.bhagwat@databricks.com,CREATE OR REPLACE TABLE AS SELECT,"{'partitionBy': '[]', 'clusterBy': '[]', 'description': None, 'isManaged': 'true', 'properties': '{""delta.enableIcebergCompatV2"":""true"",""write.metadata.path"":""s3://databricks-e2demofieldengwest/b169b504-4c54-49f2-bc3a-adf4b128f36d/tables/6df43aae-d1b8-4ce9-90e2-5324b2bf8668/_iceberg/metadata"",""delta.universalFormat.enabledFormats"":""iceberg"",""write.parquet.compression-codec"":""zstd"",""delta.enableIcebergWriterCompatV1"":""true"",""write.summary.partition-limit"":""100"",""write.wap.enabled"":""false"",""delta.enableTypeWidening"":""true"",""write.metadata.compression-codec"":""gzip"",""delta.checkpointPolicy"":""v2"",""write.object-storage.enabled"":""true"",""delta.columnMapping.mode"":""id"",""delta.columnMapping.maxColumnId"":""6"",""history.expire.min-snapshots-to-keep"":""100"",""write.data.path"":""s3://databricks-e2demofieldengwest/b169b504-4c54-49f2-bc3a-adf4b128f36d/tables/6df43aae-d1b8-4ce9-90e2-5324b2bf8668"",""delta.enablemanagedicebergtable"":""true"",""history.expire.max-snapshot-age-ms"":""0"",""gc.enabled"":""false"",""delta.enableInCommitTimestamps"":""true""}', 'statsOnLoad': 'true'}",,,0621-212228-40wysa0e-v2n,,WriteSerializable,False,"{'numFiles': '1', 'numRemovedFiles': '0', 'numRemovedBytes': '0', 'numOutputRows': '10000', 'numOutputBytes': '159058'}",,Databricks-Runtime/16.4.x-photon-scala2.12
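`DESCRIBE HISTORY` lists the newest commit first, so it is easy to misread which version precedes a change. Here, version 1 is the `ADD COLUMNS` commit and version 0 is the table creation. The sketch below picks the version just before a given operation, using the `(version, operation)` pairs from the output above. `version_before` is a hypothetical helper. In a live session, the pairs could come from `[(r.version, r.operation) for r in spark.sql(f"DESCRIBE HISTORY {iceberg_table_name}").collect()]`.

```python
from typing import List, Optional, Tuple

HistoryRow = Tuple[int, str]  # (version, operation) from DESCRIBE HISTORY


def version_before(history: List[HistoryRow], operation: str) -> Optional[int]:
    """Return the version committed just before the first `operation` commit.

    Returns None if the operation never happened or was the very first commit.
    """
    ordered = sorted(history)  # oldest first; DESCRIBE HISTORY is newest first
    for i, (_, op) in enumerate(ordered):
        if op == operation:
            return ordered[i - 1][0] if i > 0 else None
    return None


history = [(1, "ADD COLUMNS"), (0, "CREATE OR REPLACE TABLE AS SELECT")]
print(version_before(history, "ADD COLUMNS"))  # → 0
```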


In [8]:
# Query the table at version 1. Per DESCRIBE HISTORY above, version 1 is the
# ADD COLUMNS commit itself, so its schema already includes last_updated_ts.
df_v1 = spark.read.option("versionAsOf", 1).table(iceberg_table_name)

print("Schema of table at version 1:")
df_v1.printSchema()

Schema of table at version 1:
root
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_zip: integer (nullable = true)
 |-- dropoff_zip: integer (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)



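Notice that `last_updated_ts` is already present. The history above shows that version 1 is the `ADD COLUMNS` commit itself. The schema from before the change is version 0, the initial `CREATE OR REPLACE TABLE AS SELECT` commit, which you can read with `spark.read.option("versionAsOf", 0).table(iceberg_table_name)`.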
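Time travel is also available in SQL through `VERSION AS OF` and `TIMESTAMP AS OF`. The sketch below builds either form of the query. `time_travel_query` is a hypothetical helper that only produces the string; in the notebook you would pass the result to `spark.sql`.

```python
from typing import Optional


def time_travel_query(table: str, version: Optional[int] = None,
                      timestamp: Optional[str] = None) -> str:
    """Build a SELECT pinned to exactly one table version or timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(version)}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


print(time_travel_query("nyc_taxi_trips_iceberg", version=0))
print(time_travel_query("nyc_taxi_trips_iceberg", timestamp="2025-06-21 21:25:00"))
```

The timestamp in the second call falls between the two commits in the history output (21:24:58 and 21:25:19). `TIMESTAMP AS OF` reads the latest version at or before that instant, so it should also resolve to version 0.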

## 7. Cleanup

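Uncomment and run the following commands to remove the resources created in this demo. Dropping the catalog removes everything in it, so skip that line if the catalog existed before you ran this notebook.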

In [None]:
# spark.sql(f"DROP TABLE IF EXISTS {iceberg_table_name}")
# spark.sql(f"DROP SCHEMA IF EXISTS {schema_name}")
# spark.sql(f"DROP CATALOG IF EXISTS {catalog_name} CASCADE")  # CASCADE also drops the catalog's auto-created `default` schema
# print("Cleaned up resources.")
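The cleanup cell above uses unqualified names, so it relies on the session's current catalog and schema. Those settings are lost if a new Spark session is created. The sketch below builds fully qualified DROP statements instead. `cleanup_statements` is a hypothetical helper. Its `drop_catalog` flag lets you keep a catalog that existed before the demo. The `CASCADE` on the catalog drop assumes that a new Unity Catalog catalog also contains an auto-created `default` schema.

```python
from typing import List


def cleanup_statements(catalog: str, schema: str, table: str,
                       drop_catalog: bool = True) -> List[str]:
    """DROP statements for the demo objects, innermost object first.

    Names are fully qualified, so the statements do not depend on the
    session's current catalog/schema (USE CATALOG / USE SCHEMA are session state).
    """
    stmts = [
        f"DROP TABLE IF EXISTS {catalog}.{schema}.{table}",
        f"DROP SCHEMA IF EXISTS {catalog}.{schema}",
    ]
    if drop_catalog:
        # Assumption: CASCADE is needed because a new catalog also holds a `default` schema.
        stmts.append(f"DROP CATALOG IF EXISTS {catalog} CASCADE")
    return stmts


for stmt in cleanup_statements("rohitb_dais2025_demo", "iceberg_demo",
                               "nyc_taxi_trips_iceberg", drop_catalog=False):
    print(stmt)  # in the notebook you would run spark.sql(stmt) instead
```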