Skip to content

ICEBERG-13343: fix race condition in JdbcCatalog #13345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

Claudenw
Copy link

@Claudenw Claudenw commented Jun 18, 2025

Fix for ICEBERG-13343

Closes #13343

@github-actions github-actions bot added the core label Jun 18, 2025
Copy link
Member

@jbonofre jbonofre left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imho, this PR is not relevant related to previous discussions and logic used in the init method.

@github-actions github-actions bot added the build label Jun 19, 2025
@Claudenw
Copy link
Author

Based on @jbonofre comments I have reworked the code to not use "IF NOT EXISTS" and instead to check if the table was created after the creation fails. This case will occur during the race condition.

Added Apache Derby as a test DB as SqlLite does not handle multiple connections well.

Refactored some code to make it cleaner and easier to read.

Copy link
Contributor

@kevinjqliu kevinjqliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this change to address the race condition during JDBC catalog initialization.

I think JB made a good point in the other thread,

We are suppose to create table at JDBC Catalog init, before the sink is "running".

This is a good workaround, and this PR eliminates the need for it.

I added a few nit comments. Please take a look

}
} finally {
DriverManager.deregisterDriver(slowDriver);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a catalog operation here to verify that both catalog clients are initialized and ready

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you are asking for here. the future executes creates and initializes the JdbcCatalog which does not return from initialize until the initialization is complete. So the future.get at line 227 ensures that the creation and initialization has completed for both.

I don't see a Catalog method to call to verify that it is valid/working. Do you have a suggestion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the test here is testing that the JdbcCatalog be can initialize with multiple concurrent connections.

My ask is to add a catalog operation (i.e. listNamespaces) on L232, after testing the initialization logic, to test that the catalog is indeed initialized correctly

String testingDB = "jdbc:slow:derby:memory:testDb;create=true";
new org.apache.derby.jdbc.EmbeddedDriver();
properties.put(CatalogProperties.URI, testingDB);
SlowDriver slowDriver = new SlowDriver(testingDB);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious if we can test this without SlowDriver.

Can we simulate the same behavior by raising the parallelism?
For example, testConcurrentConnections uses 7 threads

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can try but then the pass/fail of the test is dependent upon the system on which it runs. I feel that if you want to test a condition where a thread gets paused between two steps, make it pause.

I would be willing to make the entire wrapper classes a library so that here it would simply be creating an instance and overriding the two methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense, ty

Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this @Claudenw 🙌

@@ -149,4 +190,1015 @@ public synchronized void testConcurrentConnections() throws InterruptedException
assertThat(executorService.awaitTermination(3, TimeUnit.MINUTES)).as("Timeout").isTrue();
assertThat(Iterables.size(icebergTable.snapshots())).isEqualTo(7);
}

@Test
public synchronized void testInitializeWithSlowConcurrentConnections()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appreciate the tests here 👍

Co-authored-by: Fokko Driesprong <fokko@apache.org>
Copy link
Contributor

@kevinjqliu kevinjqliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@kevinjqliu kevinjqliu requested review from Fokko and bryanck June 25, 2025 16:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Creating 2 or more Iceberg Kafka Sink Connectors using slow JDBC catalog causes a startup failure on first run.
4 participants