
question: clarify Poll behavior #489

Closed
stampy88 opened this issue Jul 2, 2023 · 9 comments

stampy88 commented Jul 2, 2023

Hi @twmb,

I have a question for you regarding the PollFetches behavior and the PauseFetchPartitions/ResumeFetchPartitions methods. I have a consumer that is inspired by your goroutine_per_partition_consuming example with manual commits. I want to PauseFetchPartitions whenever I deliver a batch of records to a worker, and ResumeFetchPartitions when that batch is complete -- basically, only fetch more records for partitions whose previous batch has been processed. This seems to be working fine, but I am seeing some unexpected behavior. It seems like there may be more records buffered than are delivered to me in my "poll loop"; is that accurate (I am using PollFetches)? The reason I say this is that I sometimes see that all partitions are paused, but records are still returned from the poll. This causes a problem because even though all partitions are paused, I am blocked on a channel send to the worker that is still processing the previous batch. I can make it a larger buffered channel, but I would like to understand how many records can potentially be buffered so I can size it appropriately. Also, what happens if I have all of the partitions paused and do a fetch? This looks like it is handled, https://github.com/twmb/franz-go/blob/master/pkg/kgo/consumer.go#L489, but again I just wanted to confirm my understanding.

Thanks for the awesome library!

twmb (Owner) commented Jul 2, 2023

You're accurate that there may be more records buffered. Under the hood, what Pause does is literally block the partition from being fetched. However, there could have been previous fetches that were issued and are now buffered waiting to be drained with a Poll, and these previous fetches may contain records for your partition.

If you have all partitions paused, fetches will stop being issued. Once a partition is resumed, fetches will begin being issued again.

I might consider never returning data for a partition after it is paused even if it is currently buffered. The implementation is a bit more difficult, and it requires basically dropping everything that was buffered and re-fetching it in the future, so there's more work that the client performs as well.
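
A minimal sketch of the behavior described above, as it stood before v1.14: pausing stops new fetches from being issued, but a later poll can still drain records that were already buffered for the paused partition. The broker address, topic name, and partition number below are placeholders.

// Sketch only: placeholder broker/topic/partition; pre-v1.14 pause semantics.
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// First poll: this may buffer more records internally than it returns.
	_ = cl.PollFetches(ctx)

	// Pause partition 0 of the topic: no *new* fetches are issued for it...
	cl.PauseFetchPartitions(map[string][]int32{"my-topic": {0}})

	// ...but a later poll can still return records for partition 0 that
	// were buffered before the pause (the behavior being discussed here).
	f := cl.PollFetches(ctx)
	f.EachPartition(func(p kgo.FetchTopicPartition) {
		fmt.Printf("still got %d records for %s[%d]\n",
			len(p.Records), p.Topic, p.Partition)
	})
}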

stampy88 (Author) commented Jul 2, 2023

Thanks for confirming, @twmb. Can you give a recommendation for sizing my per-partition channel so that I don't block my fetch loop? Is it just 1 additional Poll after I pause that partition?

twmb (Owner) commented Jul 2, 2023

It's undefined, since a fetch could be in flight for a partition you're pausing the moment before you pause.

stampy88 (Author) commented Jul 3, 2023

So is a fetch only in flight after calling one of the Poll* methods? Meaning, if I call PollFetches once, is a fetch issued at that point? And after that poll returns, are there no outstanding in-flight fetches? Just to be clear, it is my fetcher thread that pauses the partition after the fetch, not my partition consumer thread. Just trying to get as much info as possible so I can size this intelligently.

brunsgaard (Contributor) commented Jul 3, 2023

Hello, why is processing back pressure not enough to make the channel send operation in your poll loop block, and thereby effectively pause further fetching while the partition worker finishes its work? I suspect that pattern will work better.

I don't think it is optimal to use pause/resume as control-flow logic; at least I have not seen that before. The places I have seen it used in my company expose the methods via an API that allows a UI or CLI to pause/resume, often when downstream components are having issues (faulty deployment, service migration, etc.) and processing needs to be paused while the issue is fixed.

In regard to your question, you can think about it as franz-go having an internal fetch buffer. You don't have access to that buffer directly, but you can poll from it, or you can ask how many records it has. Behind the scenes, franz-go has a poll loop that reads messages from Kafka and injects them into this internal buffer. What you are pausing and resuming is that internal poll loop.

There is no contract between your call to Poll and the internal poll loop; both share the underlying buffer, with the internal poll loop acting as a producer and your Poll/PollFetches as a consumer. Everything happens concurrently, so while you are calling Poll, Pause, and Poll again, the internal loop might be inserting messages into the internal buffer.

I would try to solve your problem with back pressure if possible.
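
A minimal sketch of the back-pressure pattern suggested above, modeled loosely on the goroutine-per-partition layout: each partition's worker is fed through its own channel, and the blocking send in the poll loop is what throttles further polling. The tp type, the workers map, and pollLoop are illustrative, not part of franz-go.

// Sketch of the back-pressure idea; tp and the worker map are assumptions.
package consumer

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

type tp struct {
	topic     string
	partition int32
}

func pollLoop(ctx context.Context, cl *kgo.Client, workers map[tp]chan []*kgo.Record) {
	for {
		fetches := cl.PollFetches(ctx)
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachPartition(func(p kgo.FetchTopicPartition) {
			if ch, ok := workers[tp{p.Topic, p.Partition}]; ok {
				// If this partition's worker is still busy, this send
				// blocks, which delays the next PollFetches call -- that
				// blocking is the back pressure; no pause/resume needed.
				ch <- p.Records
			}
		})
	}
}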

stampy88 (Author) commented Jul 3, 2023

Hi, I don't want one partition's back pressure to interfere with another partition. Meaning, a slow-consuming partition should not affect another partition that is keeping up. This is the pseudo-code for what I am doing: workers is a map from topic/partition to a worker that deals with a batch of records for a partition in a separate goroutine. The "fetcher" goroutine does the fetch and, for each partition, pauses it, then sends the records to the worker:

f := c.p.PollFetches(ctx)
...
f.EachPartition(func(p kgo.FetchTopicPartition) {	
	tp := TopicPartition{p.Topic, p.Partition}

	c.mu.RLock()
	w, ok := c.workers[tp]
	c.mu.RUnlock()
	if ok {		
		c.partitionCtrl.Pause(ctx, tp)
		w.records <- p.Records		 // <--- DON'T want this to block
	}
})

I have used pausing and resuming in the Java client to achieve the same sort of thing.

stampy88 closed this as completed Jul 5, 2023
twmb reopened this Jul 8, 2023
twmb (Owner) commented Jul 8, 2023

I'll add a change in v1.14 that prevents anything from paused topics/partitions being returned after the pause call.

Note that this wastes a bit of work -- internally it requires bumping an (internal) consumer session, dropping everything buffered, and killing all in-flight fetch requests. But it's easier to reason about, and it's often what people expect when pausing.
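
A rough sketch of the pause/dispatch/resume flow this issue is about, written against the v1.14+ semantics (nothing for a paused partition is returned after the pause call). The process helper and the goroutine-per-batch hand-off below are illustrative stand-ins for the per-partition workers discussed above, not part of franz-go.

// Sketch only; assumes the v1.14 pause semantics described in this comment.
package consumer

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// process stands in for whatever per-partition batch handling you do.
func process(recs []*kgo.Record) {}

func consume(ctx context.Context, cl *kgo.Client) {
	for {
		fetches := cl.PollFetches(ctx)
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachPartition(func(p kgo.FetchTopicPartition) {
			if len(p.Records) == 0 {
				return
			}
			topic, partition := p.Topic, p.Partition
			// Pause before handing the batch off; with the v1.14 change,
			// later polls will not return more records for this partition,
			// even records that were already buffered or in flight.
			cl.PauseFetchPartitions(map[string][]int32{topic: {partition}})
			recs := p.Records
			go func() {
				process(recs)
				// Resume only once the batch is fully processed; fetching
				// for this partition then starts again.
				cl.ResumeFetchPartitions(map[string][]int32{topic: {partition}})
			}()
		})
	}
}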

twmb added a commit that referenced this issue Jul 8, 2023
This causes slightly more work (for simplicity, we drop everything
buffered and kill all in flight fetch requests), but this is much easier
to reason about.

Closes #489.
twmb (Owner) commented Jul 8, 2023

You can see a test of the new incoming behavior here: 37b8c81#diff-d04863675913e217c999314e4904fdb952cd7b23268f59cd0b358754588bee04R264

twmb closed this as completed in c3b083b Jul 8, 2023
twmb mentioned this issue Jul 8, 2023
stampy88 (Author) commented Jul 8, 2023

Great, thanks @twmb

twmb added a commit that referenced this issue Oct 21, 2023
Issue #489 asked to stop returning data after a partition was paused --
the original implementation of pausing kept returning any data that was
in flight or already buffered, and simply stopped fetching new data.

489 was dealt with by bumping the consumer session, which kills all
in flight fetch requests. This was easy, but can cause a lot of
connection churn if pausing and resuming a lot -- which is #585.

The new implementation allows fetches to complete, but strips data
from fetches based on what is paused at the moment the fetches are being
returned to the client. This does make polling paused fetches very
slightly slower (a map lookup per partition), but there's only so much
that's possible. If a partition is paused, we drop the data and do not
advance the internal offset. If a partition is not paused, we keep the
data and return it -- same as before.
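
The following is purely illustrative of the idea in the commit message -- it is not franz-go's actual code. It shows the shape of the approach: at the moment fetches are returned to the caller, data for any currently paused partition is dropped (one map lookup per partition) and its offset is left untouched, so resuming causes that data to be fetched again. All names and types are invented for the illustration.

// Illustrative only -- NOT franz-go's implementation.
package pausedstrip

type topicPartition struct {
	topic     string
	partition int32
}

type partitionData struct {
	tp      topicPartition
	records []int64 // stand-in for the records fetched for this partition
}

// stripPaused keeps only data for partitions that are not currently paused.
// Dropped partitions keep their consumed offset untouched, so resuming
// simply causes the same data to be fetched again.
func stripPaused(fetched []partitionData, paused map[topicPartition]bool) []partitionData {
	kept := fetched[:0]
	for _, pd := range fetched {
		if paused[pd.tp] { // one map lookup per partition
			continue // drop the data; do not advance the internal offset
		}
		kept = append(kept, pd)
	}
	return kept
}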