Skip to content

Added catalog initialization at kafka connector start #13357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

Claudenw
Copy link
Contributor

@Claudenw Claudenw commented Jun 20, 2025

fix for ICEBERG-13356

Change initializes the Catalog before first use by tasks.

@Claudenw
Copy link
Contributor Author

@jbonofre , @Fokko @kevinjqliu would you please take a look at this PR and the associated issue. I think this will avoid issues like we saw with the race condition in JdbcCatalog for other Catalogs that have both global and local parts of the initialization.

Copy link
Member

@jbonofre jbonofre left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bryanck wdyt ?

@bryanck
Copy link
Contributor

bryanck commented Jun 24, 2025

I feel this is a workaround that doesn't address the core issue in JdbcCatalog. When multiple workers are launched, each loads the catalog. The JdbcCatalog will try to initialize its schema if it hasn't been, and this causes the race condition, as multiple workers try to initialize the schema concurrently. JdbcCatalog should handle this case, as this can happen in any concurrent scenario. For example, two different connectors could load the same JdbcCatalog, and this fix won't help.

@Claudenw
Copy link
Contributor Author

Claudenw commented Jun 24, 2025 via email

@Claudenw
Copy link
Contributor Author

@bryanck Are you ok with with this change?

I think it improves performance in some cases, and has negligible impact in the others.

The idea is to provide the catalog the opportunity to do expensive initialization before each thread starts and asks the catalog to do initialization as well.

In the JDBC case this will improve performance because the later initialization will not have to make an additional 2 network calls.

@kumarpritam863
Copy link
Contributor

@Claudenw I did not get why every worker will try to create table if the race condition is handled in the catalog itself. If race condition is handled in the catalog then only one of the worker will be creating the table and others will simply load that. Please do correct me if I am wrong or I misunderstood anything.

@Claudenw
Copy link
Contributor Author

@kumarpritam863 There are several issues converging in this change.

The issue with JdbcCatalog is an example.

If we look at the code for the connector, it follows the standard process for generic connectors: save the config, make a copy of the config for each task, and let the system to the rest.

The underlying system then creates the specified number of tasks by creating an instance of the task class and calling initialize with the context and then start with the properties.

We expect the Catalog to be initialized at some point. The earliest point that this can be done is where I have placed it in this PR, in the connector.start() method. The only other reasonable place to initialize the Catalog is in the task.start() method.

The problem arises when the Catalog has both a globally shared and a local component like the JdbcCatalog does. In the JdbcCatalog case there are two tables that must exist. The catalog initialization (after my patch) does the following:

  1. Checks to see if the tables are available.
  2. If not attempt to create the tables
  3. if create failed check again to see if the tables are there.
  4. if not fail.
  5. read/write data to the catalog.

At each of those points the thread may be switched out and processing stopped. What has happened in several cases that I have seen is that multiple threads execute step 1 but are switched out before executing step 2. The first thread back completes step 2 and works as expected. All other threads that executed the first step, fail in the second and succeed in the 3rd.

So within one machine running multiple threads we execute several requests over the wire in an attempt to configure. It works but it increases the time necessary to start and tends to add spurious log messages about failed table creation.

In the JDBC case, if we call the initialization early and from one thread before multiple threads are spun up it will create the tables once and then subsequent calls from the tasks will see the tables in step 1 and continue with step 5. In addition if the configuration fails then the connector fails before the tasks are created saving significant time and resources.

Also, note that if there are multiple servers with multiple threads the number of calls increases significantly and the longer failure prone path can be taken even if the thread is not switched out just by having another system create the tables at the right (wrong?) moment.

