Skip to content

Commit

Permalink
Make Iceberg materialized view refresh atomic
Browse files Browse the repository at this point in the history
Include old file deletion and new file appends in the same transaction.
  • Loading branch information
alexjo2144 authored and findepi committed Sep 16, 2022
1 parent e1f688d commit 1dc391a
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 11 deletions.
6 changes: 0 additions & 6 deletions docs/src/main/sphinx/connector/iceberg.rst
Expand Up @@ -1019,11 +1019,5 @@ view is queried, the snapshot-ids are used to check if the data in the storage
table is up to date. If the data is outdated, the materialized view behaves
like a normal view, and the data is queried directly from the base tables.

.. warning::

There is a small time window between the commit of the delete and insert,
when the materialized view is empty. If the commit operation for the insert
fails, the materialized view remains empty.

Dropping a materialized view with :doc:`/sql/drop-materialized-view` removes
the definition and the storage table.
Expand Up @@ -2119,12 +2119,14 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Collection<ComputedStatistics> computedStatistics,
List<ConnectorTableHandle> sourceTableHandles)
{
// delete before insert .. simulating overwrite
executeDelete(session, tableHandle);

IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle;

Table icebergTable = transaction.table();
// delete before insert .. simulating overwrite
transaction.newDelete()
.deleteFromRowFilter(Expressions.alwaysTrue())
.commit();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());
Expand Down
Expand Up @@ -194,8 +194,8 @@ public void testRefreshMaterializedView()

assertMetastoreInvocations("REFRESH MATERIALIZED VIEW test_refresh_mview_view",
ImmutableMultiset.builder()
.addCopies(GET_TABLE, 9)
.addCopies(REPLACE_TABLE, 2)
.addCopies(GET_TABLE, 6)
.addCopies(REPLACE_TABLE, 1)
.build());
}

Expand Down

0 comments on commit 1dc391a

Please sign in to comment.