Introduced top-level ConsumerGroup construct #277

Merged
merged 28 commits into master from consumer-groups-refactor on Aug 9, 2019

Conversation

stevevls
Contributor

This is a lower-level API that unlocks use cases such as consuming
each assigned partition using a separate Reader or to overwrite
offsets for a group.

Refactored logic out of Reader and into ConsumerGroup so that
Reader uses the ConsumerGroup when GroupID is set.

@stevevls
Contributor Author

stevevls commented May 21, 2019

This change is in support of #263. It is largely a lift-and-shift, pulling logic out of Reader and into ConsumerGroup. Some parts have been re-written to make group membership more crisp and to improve testability.

Tests are currently failing, and more tests are forthcoming, but this PR currently captures the spirit of what we're trying to do.

Using the ConsumerGroup to consume messages per-partition would look something like this:

func main() {
        group, err := NewConsumerGroup(ConsumerGroupConfig{
                ID:             "my-group",
                Brokers:        []string{"kafka:9092"},
                Topics:         []string{"my-topic"},
                CommitInterval: time.Second,
        })
        if err != nil {
                fmt.Printf("error creating consumer group: %+v\n", err)
                os.Exit(1)
        }
        defer group.Close()

        consume := func(reader *Reader, gen *Generation) {
                for {
                        msg, err := reader.ReadMessage(context.TODO())
                        switch err {
                        case io.EOF:
                                return // reader has been closed
                        case nil:
                                fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
                                gen.MarkOffset(msg.Topic, msg.Partition, msg.Offset)
                        default:
                                fmt.Printf("error reading message: %+v\n", err)
                        }
                }
        }

        for {
                gen, err := group.Next(context.TODO())
                if err != nil {
                        fmt.Printf("error getting next generation: %+v\n", err)
                        break
                }

                assignments := gen.Assignments["my-topic"]
                readers := make([]*Reader, len(assignments))
                for i, assignment := range assignments {
                        readers[i] = NewReader(ReaderConfig{
                                Brokers:   []string{"kafka:9092"},
                                Topic:     "my-topic",
                                Partition: assignment.ID,
                        })
                        readers[i].SetOffset(assignment.Offset)
                        go consume(readers[i], gen)
                }

                <-gen.Done
                for _, reader := range readers {
                        reader.Close()
                }

                if err = gen.Commit(); err != nil {
                        fmt.Printf("error committing group, generation %d: %+v\n", gen.ID, err)
                        os.Exit(1)
                }
        }
}

It could also be used to overwrite group offsets with code like:

func main() {
        group, err := NewConsumerGroup(ConsumerGroupConfig{
                ID:      "my-group",
                Brokers: []string{"kafka:9092"},
                Topics:  []string{"my-topic"},
        })
        if err != nil {
                fmt.Printf("error creating consumer group: %+v\n", err)
                os.Exit(1)
        }
        defer group.Close()

        gen, err := group.Next(context.TODO())
        if err != nil {
                fmt.Printf("error getting next generation: %+v\n", err)
                os.Exit(1)
        }
        err = gen.CommitOffsets(map[string]map[int]int64{
                "my-topic": {
                        0: 123,
                        1: 456,
                        3: 789,
                },
        })
        if err != nil {
                fmt.Printf("error committing offsets next generation: %+v\n", err)
                os.Exit(1)
        }
}

@abuchanan-nr
Contributor

I haven't dug into the details of the code yet, but some high-level thoughts:

In your example, the readers are closed twice. Is this a bug?

In your example, I don't see offsets being marked on the group/generation. Is this a bug? Would you need to also pass the Generation to the consume code in order to call MarkOffset?

Will the goroutines be required to share a Generation (and therefore a single commit queue) in order to commit offsets? Ideally, the per-partition goroutines wouldn't share a commit queue, but maybe that's not practical?

Is Generation.CommitOffsets just Generation.MarkOffset() + Generation.Commit()?

This is a good example of why having constructors like NewReader panic is a bad idea – you might end up using those constructors in a way where you don't want them to panic. I'm definitely worried about burying that panic in my shared library code when I wrap this low-level API in a higher-level API.

You might consider moving low-level stuff to a subpackage. ConsumerGroup is such a common type in kafka-land, I can see people easily getting confused with this low-level API.

Looks pretty good though. I was confused at first, but I had to remind myself this is a low-level API. I expect I'll need to wrap this in a higher-level API, but I think all the pieces are there to do that.

@stevevls
Contributor Author

stevevls commented May 22, 2019

Great feedback...thanks! The example code I wrote was kind of off-the-cuff, so I'm not surprised you found some bugs in there. 😄 I updated the example snippet to pass the generation and call MarkOffset, and to remove the double close on the reader.

Will the goroutines be required to share a Generation (and therefore a single commit queue) in order to commit offsets? Ideally, the per-partition goroutines wouldn't share a commit queue, but maybe that's not practical?

At the moment, yes. This is a carry-over from how they're currently handled. This strategy reduces round trips. Can you think of any cases where it would be bad to have the shared commit queue?

Is Generation.CommitOffsets just Generation.MarkOffset() + generation.Commit()?

Again, this is an artifact of the way we currently handle offsets. The code in Reader currently will only mark offsets in an ascending direction. CommitOffsets gives a way to commit exact offsets, even if they go backward. If having all these functions feels awkward, I could push the MarkOffset stuff back into the Reader and only provide the CommitOffsets call on the Generation. Perhaps it's overly opinionated to have all these different functions...
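
For illustration, roughly how the two differ under the API as sketched in this PR (a hedged sketch only; gen is assumed to be in scope):

        // MarkOffset only ratchets forward: marking an offset lower than
        // one already marked for the partition has no effect.
        gen.MarkOffset("my-topic", 0, 1042)
        gen.MarkOffset("my-topic", 0, 900) // ignored; 1042 still wins
        if err := gen.Commit(); err != nil {
                fmt.Printf("commit failed: %+v\n", err)
        }

        // CommitOffsets writes exactly what it is given, even if that
        // rewinds the group, which is what an offset-reset tool needs.
        if err := gen.CommitOffsets(map[string]map[int]int64{
                "my-topic": {0: 900},
        }); err != nil {
                fmt.Printf("commit offsets failed: %+v\n", err)
        }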

This is a good example of why having constructors like NewReader panic is a bad idea – you might end up using those constructors in a way where you don't want them to panic. I'm definitely worried about burying that panic in my shared library code when I wrap this low-level API in a higher-level API.

Great point. We hadn't yet captured that on the wish list in #150, but it's there now! It's possible to avoid the panic with a bit of care since it's only hit if the config is invalid, but generally speaking, it's concerning. We'll have to fix that at some point...

You might consider moving low-level stuff to a subpackage. ConsumerGroup is such a common type in kafka-land, I can see people easily getting confused with this low-level API.

I'll talk to the other folks to see what our long-term plans are...we eventually want to export many more of the lower-level operations, so this may be a good forcing function for us to consider how all that should look.

@abuchanan-nr
Contributor

Can you think of any cases where it would be bad to have the shared commit queue?

In the case of synchronous commits, wouldn't it essentially synchronize all the readers? Also, I just noticed that the config says "If 0, commits will be handled synchronously," but looking at this code, I'm not sure that applies to MarkOffset.

Otherwise, I think the reader threads will be sharing a single lock on the underlying offsets map. But honestly, I can't say for certain how that would impact performance without testing. Possibly the locking could be replaced by a buffered channel or sync.Map, if necessary.
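
For example, the mark path could look something like this (purely a sketch with a hypothetical mark type, and no claim about how it would actually benchmark):

        type mark struct {
                topic     string
                partition int
                offset    int64
        }

        // Partition goroutines send marks over a buffered channel instead of
        // contending on a shared mutex.
        marks := make(chan mark, 128)

        // A single committer goroutine owns the offsets map outright, so no
        // locking is needed at all.
        go func() {
                pending := map[string]map[int]int64{}
                for m := range marks {
                        if pending[m.topic] == nil {
                                pending[m.topic] = map[int]int64{}
                        }
                        if cur, ok := pending[m.topic][m.partition]; !ok || m.offset > cur {
                                pending[m.topic][m.partition] = m.offset
                        }
                        // flush pending to the broker on an interval or size threshold
                }
        }()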

If having all these functions feels awkward, I could push the MarkOffset stuff back into the Reader

It does feel a bit awkward, but I'm not sure how the example code would work without MarkOffset. I guess the user would need their own offset stash and commit loop, something like the sketch below. Given that this is a low-level API, I guess some complexity is normal.
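
Sketching that stash-and-commit loop against the API in this PR (reusing the hypothetical mark type from the sketch above; the one-second ticker is arbitrary):

        func commitLoop(ctx context.Context, gen *Generation, marks <-chan mark) {
                ticker := time.NewTicker(time.Second)
                defer ticker.Stop()

                pending := map[string]map[int]int64{} // user-owned offset stash
                for {
                        select {
                        case m := <-marks:
                                if pending[m.topic] == nil {
                                        pending[m.topic] = map[int]int64{}
                                }
                                pending[m.topic][m.partition] = m.offset
                        case <-ticker.C:
                                if len(pending) == 0 {
                                        continue
                                }
                                if err := gen.CommitOffsets(pending); err != nil {
                                        fmt.Printf("commit failed: %+v\n", err)
                                        continue
                                }
                                pending = map[string]map[int]int64{}
                        case <-ctx.Done():
                                return
                        }
                }
        }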

@stevevls
Contributor Author

Ah, the docs on CommitInterval need to be updated to say that commits won't be managed...that's a copy/paste left over from pulling the code out of the Reader. Good catch!

Good call re: the synchronous commit. The current Reader essentially does serialize commits across partitions, but that's not a good reason to preserve that behavior. I have never used the synchronous commit feature myself, so I'm also unconvinced of its utility or that it should be the default, but that's a topic for another day...

Thinking a bit more about the commit loop, one could argue that the partition watcher falls into the same camp: it's logic that someone implementing consumer group code may very well want, but strictly speaking, it's not actually part of the group. That said, I think there's a better case for including the partition watcher since it is involved in group mechanics, i.e. rebalancing. Tracking and managing offsets to commit has nothing to do with the mechanics of the group.

I think all this discussion is pointing towards removing the MarkOffset and Commit functions and putting the commit loop back into the reader. Thanks! 😄

@stevevls
Contributor Author

I've done some refactoring that I think makes the consumer group a little more idiomatic by removing the exported Done channel in favor of having the group launch the daemon goroutines and passing a context.Context. I think it also makes the API safer by ensuring that, e.g., a user-provided commit loop doesn't exit prematurely while the rest of the consumer group grinds on.
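
In rough strokes, the caller-facing shape becomes something like this (a hedged sketch assuming a Start-style hook on the generation; the name is illustrative here, error handling is elided, and the example code in the PR is the authoritative version):

        gen, err := group.Next(context.TODO())
        if err != nil {
                return
        }

        for _, assignment := range gen.Assignments["my-topic"] {
                partition, offset := assignment.ID, assignment.Offset
                gen.Start(func(ctx context.Context) {
                        // ctx is canceled when the generation ends, so this
                        // goroutine cannot outlive its group membership.
                        reader := NewReader(ReaderConfig{
                                Brokers:   []string{"kafka:9092"},
                                Topic:     "my-topic",
                                Partition: partition,
                        })
                        defer reader.Close()
                        reader.SetOffset(offset)

                        for {
                                msg, err := reader.ReadMessage(ctx)
                                if err != nil {
                                        return // canceled or reader closed
                                }
                                // commit the next offset to read, per Kafka convention
                                gen.CommitOffsets(map[string]map[int]int64{
                                        msg.Topic: {msg.Partition: msg.Offset + 1},
                                })
                        }
                })
        }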

I added some example code as well. That's probably the best place to start when looking over this change, as it shows how a caller would interact with the consumer group. The next best place to look is the reader code, to see how it's been refactored.

I'm still thinking that it might be nice to provide commit loop functionality, but as a separate type that works with a consumer group generation. I think the primary use folks will have for this code is the parallel reader example, so it would be nice to provide commit management code. I'm still working through what that will look like, though. Also, such a change can come later, since it would be new code and would not break backward compatibility.

@stevevls mentioned this pull request Jun 25, 2019
@achille-roussel self-assigned this Jul 12, 2019
@vgvineet4

Can this PR be merged? My use case is to reset offsets for a consumer group.

@stevevls
Contributor Author

Hi @vgvineet4. I'm finishing up some test cases now and hope to merge this soon. As I've been refactoring the code, I've discovered a lot of edge cases and error conditions that weren't properly handled. Once I get the tests ironed out, we'll be good to merge this. I think the API is locked down at this point, so if you'd like to pull this branch as a pre-release, I don't anticipate any breaking changes with subsequent commits.

Contributor

@achille-roussel left a comment

Really nice work here! Let's get this merged 👍

@stevevls merged commit 11670d5 into master Aug 9, 2019
@stevevls deleted the consumer-groups-refactor branch August 9, 2019 20:06
@cuiyuan

cuiyuan commented Aug 13, 2019

Is this feature available?

@achille-roussel
Contributor

@cuiyuan Yes, we released v0.3.1, which includes the work that was merged in this PR.

@chenhengqi

Awesome!

This feature should be documented, along with the WatchPartitionChanges field in ReaderConfig.
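
Until the docs land, enabling the watcher looks roughly like this (a sketch; field names as they appear in ReaderConfig):

        r := NewReader(ReaderConfig{
                Brokers:               []string{"kafka:9092"},
                GroupID:               "my-group",
                Topic:                 "my-topic",
                WatchPartitionChanges: true, // rebalance when partitions are added to the topic
        })
        defer r.Close()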

@stevevls
Contributor Author

stevevls commented Sep 5, 2019

You raise an excellent point! 😁 #347

stevevls pushed a commit that referenced this pull request Sep 19, 2019
Previously, the batch read functions returned io.EOF to signal the
end of the batch.  However, this caused ambiguity because there are
valid io.EOF errors from other sources (e.g. a connection that is
closed by the broker).  Because io.EOF can leave the connection in
an unusable state, we are closing the connection and logging it.
However, this also means that we're closing the connection after
every single fetch request, which is not desirable.

This PR also fixes an accidental deletion of deadline-management
code that was introduced in #277.

Fixes #258
stevevls pushed a commit that referenced this pull request Sep 21, 2019
The Batch read functions return io.EOF to signal the end of the batch. 
However, there was ambiguity because io.EOF errors could also surface 
for other reasons (e.g. a connection closed by the broker).  Due to this
ambiguity, the Reader would treat io.EOF as an error.  In turn, this
would result in an error message being logged and the connection being
closed after every single fetch request.  Such behavior is undesirable
from a performance perspective because the connection could be re-used.
It is also misleading because it inflates error and log stats
when there is really no error.

This PR fixes the handling of io.EOF in the Reader to treat it as an
expected condition, while ensuring that the Batch maps any underlying
io.EOF errors to io.ErrUnexpectedEOF.

This PR also fixes an accidental deletion of deadline-management
code that was introduced in #277.

Fixes #258
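
The remapping this commit message describes boils down to a pattern like the following at the batch boundary (an illustrative sketch, not the actual diff; err and batchDone are placeholders):

        // An io.EOF that arrives before the batch is complete came from the
        // underlying connection rather than the natural end of the batch, so
        // it is surfaced as io.ErrUnexpectedEOF instead.
        if err == io.EOF && !batchDone {
                err = io.ErrUnexpectedEOF
        }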
@madneal

madneal commented May 22, 2020

@stevevls I tried to use this feature, but I found I cannot use kafka-consumer-groups to list the CURRENT-OFFSET for the groupId. Does it have something to do with this feature?