I am aware that this discussion relates only to JdbcCatalog, but it is an exemplar for any catalog that attempts to establish a global configuration. I also realize could be solved in the JDBC case by requiring that the tables be created before starting the system, or by using "create table" commands that use the "if exists" key word. The later was discussed in detail and has to be rejected (see issue #13343). I also note that there are, as far as I can tell, no requirement for the Catalog implementation other than have a no argument constructor, and an initialize() method that takes a name and a map of properties.

So with no way to interrogate Catalogs to understand what their requirements are the only way to make it more efficient at startup is to make a call to initialize the Catalog is when the connector saves the configuration. (as per the change here). The cost of doing this is minimal.

This change does two things:

  1. Allows Catalogs that have global data to create that data once.
  2. Allows a Catalog failure to be detected early.

It has multiple advantages:

  1. Reduces the time and resource consumption during failed some Catalog initializations.
  2. Reduces the chance of resource conflict for Catalogs that have global data.

@Claudenw
Copy link
Contributor Author

Claudenw commented Jul 2, 2025

@bryanck, @jbonofre are you guys OK with this?

@kumarpritam863
Copy link
Contributor

By moving this call, we’re essentially saving just a single initialization in most scenarios — the first call sets up the catalog, and subsequent ones simply reuse it. However, this approach feels slightly like an anti-pattern, as the catalog is conceptually and operationally tied to the task, which is where the actual business logic interacts with it.

Embedding catalog initialization at the connector level not only increases the connector's responsibility but can also make it more susceptible to failures. In practice, most observability and alerting systems focus on task-level failures rather than connector-level ones, so this shift could make failure detection and recovery harder to trace and act upon.

Additionally, the underlying issue seems tightly coupled with the JDBC behavior. Given that, I believe a more robust and localized solution might be to handle the exception directly here, where it originates.

Curious to hear your thoughts on this, @bryanck — what do you think?

@Claudenw
Copy link
Contributor Author

Claudenw commented Jul 2, 2025

@kumarpritam863 Interestingly the code you reference is the fix in the JDBC Catalog. It contains the fix for issue #13343.

While we can and have fixed it there, the problem is that there is no single threaded Catalog.init before Catalog first use.

The argument that this change saves a single initialization only holds for lightly loaded system with few tasks. In systems with multiple tasks and reasonable load we have consistently seen the issue arise. When we encountered the issue, prior to the fixed code you reference, the initialization would fail and the connector fail to start. With the patch in place we expect that the initialization will complete but will also take significantly longer and with multiple network requests required to resolve the issue.

Most alerting systems will detect if a connector fails to start, which would be the result of a badly configured catalog after this change.

I agree this is only evident in JdbcCatalog, but then it is an exemplar of the issue, and an exemplar that does not have an easy fix. I have not looked at other Catalogs to see if they can benefit from an early initialization, but I can envision several architectures where it would be advantageous.

This is not a generic problem of the Kafka Connector architecture and while it appears tightly coupled to the JdbcCatalog, the problem arises because Iceberg requires a Catalog but does not place constraints upon initialization and does not recognize that some initialization is best implemented in a single thread. Otherwise, the Catalog interface would have a global initialization method and a local initialization method. In which case I would expect to see the global initialization called during container.start() and the local initialization called during task.start().

@kevinjqliu
Copy link
Contributor

Hey @Claudenw, thanks for the PR here! I want to verify my understanding of the underlying issue.

It looks like we already fixed the underlying race condition when multiple clients are trying to initialize the same JDBC catalog (#13345).

Is this PR intended to address the issue #13356? If so, lets update the PR description.

Is the intent here to load the catalog at the connector initialization to ensure that the catalog itself is properly configured?
If so, I feel like this puts extra burden on the connector. Ideally, the connector should not worry about the catalog and its state. The connector should just connect to the catalog and perform subsequent operations.
I feel a better approach is to have the catalog pre-configured before the setting up the connector. Perhaps we can update the kakfa-connect docs to call that out.

WDYT?

@Claudenw
Copy link
Contributor Author

Claudenw commented Jul 7, 2025

@kevinjqliu

I think that adding hooks to allow catalogs to be initialized before first use is an important improvement as it allows multi-tenant environments to initialize the catalog based upon the configuration for the connector. Since this connector requires the catalog is seems incumbent upon the connector to ensure that catalog is operational and properly configured, or at least detect that it is not.

However, if the consensus is that the solution is to update the documentation I will go along with that, begrudgingly.

In any case I will update the description.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants