![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)

In [1]:
!pip install pyspark-opendic==0.3.17

Collecting pyspark-opendic==0.3.17
  Downloading pyspark_opendic-0.3.17-py3-none-any.whl (9.2 kB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.5/200.5 kB 3.4 MB/s eta 0:00:00
Installing collected packages: py4j, pyspark-opendic
Successfully installed py4j-0.10.9.7 pyspark-opendic-0.3.17

[notice] A new release of pip is available: 23.0.1 -> 25.1
[notice] To update, run: pip install --upgrade pip


In [2]:
from pyspark.sql import SparkSession

def read_secret(secret_name):
    """ Get `secret_name` from docker-compose secret store"""
    secret_path = f"/run/secrets/{secret_name}"
    try:
        with open(secret_path, "r") as f:
            return f.read().strip()  # Remove any trailing newline
    except FileNotFoundError:
        print(f"Secret {secret_name} not found.")
        return None


## DEFINE SENSITIVE VARIABLES
POLARIS_CATALOG_NAME = 'polaris'
ENGINEER_CLIENT_ID = read_secret("engineer_client_id")
ENGINEER_CLIENT_SECRET = read_secret("engineer_client_secret")
ADLS_IO="org.apache.iceberg.azure.adlsv2.ADLSFileIO"
FILE_IO="org.apache.iceberg.io.ResolvingFileIO"

def create_session(client_id, client_secret, scope, fileio_impl):
    """Create (or reuse) a SparkSession wired to the Polaris REST catalog."""
    # read_secret() returns None when a secret is missing; fail early instead of sending "None:None".
    if not client_id or not client_secret:
        raise ValueError("Polaris client credentials are missing; check the docker-compose secrets.")

    spark = (SparkSession.builder
        # Iceberg Spark runtime plus the AWS SDK bundle
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.0,software.amazon.awssdk:bundle:2.28.17,software.amazon.awssdk:url-connection-client:2.28.17")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        # Polaris REST catalog, registered in Spark under the name `polaris`
        .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.polaris.type", "rest")
        .config("spark.sql.catalog.polaris.warehouse", POLARIS_CATALOG_NAME)
        .config("spark.sql.catalog.polaris.uri", 'http://polaris:8181/api/catalog')
        .config("spark.sql.catalog.polaris.credential", f"{client_id}:{client_secret}")
        .config("spark.sql.catalog.polaris.scope", scope)
        .config("spark.sql.defaultCatalog", "polaris")
        .config("spark.sql.catalogImplementation", "in-memory")
        .config("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
        .config("spark.sql.catalog.polaris.token-refresh-enabled", "true")
        .config("spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation", 'vended-credentials')
        .config("spark.sql.catalog.polaris.io-impl", fileio_impl)
        .config("spark.history.fs.logDirectory", "/home/iceberg/spark-events")
        # Reuses a running session if there is one; only runtime SQL configs then take effect.
        .getOrCreate())

    print("Spark Running")
    return spark


## Start Spark Session
spark = create_session(client_id=ENGINEER_CLIENT_ID, client_secret=ENGINEER_CLIENT_SECRET,
                       scope='PRINCIPAL_ROLE:ALL', fileio_impl=FILE_IO)
spark

Spark Running


25/05/01 10:10:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Setting up pyspark-opendic

In [3]:
from pyspark_opendic.catalog import OpenDicCatalog

# Initialize the OpenDic catalog client against the Polaris API
POLARIS_URI = "http://polaris:8181/api"
catalog = OpenDicCatalog(spark, POLARIS_URI)
print("Catalog initialized")

Catalog initialized


In [4]:
catalog.sql("Show namespaces").show()



+---------+
|namespace|
+---------+
|   SYSTEM|
+---------+



In [None]:
%%sql
use SYSTEM

### Define the schema of the `function_v2` object type

In [15]:
catalog.sql(
    """
    DEFINE OPEN function_v2
    props {
        "args": "MAP",
        "language": "STRING",
        "def": "string",
        "comment": "string",
        "packages": "list",
        "runtime": "string",
        "client_version": "int",
        "signature": "STRING",
        "return_type": "STRING"
    }
    """
)

Unnamed: 0,udoType,properties,createdTimestamp,lastUpdatedTimestamp,version
0,function_v2,"{'return_type': 'STRING', 'created_time': 'STRING', 'entity_version': 'STRING', 'uname': 'STRING', 'def': 'STRING', 'signature': 'STRING', 'runtime': 'STRING', 'language': 'STRING', 'packages': 'STRING', 'args': 'STRING', 'last_updated_time': 'STRING', 'comment': 'STRING', 'client_version': 'STRING'}",1970-01-01T00:00Z,1970-01-01T00:00Z,


### Create a `function_v2` object

In [16]:
catalog.sql(
 """
 CREATE OPEN function_v2 foo
    props {
            "args": {
                "arg1": "int", 
                "arg2": "int"
                },
            "language": "python",
            "def": "def foo(arg1, arg2):\\n\\n        return arg1 + arg2",
            "packages" : ["numpy", "pandas"],
            "comment": "test fun",
            "runtime": "3.12",
            "client_version": 1,
            "return_type": "int",
            "signature": "foo(arg1 str, arg2 int)"
        }
    """
)

Unnamed: 0,type-name,object-name,props,created-time-stamp,last-updated-time-stamp,entity-version
0,function_v2,foo,"{'args': {'arg1': 'int', 'arg2': 'int'}, 'return_type': 'int', 'def': 'def foo(arg1, arg2):  return arg1 + arg2', 'signature': 'foo(arg1 str, arg2 int)', 'runtime': '3.12', 'language': 'python', 'comment': 'test fun', 'packages': ['numpy', 'pandas'], 'client_version': 1}",2025-05-01T10:12:27.933377378Z,2025-05-01T10:12:27.933380669Z,1


### Create a mapping to Snowflake

In [7]:
catalog.sql(
    """
    ADD OPEN MAPPING function_v2 PLATFORM snowflake
    SYNTAX {
        "CREATE OR ALTER <type> <name>(<args>)
            RETURNS <return_type>
            LANGUAGE <language>
            RUNTIME = <runtime>
            HANDLER = '<name>'
            AS 
            $$
            <def>
            $$",
    }
    PROPS {
        "args": {
                "propType": "map",
                "format": "<key> <value>",
                "delimiter": ", "
            },
        "packages": {"propType": "list", "format": "'<item>'", "delimiter": ", "}
    }
    """
)

Unnamed: 0,typeName,platformName,syntax,objectDumpMap,createdTimestamp,lastUpdatedTimestamp,version
0,function_v2,snowflake,"""CREATE OR ALTER <type> <name>(<args>)\n RETURNS <return_type>\n LANGUAGE <language>\n RUNTIME = <runtime>\n HANDLER = '<name>'\n AS \n $$\n <def>\n $$"",","{'args': {'propType': 'map', 'format': '<key> <value>', 'delimiter': ', '}, 'packages': {'propType': 'list', 'format': ''<item>'', 'delimiter': ', '}}",2025-05-01T09:44:49.671430555Z,2025-05-01T09:44:49.671434180Z,1
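The `PROPS` block tells the mapping how to flatten non-scalar properties before they are substituted into `SYNTAX`. The SQL that `SYNC` generates further down contains `foo(arg1 int, arg2 int)`. That suggests a `map` prop renders each entry with its `format` and joins the results with `delimiter`. The sketch below reproduces that behaviour in plain Python. `render_prop` is a hypothetical helper written to mirror the observed output; it is not part of `pyspark-opendic`.

```python
from typing import Any, Dict


def render_prop(value: Any, spec: Dict[str, str]) -> str:
    """Flatten a map/list property using a PROPS entry such as
    {"propType": "map", "format": "<key> <value>", "delimiter": ", "}.

    Hypothetical helper: mirrors the observed SYNC output, not the library's code.
    """
    prop_type, fmt, delimiter = spec["propType"], spec["format"], spec["delimiter"]
    if prop_type == "map":
        # Each (key, value) pair is substituted into the format string.
        parts = [fmt.replace("<key>", str(k)).replace("<value>", str(v))
                 for k, v in value.items()]
    elif prop_type == "list":
        # Each list element is substituted for <item>.
        parts = [fmt.replace("<item>", str(item)) for item in value]
    else:
        raise ValueError(f"unsupported propType: {prop_type}")
    return delimiter.join(parts)


args_spec = {"propType": "map", "format": "<key> <value>", "delimiter": ", "}
packages_spec = {"propType": "list", "format": "'<item>'", "delimiter": ", "}

print(render_prop({"arg1": "int", "arg2": "int"}, args_spec))  # arg1 int, arg2 int
print(render_prop(["numpy", "pandas"], packages_spec))         # 'numpy', 'pandas'
```

The `function_v2` syntax template has no `<packages>` placeholder, so the rendered package list never reaches the generated SQL. By contrast, the `function` mapping listed under `SHOW OPEN MAPPINGS FOR snowflake` includes `PACKAGES = (<packages>)`.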


### List types, objects, mappings, and platforms

In [8]:
catalog.sql(
    """
    SHOW OPEN TYPES
    """
)

Unnamed: 0,udoType,properties,createdTimestamp,lastUpdatedTimestamp,version
0,function,"{'return_type': 'STRING', 'created_time': 'STRING', 'entity_version': 'STRING', 'uname': 'STRING', 'def': 'STRING', 'signature': 'STRING', 'runtime': 'STRING', 'language': 'STRING', 'packages': 'STRING', 'args': 'STRING', 'last_updated_time': 'STRING', 'comment': 'STRING', 'client_version': 'STRING'}",2025-05-01T09:40:03.047Z,2025-05-01T09:40:04.364Z,
1,function_v2,"{'return_type': 'STRING', 'created_time': 'STRING', 'entity_version': 'STRING', 'uname': 'STRING', 'def': 'STRING', 'signature': 'STRING', 'runtime': 'STRING', 'language': 'STRING', 'packages': 'STRING', 'args': 'STRING', 'last_updated_time': 'STRING', 'comment': 'STRING', 'client_version': 'STRING'}",2025-05-01T09:44:46.364Z,2025-05-01T09:44:46.364Z,


In [9]:
catalog.sql(
    """
    SHOW OPEN function_v2
    """
)

Unnamed: 0,type,name,props,createdTimestamp,lastUpdatedTimestamp,entityVersion
0,function_v2,foo,"{'args': {'arg1': 'int', 'arg2': 'int'}, 'return_type': 'int', 'def': 'def foo(arg1, arg2):  return arg1 + arg2', 'signature': 'foo(arg1 str, arg2 int)', 'runtime': '3.12', 'language': 'python', 'comment': 'test fun', 'packages': ['numpy', 'pandas'], 'client_version': 1}",2025-05-01T09:44:46.027065511Z,2025-05-01T09:44:46.027068761Z,1


In [10]:
# Show mapping for <object> to <platform>. Example: [Platform_mapping(function_v2 -> snowflake)]
catalog.sql(
    """
    SHOW OPEN MAPPING function_v2 PLATFORM snowflake
    """
)

Unnamed: 0,typeName,platformName,syntax,objectDumpMap,createdTimestamp,lastUpdatedTimestamp,version
0,function_v2,snowflake,"""CREATE OR ALTER <type> <name>(<args>)\n RETURNS <return_type>\n LANGUAGE <language>\n RUNTIME = <runtime>\n HANDLER = '<name>'\n AS \n $$\n <def>\n $$"",","{'args': {'propType': 'map', 'format': '<key> <value>', 'delimiter': ', '}, 'packages': {'propType': 'list', 'format': ''<item>'', 'delimiter': ', '}}",2025-05-01T09:44:49.671430Z,2025-05-01T09:44:49.671434Z,1


In [11]:
# Show all mappings from <object>. Example: [snowflake,spark]
catalog.sql(
    """
    SHOW OPEN PLATFORMS FOR function_v2
    """
)

Unnamed: 0,typeName,platformName,syntax,objectDumpMap,createdTimestamp,lastUpdatedTimestamp,version
0,function_v2,snowflake,"""CREATE OR ALTER <type> <name>(<args>)\n RETURNS <return_type>\n LANGUAGE <language>\n RUNTIME = <runtime>\n HANDLER = '<name>'\n AS \n $$\n <def>\n $$"",","{'args': {'propType': 'map', 'format': '<key> <value>', 'delimiter': ', '}, 'packages': {'propType': 'list', 'format': ''<item>'', 'delimiter': ', '}}",2025-05-01T09:44:49.671430Z,2025-05-01T09:44:49.671434Z,1


In [12]:
catalog.sql(
    """
    SHOW OPEN PLATFORMS
    """
)

Unnamed: 0,platformName,platformMappings
0,snowflake,"[{'typeName': 'function_v2', 'platformName': 'snowflake', 'syntax': '""CREATE OR ALTER <type> <name>(<args>)  RETURNS <return_type>  LANGUAGE <language>  RUNTIME = <runtime>  HANDLER = '<name>'  AS $$  <def>  $$"",', 'objectDumpMap': {'args': {'propType': 'map', 'format': '<key> <value>', 'delimiter': ', '}, 'packages': {'propType': 'list', 'format': ""'<item>'"", 'delimiter': ', '}}, 'createdTimestamp': '2025-05-01T09:44:49.671430Z', 'lastUpdatedTimestamp': '2025-05-01T09:44:49.671434Z', 'version': 1}, {'typeName': 'function', 'platformName': 'snowflake', 'syntax': 'CREATE OR ALTER <type> <name>(<args>)  RETURNS <return_type>  LANGUAGE <language>  PACKAGES = (<packages>)  RUNTIME = <runtime>  HANDLER = '<name>'  AS $$  <def>  $$ ', 'objectDumpMap': {'args': {'propType': 'map', 'format': '<key> <value>', 'delimiter': ', '}, 'packages': {'propType': 'list', 'format': ""'<item>'"", 'delimiter': ', '}}, 'createdTimestamp': '2025-05-01T09:40:03.342585Z', 'lastUpdatedTimestamp': '2025-05-01T09:40:03.342587Z', 'version': 1}]"


In [13]:
catalog.sql(
    """
    SHOW OPEN MAPPINGS FOR snowflake
    """
)

Unnamed: 0,typeName,platformName,syntax,objectDumpMap,createdTimestamp,lastUpdatedTimestamp,version
0,function,snowflake,CREATE OR ALTER <type> <name>(<args>)\n RETURNS <return_type>\n LANGUAGE <language>\n PACKAGES = (<packages>)\n RUNTIME = <runtime>\n HANDLER = '<name>'\n AS $$\n <def>\n $$\n,"{'args': {'propType': 'map', 'format': '<key> <value>', 'delimiter': ', '}, 'packages': {'propType': 'list', 'format': ''<item>'', 'delimiter': ', '}}",2025-05-01T09:40:03.342585Z,2025-05-01T09:40:03.342587Z,1
1,function_v2,snowflake,"""CREATE OR ALTER <type> <name>(<args>)\n RETURNS <return_type>\n LANGUAGE <language>\n RUNTIME = <runtime>\n HANDLER = '<name>'\n AS \n $$\n <def>\n $$"",","{'args': {'propType': 'map', 'format': '<key> <value>', 'delimiter': ', '}, 'packages': {'propType': 'list', 'format': ''<item>'', 'delimiter': ', '}}",2025-05-01T09:44:49.671430Z,2025-05-01T09:44:49.671434Z,1


In [14]:
catalog.sql(
    """
    SYNC OPEN function_v2 for snowflake
    """
)

Executing 1 SQL statements...

Formatted SQL:
"CREATE OR ALTER function_v2 foo(arg1 int, arg2 int)
            RETURNS int
            LANGUAGE python
            RUNTIME = 3.12
            HANDLER = 'foo'
            AS 
            $$
            def foo(arg1, arg2):

        return arg1 + arg2
            $$",
Executing SQL (multilined SQL): 
"""
"CREATE OR ALTER function_v2 foo(arg1 int, arg2 int)
            RETURNS int
            LANGUAGE python
            RUNTIME = 3.12
            HANDLER = 'foo'
            AS 
            $$
            def foo(arg1, arg2):

        return arg1 + arg2
            $$",
"""


```json
{
    "executions": [
        {
            "sql": "\"CREATE OR ALTER function_v2 foo(arg1 int, arg2 int)\n            RETURNS int\n            LANGUAGE python\n            RUNTIME = 3.12\n            HANDLER = 'foo'\n            AS \n            $$\n            def foo(arg1, arg2):\n\n        return arg1 + arg2\n            $$\",",
            "status": "failed",
            "error": "\n[PARSE_SYNTAX_ERROR] Syntax error at or near '\"\"'.(line 1, pos 0)\n\n== SQL ==\n\"\"\"\n^^^\n\"CREATE OR ALTER function_v2 foo(arg1 int, arg2 int)\n            RETURNS int\n            LANGUAGE python\n            RUNTIME = 3.12\n            HANDLER = 'foo'\n            AS \n            $$\n            def foo(arg1, arg2):\n\n        return arg1 + arg2\n            $$\",\n\"\"\"\n"
        }
    ]
}
```
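The parser fails at the very start of the statement. Look at the `function_v2` syntax stored by `ADD OPEN MAPPING` (visible in the `SHOW OPEN MAPPING` output above). It still contains the surrounding double quotes and trailing comma from the `SYNTAX { "...", }` cell, and placeholder substitution copies them verbatim into the SQL.

The sketch below assumes simple `<placeholder>` text replacement. `fill_template` and `strip_wrapping` are hypothetical helpers, not the library's API. The sketch reproduces the effect and shows one way to sanitise such a template.

```python
import re
from typing import Dict


def fill_template(template: str, values: Dict[str, str]) -> str:
    """Replace each <name> placeholder with its value (hypothetical helper)."""
    # Unknown placeholders are left untouched so they are easy to spot.
    return re.sub(r"<(\w+)>", lambda m: values.get(m.group(1), m.group(0)), template)


def strip_wrapping(template: str) -> str:
    """Drop a trailing comma and one pair of surrounding double quotes, if present."""
    t = template.strip()
    if t.endswith(","):
        t = t[:-1].rstrip()
    if len(t) >= 2 and t.startswith('"') and t.endswith('"'):
        t = t[1:-1]
    return t.strip()


# Shortened form of the stored function_v2 syntax (note the quotes and trailing comma).
stored = '"CREATE OR ALTER <type> <name>(<args>) RETURNS <return_type> LANGUAGE <language>",'
values = {"type": "function_v2", "name": "foo", "args": "arg1 int, arg2 int",
          "return_type": "int", "language": "python"}

print(fill_template(stored, values))
# "CREATE OR ALTER function_v2 foo(arg1 int, arg2 int) RETURNS int LANGUAGE python",
print(fill_template(strip_wrapping(stored), values))
# CREATE OR ALTER function_v2 foo(arg1 int, arg2 int) RETURNS int LANGUAGE python
```

Even with the quotes removed, `<type>` expands to `function_v2`, which Snowflake does not recognise as an object kind. The older `function` mapping avoids this because its type name is literally `function`.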

In [16]:
catalog.sql(
    """
    SYNC OPEN OBJECTS for snowflake
    """
)

```json
{
    "error": "HTTP Error",
    "exception message": "500 Server Error: Internal Server Error for url: http://polaris:8181/api/opendic/v1/platforms/snowflake/pull",
    "Catalog Response": {
        "error": {
            "message": null,
            "type": "NullPointerException",
            "code": 500
        }
    }
}
```

### Drop the `function_v2` type and its Snowflake mappings

In [15]:
catalog.sql(
    """
    DROP OPEN function_v2
    """
)


```json
{
    "error": "HTTP Error",
    "exception message": "400 Client Error: Bad Request for url: http://polaris:8181/api/opendic/v1/objects/function_v2",
    "Catalog Response": {
        "error": {
            "message": "An exception occurred while creating a query in EntityManager: \nException Description: Problem compiling [SELECT m from ModelPolicyMappingRecord m  where m.targetCatalogId=:targetCatalogId and m.targetId=:targetId]. \n[14, 38] The abstract schema type 'ModelPolicyMappingRecord' is unknown.\n[47, 64] The state field path 'm.targetCatalogId' cannot be resolved to a valid type.\n[88, 98] The state field path 'm.targetId' cannot be resolved to a valid type.",
            "type": "IllegalArgumentException",
            "code": 400
        }
    }
}
```

In [7]:
catalog.sql(
    """
    DROP OPEN MAPPINGS for snowflake
    """
)


```json
{
    "error": "HTTP Error",
    "exception message": "404 Client Error: Not Found for url: http://polaris:8181/api/opendic/v1/platforms/snowflake",
    "Catalog Response": {
        "No mappings for platform snowflake found": "snowflake"
    }
}
```

### Inspect the OpenDic tables in `SYSTEM`

In [17]:
%%sql
show tables in SYSTEM

namespace,tableName,isTemporary
SYSTEM,function,False
SYSTEM,function_v2,False


In [18]:
%%sql
select * from SYSTEM.function_v2

Py4JJavaError: An error occurred while calling o146.collectToPython.
: org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: file:/data/SYSTEM/function_v2/metadata/snap-397138463769984362-1-da961996-5028-4c2a-808b-d60355c51f14.avro
	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:185)
	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:102)
	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
	at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:333)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:260)
	at org.apache.iceberg.ManifestLists.read(ManifestLists.java:45)
	at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:165)
	at org.apache.iceberg.BaseSnapshot.deleteManifests(BaseSnapshot.java:199)
	at org.apache.iceberg.BaseDistributedDataScan.findMatchingDeleteManifests(BaseDistributedDataScan.java:208)
	at org.apache.iceberg.BaseDistributedDataScan.doPlanFiles(BaseDistributedDataScan.java:149)
	at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139)
	at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.tasks(SparkPartitioningAwareScan.java:174)
	at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.taskGroups(SparkPartitioningAwareScan.java:202)
	at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.outputPartitioning(SparkPartitioningAwareScan.java:104)
	... (Spark query-planning and Py4J frames omitted)
Caused by: java.io.FileNotFoundException: File file:/data/SYSTEM/function_v2/metadata/snap-397138463769984362-1-da961996-5028-4c2a-808b-d60355c51f14.avro does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)
	... 98 more


In [7]:
%%sql
DESCRIBE EXTENDED SYSTEM.function

col_name,data_type,comment
uname,string,
args,"map<string,string>",
return_type,string,
def,string,
signature,string,
runtime,string,
language,string,
comment,string,
packages,array<string>,
client_version,int,


## Load One Month of NYC Taxi/Limousine Trip Data

For this notebook, we will use the New York City Taxi and Limousine Commission Trip Record Data that's available on the AWS Open Data Registry. This contains data of trips taken by taxis and for-hire vehicles in New York City. We'll save this into an Iceberg table called `taxis`.

To be able to rerun the notebook several times, let's drop the table if it exists to start fresh.

In [9]:
%%sql

CREATE DATABASE IF NOT EXISTS nyc

In [14]:
%%sql

DROP TABLE IF EXISTS nyc.taxis

IllegalArgumentException: An exception occurred while creating a query in EntityManager: 
Exception Description: Problem compiling [SELECT m from ModelPolicyMappingRecord m  where m.targetCatalogId=:targetCatalogId and m.targetId=:targetId]. 
[14, 38] The abstract schema type 'ModelPolicyMappingRecord' is unknown.
[47, 64] The state field path 'm.targetCatalogId' cannot be resolved to a valid type.
[88, 98] The state field path 'm.targetId' cannot be resolved to a valid type.

In [11]:
df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis")

                                                                                

In [12]:
%%sql

DESCRIBE EXTENDED nyc.taxis

col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp_ntz,
tpep_dropoff_datetime,timestamp_ntz,
passenger_count,double,
trip_distance,double,
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [13]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

cnt
2171187


## Schema Evolution

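Adding, dropping, renaming, or altering columns is safe in Iceberg. Columns are tracked by unique field IDs rather than by name or position, so schema changes like these are metadata-only and do not rewrite existing data files. In this example, we'll rename `fare_amount` to `fare` and `trip_distance` to `distance`, set a comment and type on `distance`, and move it after `fare`. We'll also add a float column `fare_per_distance_unit` immediately after `distance`.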

In [None]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN fare_amount TO fare

In [None]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN trip_distance TO distance

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance TYPE double;

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance AFTER fare;

In [None]:
%%sql

ALTER TABLE nyc.taxis
ADD COLUMN fare_per_distance_unit float AFTER distance
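Here is a toy model of the field-ID tracking that lets these renames and the column move leave data files alone. A data file records values against field IDs, and a read projects them through the table's current schema. The `Schema` class and the IDs 5 and 11 are illustrative only; this is not Iceberg's Java or Python API.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Schema:
    names: Dict[int, str]  # field ID -> current column name
    order: List[int]       # display order of field IDs

    def id_of(self, name: str) -> int:
        return next(fid for fid, n in self.names.items() if n == name)

    def rename(self, old: str, new: str) -> None:
        # Metadata-only: only the ID -> name mapping changes.
        self.names[self.id_of(old)] = new

    def move_after(self, col: str, anchor: str) -> None:
        fid, anchor_id = self.id_of(col), self.id_of(anchor)
        self.order.remove(fid)
        self.order.insert(self.order.index(anchor_id) + 1, fid)


# A row from a data file written before any schema change, keyed by field ID.
data_file_row = {5: 2.5, 11: 9.0}

schema = Schema(names={5: "trip_distance", 11: "fare_amount"}, order=[5, 11])
schema.rename("fare_amount", "fare")
schema.rename("trip_distance", "distance")
schema.move_after("distance", "fare")

# The old file is read through the *current* schema, matching columns by ID.
row = {schema.names[fid]: data_file_row[fid] for fid in schema.order}
print(row)  # {'fare': 9.0, 'distance': 2.5}
```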

Let's update the new `fare_per_distance_unit` to equal `fare` divided by `distance`.

In [None]:
%%sql

UPDATE nyc.taxis
SET fare_per_distance_unit = fare/distance

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

## Expressive SQL for Row-Level Changes
With Iceberg tables, `DELETE` queries can be used to perform row-level deletes: provide the table name and a `WHERE` predicate. If the filter matches entire partitions of the table, Iceberg performs a metadata-only delete, dropping those partitions' data files from the table metadata instead of rewriting them.

Let's perform a row-level delete for all rows that have a `fare_per_distance_unit` greater than 4 or a `distance` greater than 2. This should leave us with relatively short trips (at most 2 miles) with a fare per distance unit of at most 4.

In [None]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit > 4.0 OR distance > 2.0
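Whether a `DELETE` can be metadata-only depends on what the planner can prove from each file's partition values. The sketch below models that decision for a table partitioned by `VendorID`. The file layout and `plan_delete` are hypothetical, and real Iceberg planning may also consult column statistics. `nyc.taxis` is still unpartitioned at this point and the filter above uses non-partition columns, so in this model that `DELETE` falls into the row-level case.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass
class DataFile:
    path: str
    partition: Dict[str, int]  # partition values, e.g. {"VendorID": 1}; {} if unpartitioned


def plan_delete(files: List[DataFile], column: str,
                predicate: Callable[[object], bool]) -> Tuple[List[str], List[str]]:
    """Return (files dropped from metadata only, files needing row-level work)."""
    metadata_only, row_level = [], []
    for f in files:
        if column in f.partition:
            # Every row shares this partition value, so the predicate is true
            # for all rows or for none: drop the whole file or leave it alone.
            if predicate(f.partition[column]):
                metadata_only.append(f.path)
        else:
            # Filter on a non-partition column (ignoring column statistics):
            # rows must be inspected and the file rewritten or a delete file written.
            row_level.append(f.path)
    return metadata_only, row_level


files = [
    DataFile("a.parquet", {"VendorID": 1}),
    DataFile("b.parquet", {"VendorID": 2}),
    DataFile("c.parquet", {"VendorID": 2}),
]

# DELETE ... WHERE VendorID = 2 -> whole files dropped, nothing rewritten
print(plan_delete(files, "VendorID", lambda v: v == 2))
# (['b.parquet', 'c.parquet'], [])

# DELETE ... WHERE distance > 2.0 -> every file needs row-level work
print(plan_delete(files, "distance", lambda v: v > 2.0))
# ([], ['a.parquet', 'b.parquet', 'c.parquet'])
```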

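Some rows have a `null` `fare_per_distance_unit` because their `distance` is `0`. With ANSI mode disabled (the default in Spark 3.5), dividing by zero returns `NULL`. The previous `DELETE` did not remove these rows, so let's remove them as well.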

In [None]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit is null
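The second `DELETE` is needed because of SQL's three-valued logic. When `fare_per_distance_unit` is `NULL`, the comparison `> 4.0` is `NULL` rather than true. `NULL OR FALSE` is still `NULL`, and `DELETE` only removes rows whose predicate is true.

The Python model below uses `None` for SQL `NULL`. The helper names are illustrative.

```python
from typing import Optional


def sql_div(a: Optional[float], b: Optional[float]) -> Optional[float]:
    # Spark with ANSI mode off: NULL inputs or division by zero give NULL.
    if a is None or b is None or b == 0:
        return None
    return a / b


def sql_gt(a: Optional[float], b: float) -> Optional[bool]:
    # Any comparison with NULL is NULL ("unknown").
    return None if a is None else a > b


def sql_or(x: Optional[bool], y: Optional[bool]) -> Optional[bool]:
    # Three-valued OR: TRUE wins; otherwise NULL if either side is NULL.
    if x is True or y is True:
        return True
    if x is None or y is None:
        return None
    return False


rows = [
    {"fare": 30.0, "distance": 1.5},  # 20.0 > 4.0      -> predicate TRUE, deleted
    {"fare": 8.0, "distance": 0.0},   # NULL            -> predicate NULL, survives
    {"fare": 5.0, "distance": 2.0},   # neither is true -> predicate FALSE, survives
]
for r in rows:
    r["fare_per_distance_unit"] = sql_div(r["fare"], r["distance"])

# First DELETE: a row is removed only when the predicate is TRUE.
survivors = [r for r in rows
             if sql_or(sql_gt(r["fare_per_distance_unit"], 4.0),
                       sql_gt(r["distance"], 2.0)) is not True]
print([r["fare_per_distance_unit"] for r in survivors])  # [None, 2.5]

# Second DELETE: WHERE fare_per_distance_unit IS NULL (IS NULL is never NULL itself).
survivors = [r for r in survivors if r["fare_per_distance_unit"] is not None]
print(len(survivors))  # 1
```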

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

In [None]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

## Partitioning

A table’s partitioning can be updated in place and applied only to newly written data. Query plans are then split, using the old partition scheme for data written before the partition scheme was changed, and using the new partition scheme for data written after. People querying the table don’t even have to be aware of this split. Simple predicates in WHERE clauses are automatically converted to partition filters that prune out files with no matches. This is what’s referred to in Iceberg as *Hidden Partitioning*.

In [None]:
%%sql

ALTER TABLE nyc.taxis
ADD PARTITION FIELD VendorID
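The new partition field applies only to data written from now on, so a scan has to handle both layouts. Files written under the new spec carry a `VendorID` partition value that can rule them out. Files written before the change carry none and must still be read.

The pure-Python model below covers that planning step for `WHERE VendorID = ...`. The file list and `prune` are hypothetical, and Iceberg additionally prunes with column statistics.

```python
from typing import Dict, List, Optional


def prune(files: List[Dict], vendor_id: int) -> List[str]:
    """Return the files a scan with `WHERE VendorID = vendor_id` must read."""
    to_read = []
    for f in files:
        part: Optional[Dict[str, int]] = f["partition"]
        if part is None or "VendorID" not in part:
            # Written under the old, unpartitioned spec: cannot be pruned by partition.
            to_read.append(f["path"])
        elif part["VendorID"] == vendor_id:
            to_read.append(f["path"])
        # Otherwise the partition value proves no row matches, so the file is skipped.
    return to_read


# spec 0 = unpartitioned (before ALTER), spec 1 = identity(VendorID) (after ALTER)
files = [
    {"path": "old-0.parquet", "spec_id": 0, "partition": None},
    {"path": "new-1.parquet", "spec_id": 1, "partition": {"VendorID": 1}},
    {"path": "new-2.parquet", "spec_id": 1, "partition": {"VendorID": 2}},
]

print(prune(files, 2))  # ['old-0.parquet', 'new-2.parquet']
```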

## Metadata Tables

Iceberg exposes rich table metadata through queryable metadata tables. For example, you can retrieve the manifest list for any snapshot by querying the table's `snapshots` table.

In [None]:
%%sql

SELECT snapshot_id, manifest_list
FROM nyc.taxis.snapshots

The `files` table contains detailed information about each data file. This includes column-level statistics such as null counts, lower bounds, and upper bounds, which Iceberg uses to skip files during query planning.

In [None]:
%%sql

SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds
FROM nyc.taxis.files
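Per-file `lower_bounds` and `upper_bounds` let the planner skip files whose value range cannot satisfy a filter. In the `files` metadata table these bounds are serialized binary values keyed by field ID. The sketch below uses already-decoded, made-up numbers for `distance` and hypothetical helper names to show the decision for `distance > 2.0`.

```python
from typing import Dict, List, Tuple

# Decoded (lower, upper) bounds of `distance` per data file; values are made up.
file_bounds: Dict[str, Tuple[float, float]] = {
    "f1.parquet": (0.0, 1.9),
    "f2.parquet": (0.5, 12.3),
    "f3.parquet": (2.1, 40.0),
}


def files_may_match_gt(bounds: Dict[str, Tuple[float, float]], threshold: float) -> List[str]:
    """Files that may contain a row with value > threshold."""
    # If a file's upper bound is <= threshold, no row in it can match: skip it.
    return [path for path, (_, upper) in bounds.items() if upper > threshold]


def files_all_match_gt(bounds: Dict[str, Tuple[float, float]], threshold: float) -> List[str]:
    """Files whose non-null values all satisfy value > threshold."""
    # Nulls are not covered by the bounds; null_value_counts would have to rule them out.
    return [path for path, (lower, _) in bounds.items() if lower > threshold]


print(files_may_match_gt(file_bounds, 2.0))  # ['f2.parquet', 'f3.parquet']
print(files_all_match_gt(file_bounds, 2.0))  # ['f3.parquet']
```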

## Time Travel

The `history` table lists the snapshots that have been made current, together with the parent snapshot each one derives from. The `is_current_ancestor` flag lets you know whether a snapshot is part of the linear history of the table's current snapshot.

In [None]:
%%sql

SELECT *
FROM nyc.taxis.history
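`is_current_ancestor` is true for the snapshots reachable from the current snapshot by following parent links. The sketch below computes that set over a made-up parent map; the IDs and `current_ancestors` are illustrative.

```python
from typing import Dict, Optional, Set

# snapshot_id -> parent_id (None for the first snapshot); IDs are made up.
# Snapshot 4 was committed on top of 2, so 3 is no longer on the current line.
parents: Dict[int, Optional[int]] = {1: None, 2: 1, 3: 2, 4: 2}


def current_ancestors(current: int, parents: Dict[int, Optional[int]]) -> Set[int]:
    """Snapshots on the linear history of `current`, including itself."""
    seen: Set[int] = set()
    node: Optional[int] = current
    while node is not None:
        seen.add(node)
        node = parents[node]
    return seen


print(sorted(current_ancestors(4, parents)))  # [1, 2, 4]
```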

You can revert the table to an earlier snapshot in its history with the `rollback_to_snapshot` procedure, which makes that snapshot the table's current snapshot again. Let's revert the table to its original state by rolling back to the very first snapshot ID.

In [None]:
%%sql --var df

SELECT *
FROM nyc.taxis.history
ORDER BY made_current_at

In [None]:
original_snapshot = df.head().snapshot_id
spark.sql(f"CALL system.rollback_to_snapshot('nyc.taxis', {original_snapshot})")
original_snapshot
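`rollback_to_snapshot` does not delete the newer snapshots. It makes an earlier snapshot, which must be an ancestor of the current one, current again, and that change is recorded as a new row in `history`. The toy model of the snapshot log below uses illustrative IDs and a hypothetical `rollback` helper.

```python
from typing import Dict, List, Optional, Tuple

parents: Dict[int, Optional[int]] = {101: None, 102: 101, 103: 102}
# (made_current_at, snapshot_id); the last entry is the current snapshot.
history: List[Tuple[int, int]] = [(1, 101), (2, 102), (3, 103)]


def rollback(history: List[Tuple[int, int]], parents: Dict[int, Optional[int]],
             target: int, now: int) -> None:
    """Make `target` current again by appending a history entry (hypothetical helper)."""
    current = history[-1][1]
    # Only ancestors of the current snapshot are valid rollback targets.
    node: Optional[int] = current
    while node is not None and node != target:
        node = parents[node]
    if node is None:
        raise ValueError(f"{target} is not an ancestor of {current}")
    history.append((now, target))  # 102 and 103 still exist, they are just not current


# Like `df.head().snapshot_id`: roll back to the oldest snapshot in the history.
rollback(history, parents, target=history[0][1], now=4)
print(history)  # [(1, 101), (2, 102), (3, 103), (4, 101)]
```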

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

Another look at the history table shows that the original state of the table has been added as a new entry, with the original snapshot ID.

In [None]:
%%sql

SELECT *
FROM nyc.taxis.history

In [None]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis