![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Integrated Audits: Streamlined Data Observability with Apache Iceberg](https://tabular.io/blog/integrated-audits/)

In [1]:
spark

To be able to rerun the notebook several times, let's drop the `permits` table if it exists to start fresh.

In [2]:
%%sql

DROP TABLE IF EXISTS permits

# Load NYC Film Permits Data

For this demo, we will use the [New York City Film Permits dataset](https://data.cityofnewyork.us/City-Government/Film-Permits/tg4x-b46p) available as part of the NYC Open Data initiative. We're using a locally saved copy of a 1000 record sample, but feel free to download the entire dataset to use in this notebook!

We'll save the sample dataset into an Iceberg table called `permits`.

In [3]:
df = spark.read.option("inferSchema","true").option("multiline","true").json("/home/iceberg/data/nyc_film_permits.json")
df.write.saveAsTable("permits")

Taking a quick peek at the data, you can see that there are a number of permits for different boroughs in New York.

In [4]:
spark.read \
    .format("iceberg") \
    .load("permits") \
    .groupBy("borough") \
    .count() \
    .show()

+-------------+-----+
|      borough|count|
+-------------+-----+
|    Manhattan|  456|
|       Queens|  157|
|     Brooklyn|  340|
|        Bronx|   38|
|Staten Island|    9|
+-------------+-----+



# Generate an ID for an Integrated Audit Session

An integrated audit session is a single cycle of:
1. Staging changes to a table
2. Auditing the staged changes
3. Committing the changes (optional)

Each of these sessions must be identified by an ID. You can use any convention that makes sense in your environment, but in this demo we'll simply use a UUID.

In [5]:
import uuid
ia_session_id = uuid.uuid4().hex
ia_session_id

'da63708dc1234438b4130d29f0f9e8f4'
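If a bare UUID is hard to trace back to the job that produced it, a more descriptive convention is possible. The sketch below is only an illustration: `make_session_id` and the pipeline name `permits_daily_load` are hypothetical and not part of Iceberg or this demo. It assumes the pipeline name contains no quote characters, since the ID is later embedded in a SQL string.

```python
import uuid
from datetime import datetime, timezone


def make_session_id(pipeline, now=None):
    """Build a readable audit session ID (hypothetical convention).

    Format: <pipeline>-<UTC timestamp>-<8 random hex characters>
    """
    now = now or datetime.now(timezone.utc)
    return f"{pipeline}-{now:%Y%m%dT%H%M%S}-{uuid.uuid4().hex[:8]}"


# The pipeline name here is an assumed example value.
example_id = make_session_id("permits_daily_load")
print(example_id)
```

The random suffix keeps IDs distinct when the same pipeline starts two sessions within the same second.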

# The Setup

By default, tables are not configured to allow integrated audits, so the first step is to enable them by setting the `write.wap.enabled` table property to `true`.

In [6]:
%%sql

ALTER TABLE permits
SET TBLPROPERTIES (
    'write.wap.enabled'='true'
)

Next, the `spark.wap.id` property of your Spark session configuration must be set to the integrated audit session ID.

In [7]:
spark.conf.set('spark.wap.id', ia_session_id)

With a `spark.wap.id` value set, you can now safely write **directly to the permits table**--don't worry, these changes will only be staged, not committed!
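Because every write is staged for as long as `spark.wap.id` stays set, it can help to scope the setting to a block of code. The following is a minimal sketch, not part of the original demo: `audit_session` is a hypothetical helper, and it assumes the object passed in offers `set` and `unset` methods, as `spark.conf` does.

```python
from contextlib import contextmanager


@contextmanager
def audit_session(conf, session_id):
    """Set spark.wap.id for the duration of a block, then unset it.

    `conf` is assumed to behave like `spark.conf` (set/unset methods).
    """
    conf.set("spark.wap.id", session_id)
    try:
        yield session_id
    finally:
        # Always clear the ID so later writes are not staged by accident.
        conf.unset("spark.wap.id")


# Intended usage inside the notebook (requires a live Spark session):
#
# with audit_session(spark.conf, ia_session_id):
#     spark.sql("DELETE FROM permits WHERE borough='Manhattan'")
```

The notebook itself keeps `spark.wap.id` set across cells, so this pattern is an alternative for pipeline code rather than a replacement for the cell above.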

# Staging The Changes

To stage the changes, you simply write directly to the `permits` table. This is especially useful when you're working with a large and complex data ingestion pipeline.
Instead of including hard-coded logic in your pipeline to switch between an "audit mode" and a "production mode", with integrated audits you simply run your
production code!

For this demo, let's use a simple query that deletes all records for film permits in the Manhattan borough.

In [8]:
%%sql

DELETE FROM permits
WHERE borough='Manhattan'

As described, even though the query was executed against the production table, these changes are only staged and not committed since we are within an integrated audit session. Let's confirm this by verifying that a count by borough still includes the Manhattan records.

In [9]:
%%sql

SELECT borough, count(*) permit_cnt
FROM permits
GROUP BY borough

borough,permit_cnt
Manhattan,456
Queens,157
Brooklyn,340
Bronx,38
Staten Island,9


# The Audit

Once the changes for this session are staged, you can perform all of your audits to validate the data. The first step is to retrieve the snapshot ID generated by the changes and tagged with this integrated audit session ID.

In [10]:
query = f"""
SELECT snapshot_id
FROM permits.snapshots
WHERE summary['wap.id'] = '{ia_session_id}'
"""
ia_session_snapshot_records = %sql $query
ia_session_snapshot = ia_session_snapshot_records.rows[0][0]


In [11]:
ia_session_snapshot

3399075732289260801
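The lookup above indexes `rows[0][0]` directly, which raises a bare `IndexError` if nothing was staged. A slightly more defensive version might look like the sketch below. `pick_session_snapshot` is a hypothetical helper, and the handling of multiple matches assumes that a session which stages more than one write ends up with several snapshots carrying the same `wap.id`.

```python
def pick_session_snapshot(rows, session_id):
    """Return the single snapshot ID staged under `session_id`.

    `rows` is a sequence of result rows whose first column is snapshot_id,
    such as `ia_session_snapshot_records.rows` in the cell above.
    """
    snapshot_ids = [row[0] for row in rows]
    if not snapshot_ids:
        raise LookupError(f"No staged snapshot found for session {session_id}")
    if len(snapshot_ids) > 1:
        raise LookupError(
            f"Expected one staged snapshot for session {session_id}, "
            f"found {len(snapshot_ids)}: {snapshot_ids}"
        )
    return snapshot_ids[0]


# Example with the values from this notebook run:
rows = [(3399075732289260801,)]
print(pick_session_snapshot(rows, "da63708dc1234438b4130d29f0f9e8f4"))
```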

This snapshot includes the staged (but not committed) changes to your production table. Once you have this snapshot ID, you can use Iceberg's Time Travel feature to query it!

In [12]:
spark.read \
    .option("snapshot-id", ia_session_snapshot) \
    .format("iceberg") \
    .load("permits") \
    .groupBy("borough") \
    .count() \
    .show()

+-------------+-----+
|      borough|count|
+-------------+-----+
|       Queens|  157|
|     Brooklyn|  340|
|        Bronx|   38|
|Staten Island|    9|
+-------------+-----+



At this point, you can use any auditing tool or technique to validate your changes. For this demo, we'll do a simple audit that confirms that the only remaining boroughs are Queens, Brooklyn, Bronx, and Staten Island. If any of these boroughs is missing or any additional borough is found, we'll raise an exception.

In [13]:
expected_boroughs = {"Queens", "Brooklyn", "Bronx", "Staten Island"}
distinct_boroughs = spark.read \
    .option("snapshot-id", ia_session_snapshot) \
    .format("iceberg") \
    .load("permits") \
    .select("borough") \
    .distinct() \
    .toLocalIterator()
boroughs = {row[0] for row in distinct_boroughs}

In [14]:
# `boroughs` and `expected_boroughs` are both sets (collections of distinct items),
# so a direct equality check confirms that no expected borough is missing and
# that no unexpected borough is present.
if boroughs != expected_boroughs:
    raise ValueError(f"Audit failed, borough set does not match expected boroughs: {boroughs} != {expected_boroughs}")
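When an audit like this fails, it is often useful to know exactly which values caused the failure. The sketch below is one possible variant that reports missing and unexpected boroughs separately using set differences. `audit_boroughs` is a hypothetical helper name, not something defined elsewhere in this notebook.

```python
def audit_boroughs(actual, expected):
    """Raise ValueError unless `actual` and `expected` contain the same boroughs."""
    missing = expected - actual      # expected boroughs absent from the data
    unexpected = actual - expected   # boroughs in the data that should not be there
    if missing or unexpected:
        raise ValueError(
            f"Audit failed: missing={sorted(missing)}, unexpected={sorted(unexpected)}"
        )


expected = {"Queens", "Brooklyn", "Bronx", "Staten Island"}

# Passes silently: the sets are identical.
audit_boroughs({"Queens", "Brooklyn", "Bronx", "Staten Island"}, expected)

# Same number of boroughs, but one was swapped, so the audit fails.
try:
    audit_boroughs({"Queens", "Brooklyn", "Bronx", "Hoboken"}, expected)
except ValueError as err:
    print(err)
```

The second call shows why comparing set sizes alone is not enough: both sets have four members, yet they differ.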

If the above check does not fail, we can go ahead and commit our staged data to publish our changes!

# The Publish

After the audits are completed, publishing the data is as simple as calling the `cherrypick_snapshot` stored procedure with the staged snapshot ID.

In [15]:
publish_query = f"CALL demo.system.cherrypick_snapshot('permits', {ia_session_snapshot})"
%sql $publish_query

source_snapshot_id,current_snapshot_id
3399075732289260801,3399075732289260801


That's it! Publishing the changes from this integrated audit session is a simple metadata-only operation that instantly makes the changes live for all downstream consumers querying the `permits` table! Query results will now include the commit that removed all Manhattan records.

In [16]:
spark.read \
    .format("iceberg") \
    .load("permits") \
    .groupBy("borough") \
    .count() \
    .show()

+-------------+-----+
|      borough|count|
+-------------+-----+
|       Queens|  157|
|     Brooklyn|  340|
|        Bronx|   38|
|Staten Island|    9|
+-------------+-----+



# What Happens When The Audits Fail?

What about when your audits fail? What happens to the snapshots generated? How about the data and metadata files?

One of the best parts of Iceberg's integrated audits is that the cleanup of "*staged-yet-not-committed-data*" is part of the normal snapshot cleanup process of a typical Iceberg warehouse. To be more specific, let's say a daily snapshot expiration is performed on the data warehouse (using the [expire_snapshots](https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots) procedure) and all snapshots older than 7 days are expired. That means once your staged snapshot reaches 7 days in age, it will be expired.

Additionally, because the changes were never committed, the underlying data files for the snapshot will also be removed: no other snapshot in the linear history of the table references them.
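The seven-day cutoff described above has to be turned into a concrete timestamp before it can be passed to `expire_snapshots`. As a rough sketch, the cutoff can be computed in epoch milliseconds, the same unit this notebook passes to the procedure further down. `expire_cutoff_millis` is a hypothetical helper, and the seven-day default simply mirrors the example retention period.

```python
from datetime import datetime, timedelta, timezone


def expire_cutoff_millis(now, retention_days=7):
    """Return the epoch-millisecond timestamp `retention_days` before `now`."""
    cutoff = now - timedelta(days=retention_days)
    return round(cutoff.timestamp() * 1000)


# Snapshots older than this value would be candidates for expiration.
cutoff_ms = expire_cutoff_millis(datetime.now(timezone.utc))
print(cutoff_ms)
```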

Let's see this in action. First, start a new integrated audit session and stage a commit by inserting a single record.

In [17]:
ia_session_id = uuid.uuid4().hex
ia_session_id

'91fc3db862584b7ba5cd601917161549'

In [18]:
spark.conf.set('spark.wap.id', ia_session_id)

In [19]:
%%sql

INSERT INTO permits
VALUES (
    'Hoboken',
    'Television',
    '1',
    'United States of America',
    '2021-11-24T23:00:00.000',
    '2021-11-23T09:38:17.000',
    'Mayor\'s Office of Film, Theatre & Broadcasting',
    '613322',
    'Shooting Permit',
    'WASHINGTON STREET',
    '100',
    '2021-11-24T07:00:00.000',
    'Episodic series',
    '07030'
)

Next, let's identify the snapshot that was tagged with the integrated audit session ID.

In [20]:
%%sql

SELECT snapshot_id
FROM permits.snapshots

snapshot_id
2396070907941716061
3399075732289260801
801796559703398824


In [21]:
query = f"""
SELECT snapshot_id
FROM permits.snapshots
WHERE summary['wap.id'] = '{ia_session_id}'
"""
ia_session_snapshot_records = %sql $query
ia_session_snapshot = ia_session_snapshot_records.rows[0][0]

In [22]:
ia_session_snapshot

801796559703398824

A quick check of the history table shows that this snapshot is not included as part of the current history of the table since it has not been published yet.

In [23]:
%%sql

SELECT *
FROM permits.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2022-02-27 17:42:54.596000,2396070907941716061,,True
2022-02-27 17:43:02.309000,3399075732289260801,2.396070907941716e+18,True
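The same comparison can be made programmatically: a snapshot that carries a `wap.id` in its summary but does not appear in the history is staged and unpublished. The sketch below works on plain Python values so it can be tried without Spark. `staged_unpublished` is a hypothetical helper, and the input shapes (pairs of snapshot ID and summary dict, plus a list of history snapshot IDs) are assumptions about how you would collect the two metadata tables.

```python
def staged_unpublished(snapshots, history_ids):
    """Return IDs of snapshots tagged with a wap.id that are not in the table history.

    snapshots:   iterable of (snapshot_id, summary_dict) pairs
    history_ids: iterable of snapshot IDs from the history table
    """
    published = set(history_ids)
    return [
        snapshot_id
        for snapshot_id, summary in snapshots
        if "wap.id" in summary and snapshot_id not in published
    ]


# Values taken from this notebook run.
snapshots = [
    (2396070907941716061, {}),
    (3399075732289260801, {"wap.id": "da63708dc1234438b4130d29f0f9e8f4"}),
    (801796559703398824, {"wap.id": "91fc3db862584b7ba5cd601917161549"}),
]
history_ids = [2396070907941716061, 3399075732289260801]

print(staged_unpublished(snapshots, history_ids))
```

The first audit session's snapshot is excluded because it was published and is now part of the history.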


In a scenario where the audits fail and this change is not published, the `expire_snapshots` procedure will clean up the snapshot **and** the data files. Let's demonstrate this by calling the `expire_snapshots` procedure for all snapshots older than the current timestamp.

In [24]:
import time
%sql CALL demo.system.expire_snapshots('permits', {round(time.time() * 1000)}, 100)

deleted_data_files_count,deleted_manifest_files_count,deleted_manifest_lists_count
1,1,1


The output from the `expire_snapshots` procedure shows that a data file, a manifest file, and a manifest list file were deleted. Furthermore, the snapshot no longer appears in the `permits` table's `snapshots` metadata table.

In [25]:
%%sql

SELECT *
FROM permits.snapshots

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2022-02-27 17:42:54.596000,2396070907941716061,,append,/home/iceberg/warehouse/permits/metadata/snap-2396070907941716061-1-51038f37-3618-409e-b49a-84abb22448b3.avro,"{'spark.app.id': 'local-1645983764572', 'changed-partition-count': '1', 'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '1000', 'total-position-deletes': '0', 'added-files-size': '54395', 'total-delete-files': '0', 'total-files-size': '54395', 'total-records': '1000', 'total-data-files': '1'}"
2022-02-27 17:42:56.751000,3399075732289260801,2.396070907941716e+18,overwrite,/home/iceberg/warehouse/permits/metadata/snap-3399075732289260801-1-6ea89112-433b-4fae-a965-79a6f898d78c.avro,"{'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '544', 'deleted-data-files': '1', 'deleted-records': '1000', 'total-records': '544', 'spark.app.id': 'local-1645983764572', 'removed-files-size': '54395', 'changed-partition-count': '1', 'wap.id': 'da63708dc1234438b4130d29f0f9e8f4', 'total-position-deletes': '0', 'added-files-size': '31210', 'total-delete-files': '0', 'total-files-size': '31210', 'total-data-files': '1'}"
