Added catalog initialization at kafka connector start #13357
@jbonofre, @Fokko, @kevinjqliu would you please take a look at this PR and the associated issue? I think this will avoid issues like the race condition we saw in JdbcCatalog for other Catalogs that have both global and local parts of the initialization.
@bryanck wdyt ?
I feel this is a workaround that doesn't address the core issue in JdbcCatalog.
This is not a fix for the race condition in JdbcCatalog. It is a fix to
avoid the problem in other connectors. It also gives the connector the
opportunity to do expensive initialization before each task attempts to do
the initialization. For example, even with the fixed JdbcCatalog, without
this change each task in the JdbcCatalog may attempt the expensive table
creation. So if the connector attempts to start 50 threads it is possible
that there will be 50 attempts to create tables followed by 49 calls to see
if the tables were created.
I am certain that there are other catalogs that could implement a costly
configuration on the first call that would be faster if the first attempt
was made before a number of tasks were started.
I think that in most cases this will be more efficient, and in cases where
it is not, the performance degradation is minimal.
LinkedIn: http://www.linkedin.com/in/claudewarren
…On Tue 24 Jun 2025, 17:01 Bryan Keller, ***@***.***> wrote:
*bryanck* left a comment (apache/iceberg#13357)
I feel this is a workaround that doesn't address the core issue in
JdbcCatalog. When multiple workers are launched, each loads the catalog.
The JdbcCatalog will try to initialize its schema if it hasn't been, and
this causes the race condition, as multiple workers try to initialize the
schema concurrently. JdbcCatalog should handle this case, as this can
happen in any concurrent scenario. For example, two different connectors
could load the same JdbcCatalog, and this fix won't help.
@bryanck Are you OK with this change? I think it improves performance in some cases and has negligible impact in the others. The idea is to give the catalog the opportunity to do expensive initialization before each thread starts and asks the catalog to do initialization as well. In the JDBC case this will improve performance because the later initialization will not have to make an additional 2 network calls.
@Claudenw I don't follow why every worker would try to create the table if the race condition is handled in the catalog itself. If it is handled there, only one worker creates the table and the others simply load it. Please correct me if I am wrong or have misunderstood anything.
@kumarpritam863 There are several issues converging in this change. The issue with JdbcCatalog is an example. If we look at the code for the connector, it follows the standard process for generic connectors: save the config, make a copy of the config for each task, and let the system do the rest. The underlying system then creates the specified number of tasks by creating an instance of the task class and calling initialize with the context and then start with the properties. We expect the Catalog to be initialized at some point. The earliest point at which this can be done is where I have placed it in this PR, in the …

The problem arises when the Catalog has both a globally shared and a local component, as the JdbcCatalog does. In the JdbcCatalog case there are two tables that must exist. The catalog initialization (after my patch) does the following:
At each of those points the thread may be switched out and processing stopped. What I have seen in several cases is that multiple threads execute step 1 but are switched out before executing step 2. The first thread back completes step 2 and works as expected. All other threads that executed the first step fail in the second and succeed in the third. So within one machine running multiple threads we issue several requests over the wire in an attempt to configure. It works, but it increases startup time and tends to add spurious log messages about failed table creation.

In the JDBC case, if we call the initialization early, from one thread, before multiple threads are spun up, it creates the tables once; subsequent calls from the tasks see the tables in step 1 and continue with step 5. In addition, if the configuration fails, the connector fails before the tasks are created, saving significant time and resources. Also note that with multiple servers running multiple threads the number of calls increases significantly, and the longer, failure-prone path can be taken even if the thread is not switched out, just by having another system create the tables at the right (wrong?) moment.

I am aware that this discussion relates only to JdbcCatalog, but it is an exemplar for any catalog that attempts to establish a global configuration. I also realize it could be solved in the JDBC case by requiring that the tables be created before starting the system, or by using "create table" commands with the "if not exists" keyword. The latter was discussed in detail and had to be rejected (see issue #13343).
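The interleaving described above can be made concrete with a small simulation. This is only a sketch, not JdbcCatalog code: `SharedStore`, `initialize`, and `run` are hypothetical names, and the check / create / re-check sequence is an assumption about the steps under discussion. A latch forces every task to finish its existence check before any task attempts creation, which is the worst case described.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class InitRaceSketch {
  // Hypothetical stand-in for the shared database; not an Iceberg class.
  static class SharedStore {
    final AtomicBoolean created = new AtomicBoolean(false);
    final AtomicInteger calls = new AtomicInteger();         // simulated network round trips
    final AtomicInteger failedCreates = new AtomicInteger(); // creates that lost the race

    boolean tablesExist() {
      calls.incrementAndGet();
      return created.get();
    }

    boolean createTables() {
      calls.incrementAndGet();
      boolean won = created.compareAndSet(false, true);
      if (!won) {
        failedCreates.incrementAndGet();
      }
      return won;
    }
  }

  // Assumed sequence: check, then create, then re-check after a failed create.
  static void initialize(SharedStore store, CountDownLatch allChecked)
      throws InterruptedException {
    boolean exists = store.tablesExist();
    if (allChecked != null) {
      allChecked.countDown();
      allChecked.await(); // wait until every task has done its check
    }
    if (exists) {
      return;
    }
    if (!store.createTables() && !store.tablesExist()) {
      throw new IllegalStateException("tables missing after failed create");
    }
  }

  // Runs `tasks` concurrent initializations, optionally preceded by one
  // single-threaded initialization (the connector-level call).
  static SharedStore run(int tasks, boolean earlyInit) {
    SharedStore store = new SharedStore();
    ExecutorService pool = Executors.newFixedThreadPool(tasks);
    try {
      if (earlyInit) {
        initialize(store, null);
      }
      CountDownLatch allChecked = new CountDownLatch(tasks);
      Callable<Object> task = () -> {
        initialize(store, allChecked);
        return null;
      };
      List<Future<Object>> futures = new ArrayList<>();
      for (int i = 0; i < tasks; i++) {
        futures.add(pool.submit(task));
      }
      for (Future<Object> f : futures) {
        f.get(); // surfaces any task failure
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
    return store;
  }

  public static void main(String[] args) {
    SharedStore late = run(50, false);
    SharedStore early = run(50, true);
    System.out.println("without early init: " + late.calls.get()
        + " calls, " + late.failedCreates.get() + " failed creates");
    System.out.println("with early init: " + early.calls.get()
        + " calls, " + early.failedCreates.get() + " failed creates");
  }
}
```

With 50 tasks the simulated worst case makes 149 calls (50 checks, 50 creates, 49 re-checks) with 49 failed creates, against 52 calls and no failed creates when one initialization runs first. Real numbers depend on scheduling and on what the catalog actually does.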
I also note that there is, as far as I can tell, no requirement for the Catalog implementation other than to have a no-argument constructor and an … So, with no way to interrogate Catalogs to understand what their requirements are, the only way to make startup more efficient is to initialize the Catalog when the connector saves the configuration (as per the change here). The cost of doing this is minimal. This change does two things:
It has multiple advantages:
By moving this call, we're essentially saving just a single initialization in most scenarios: the first call sets up the catalog, and subsequent ones simply reuse it. However, this approach feels slightly like an anti-pattern, as the catalog is conceptually and operationally tied to the task, which is where the actual business logic interacts with it. Embedding catalog initialization at the connector level not only increases the connector's responsibility but can also make it more susceptible to failures. In practice, most observability and alerting systems focus on task-level failures rather than connector-level ones, so this shift could make failure detection and recovery harder to trace and act upon. Additionally, the underlying issue seems tightly coupled with the JDBC behavior. Given that, I believe a more robust and localized solution might be to handle the exception directly here, where it originates. Curious to hear your thoughts on this, @bryanck. What do you think?
@kumarpritam863 Interestingly, the code you reference is the fix in the JDBC Catalog. It contains the fix for issue #13343. While we can fix it there, and have, the problem is that there is no single-threaded Catalog.init before the Catalog's first use. The argument that this change saves a single initialization only holds for lightly loaded systems with few tasks. In systems with multiple tasks and reasonable load we have consistently seen the issue arise. When we encountered the issue, prior to the fixed code you reference, the initialization would fail and the connector would fail to start. With the patch in place we expect that the initialization will complete, but it will take significantly longer and require multiple network requests to resolve. Most alerting systems will detect a connector that fails to start, which would be the result of a badly configured catalog after this change. I agree this is only evident in JdbcCatalog, but it is an exemplar of the issue, and one that does not have an easy fix. I have not looked at other Catalogs to see whether they can benefit from an early initialization, but I can envision several architectures where it would be advantageous. This is not a generic problem of the Kafka Connector architecture, and while it appears tightly coupled to the JdbcCatalog, the problem arises because Iceberg requires a Catalog but does not place constraints upon initialization and does not recognize that some initialization is best implemented in a single thread. Otherwise, the Catalog interface would have a global initialization method and a local initialization method, in which case I would expect to see the global initialization called during …
Hey @Claudenw, thanks for the PR here! I want to verify my understanding of the underlying issue. It looks like we already fixed the underlying race condition when multiple clients are trying to initialize the same JDBC catalog (#13345). Is this PR intended to address issue #13356? If so, let's update the PR description. Is the intent here to load the catalog at connector initialization to ensure that the catalog itself is properly configured? WDYT?
I think that adding hooks to allow catalogs to be initialized before first use is an important improvement, as it allows multi-tenant environments to initialize the catalog based upon the configuration for the connector. Since this connector requires the catalog, it seems incumbent upon the connector to ensure that the catalog is operational and properly configured, or at least to detect that it is not. However, if the consensus is that the solution is to update the documentation I will go along with that, begrudgingly. In any case I will update the description.
fix for ICEBERG-13356
Change initializes the Catalog before first use by tasks.
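
As a rough illustration of the fail-fast behavior discussed in the thread, the sketch below initializes a catalog when the connector starts, before any task configuration is handed out. It does not use the Kafka Connect or Iceberg libraries: `SketchConnector` and `SketchCatalog` are hypothetical classes that only mimic the general shape of a connector's `start` and `taskConfigs` methods and a catalog's `initialize` method, and the required `uri` property is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EarlyInitSketch {
  // Hypothetical catalog: rejects a configuration that lacks a "uri" property.
  static class SketchCatalog {
    void initialize(String name, Map<String, String> props) {
      if (!props.containsKey("uri")) {
        throw new IllegalArgumentException("catalog '" + name + "' is missing uri");
      }
      // Expensive shared setup (for example, creating tables) would happen here, once.
    }
  }

  // Hypothetical connector: saves the config, initializes the catalog once,
  // then hands a copy of the config to each task.
  static class SketchConnector {
    private Map<String, String> props = new HashMap<>();

    void start(Map<String, String> config) {
      this.props = new HashMap<>(config);
      // Single-threaded early initialization: a bad config fails here,
      // before any task has been created.
      new SketchCatalog().initialize("early-check", this.props);
    }

    List<Map<String, String>> taskConfigs(int maxTasks) {
      List<Map<String, String>> configs = new ArrayList<>();
      for (int i = 0; i < maxTasks; i++) {
        configs.add(new HashMap<>(props));
      }
      return configs;
    }
  }

  public static void main(String[] args) {
    Map<String, String> good = new HashMap<>();
    good.put("uri", "jdbc:example"); // placeholder value, not a real endpoint
    SketchConnector connector = new SketchConnector();
    connector.start(good);
    System.out.println("task configs: " + connector.taskConfigs(3).size());

    try {
      new SketchConnector().start(new HashMap<>());
    } catch (IllegalArgumentException e) {
      System.out.println("connector failed to start: " + e.getMessage());
    }
  }
}
```

The trade-off raised in the review still applies: a failure surfaces at the connector level rather than at the task level, so alerting has to watch for connectors that fail to start.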