
Prevent context update after/while rebalancing #161

Merged
merged 1 commit into wepay:master on Jun 20, 2019

Conversation

nicolasguyomar
Contributor

Hi,

This method causes the context to be updated with previously assigned partitions, which leads to a fatal error when the task tries to seek on a partition that is no longer assigned.

The issue can be reproduced by submitting a BQ connector with one task, then updating that same connector to use 5 tasks, for instance.

It very often ends with the first created task failing with this exception:

[2019-06-05 10:19:20,150] ERROR WorkerSinkTask{id=mybigquery--0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.IllegalStateException: No current assignment for partition mypartition-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:70)
	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)

With this fix I was able to load millions of records into BigQuery while killing the connector and adding and removing tasks, without losing any records.

Could you please double-check this behavior and merge this PR?

Have a good day

Nicolas

This call might have been needed before, but it is no longer required
@C0urante
Collaborator

@mtagle IIRC we chatted about this a while back and decided the method was basically useless at this point and safe to remove, right?

@codecov-io

Codecov Report

Merging #161 into master will increase coverage by 0.09%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master     #161      +/-   ##
============================================
+ Coverage      65.1%   65.19%   +0.09%     
+ Complexity      238      237       -1     
============================================
  Files            33       33              
  Lines          1367     1362       -5     
  Branches        130      130              
============================================
- Hits            890      888       -2     
+ Misses          439      437       -2     
+ Partials         38       37       -1
Impacted Files | Coverage Δ | Complexity Δ
...wepay/kafka/connect/bigquery/BigQuerySinkTask.java | 58.75% <ø> (+0.56%) | 23 <0> (-1) ⬇️

@rhauch

rhauch commented Jun 11, 2019

I wonder if the bug is not necessarily that the connector is attempting to manage the offsets, but how it is doing it. The updateOffsets(...) is essentially trying to set the task context's offsets to those it got directly from Connect via the flush(...) method. This definitely is unnecessary and problematic, as noted above.

But if it is desired that the connector track its own offsets, then it should use the preCommit(...) method that was added as part of KIP-89, and which first appeared in AK 0.10.2.0. Connect now calls this method periodically, passing in what it thinks the offsets are for each topic partition that has been consumed. By default, the preCommit(...) method simply calls flush(...) with the same offsets. This way, connectors that override flush(...) still work.

A connector can, however, override preCommit(...) so that it returns the actual offsets for each topic partition of the records it has completely processed. And when it does this, Connect will asynchronously commit those offsets on the consumer (or synchronously if the task is closing).
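The pattern described above can be sketched in plain Java. This is a simplified illustration, not the real Connect API: the `TopicPartition`, `OffsetAndMetadata`, and `SinkTask` types below are minimal stand-ins defined locally so the sketch compiles on its own (the real ones live in `org.apache.kafka.common` and `org.apache.kafka.connect.sink`), and `OffsetTrackingTask` is a hypothetical example class.

```java
import java.util.HashMap;
import java.util.Map;

public class PreCommitSketch {

    // Minimal stand-ins for the real Kafka Connect / clients types.
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset) {}

    // Simplified stand-in for org.apache.kafka.connect.sink.SinkTask.
    abstract static class SinkTask {
        public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {}

        // Default behavior per KIP-89: call flush(...) and return the
        // offsets Connect passed in, so connectors that only override
        // flush(...) keep working unchanged.
        public Map<TopicPartition, OffsetAndMetadata> preCommit(
                Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
            flush(currentOffsets);
            return currentOffsets;
        }
    }

    // A task that tracks the offsets it has fully processed and reports
    // those from preCommit(...), instead of whatever Connect thinks.
    static class OffsetTrackingTask extends SinkTask {
        private final Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

        void markProcessed(TopicPartition tp, long offset) {
            processed.put(tp, new OffsetAndMetadata(offset));
        }

        @Override
        public Map<TopicPartition, OffsetAndMetadata> preCommit(
                Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
            flush(currentOffsets);           // ensure pending writes are durable first
            return new HashMap<>(processed); // commit only what we know is done
        }
    }

    public static void main(String[] args) {
        OffsetTrackingTask task = new OffsetTrackingTask();
        TopicPartition tp = new TopicPartition("mytopic", 0);
        task.markProcessed(tp, 42L);

        // Connect believes the consumer is at offset 100, but the task has
        // only fully processed up to 42, so 42 is what gets committed.
        Map<TopicPartition, OffsetAndMetadata> connectView =
                Map.of(tp, new OffsetAndMetadata(100L));
        Map<TopicPartition, OffsetAndMetadata> toCommit = task.preCommit(connectView);
        System.out.println(toCommit.get(tp).offset());
    }
}
```

The key point is the return value: whatever map preCommit(...) returns is what Connect commits on the consumer (asynchronously, or synchronously when the task is closing), which is why a connector that does not need its own tracking can simply leave the default in place.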

@C0urante
Collaborator

@criccomini @mtagle @jgao54 ping :)

@criccomini
Contributor

:looking:

@criccomini
Contributor

Based on what @rhauch is saying, I think we want to override preCommit(). The reason is that we need flushes (when we guarantee writes to BQ have completed successfully) to fully finish before we commit our offsets. As such, I think we should do:

preCommit() {
  flush();
  saveOffsets();
}

@criccomini
Contributor

Talked with @mtagle about this a bit more. It actually sounds like we don't need to manage the offsets ourselves since we apparently fully flush all messages (or time out on flush) before returning. As such, I think we can offload the work to Kafka connect. The PR should suffice then. Going to let @mtagle confirm before merging.

@criccomini
Contributor

(Also, planning to release in the next 2 weeks)

@mtagle
Contributor

mtagle commented Jun 20, 2019

As far as I can tell, KCBQ should not need to keep track of its own offsets, at least when it's just doing normal BQ streaming. .flush() makes sure that all the rows we've been given have been successfully written to BigQuery before it returns.

(Going through GCS is a little bit more complicated, but after .flush() completes successfully you can at least be sure that the data has reached GCS. Since GCS loading is technically in beta, this is something I feel comfortable with.)

So, with that being said, I'm totally happy to get rid of this method, and I believe it is safe to do so.

@mtagle mtagle merged commit 0208081 into wepay:master Jun 20, 2019
@C0urante
Collaborator

Thanks @mtagle @criccomini!

criccomini pushed a commit that referenced this pull request Jun 27, 2019
This call might have been needed before, but it is no longer required
ivanyu pushed a commit to ivanyu/kafka-connect-bigquery that referenced this pull request Jul 15, 2019
This call might have been needed before, but it is no longer required