-
Notifications
You must be signed in to change notification settings - Fork 2.6k
ICEBERG-13343: fix race condition in JdbcCatalog #13345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
ICEBERG-13343: fix race condition in JdbcCatalog #13345
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imho, this PR is not relevant related to previous discussions and logic used in the init method.
Based on @jbonofre comments I have reworked the code to not use "IF NOT EXISTS" and instead to check if the table was created after the creation fails. This case will occur during the race condition. Added Apache Derby as a test DB as SqlLite does not handle multiple connections well. Refactored some code to make it cleaner and easier to read. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this change to address the race condition during JDBC catalog initialization.
I think JB made a good point in the other thread,
We are suppose to create table at JDBC Catalog init, before the sink is "running".
This is a good workaround, and this PR eliminates the need for it.
I added a few nit comments. Please take a look
} | ||
} finally { | ||
DriverManager.deregisterDriver(slowDriver); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add a catalog operation here to verify that both catalog clients are initialized and ready
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure what you are asking for here. the future executes creates and initializes the JdbcCatalog which does not return from initialize until the initialization is complete. So the future.get
at line 227 ensures that the creation and initialization has completed for both.
I don't see a Catalog method to call to verify that it is valid/working. Do you have a suggestion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the test here is testing that the JdbcCatalog
be can initialize with multiple concurrent connections.
My ask is to add a catalog operation (i.e. listNamespaces) on L232, after testing the initialization logic, to test that the catalog is indeed initialized correctly
String testingDB = "jdbc:slow:derby:memory:testDb;create=true"; | ||
new org.apache.derby.jdbc.EmbeddedDriver(); | ||
properties.put(CatalogProperties.URI, testingDB); | ||
SlowDriver slowDriver = new SlowDriver(testingDB); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious if we can test this without SlowDriver
.
Can we simulate the same behavior by raising the parallelism?
For example, testConcurrentConnections
uses 7 threads
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can try but then the pass/fail of the test is dependent upon the system on which it runs. I feel that if you want to test a condition where a thread gets paused between two steps, make it pause.
I would be willing to make the entire wrapper classes a library so that here it would simply be creating an instance and overriding the two methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense, ty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this @Claudenw 🙌
@@ -149,4 +190,1015 @@ public synchronized void testConcurrentConnections() throws InterruptedException | |||
assertThat(executorService.awaitTermination(3, TimeUnit.MINUTES)).as("Timeout").isTrue(); | |||
assertThat(Iterables.size(icebergTable.snapshots())).isEqualTo(7); | |||
} | |||
|
|||
@Test | |||
public synchronized void testInitializeWithSlowConcurrentConnections() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appreciate the tests here 👍
Co-authored-by: Fokko Driesprong <fokko@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Fix for ICEBERG-13343
Closes #13343