Consumer groups question #46
From your blog post I found that you catered for consumer groups by leveraging the NATS QueueSubscribe, nice! Just one thing I noticed: when a consumer in the group leaves the group (kill the client), newly published messages that would have been delivered to the killed consumer are not delivered to the member still in the group. Do you expect the publisher to republish the message if it is not acknowledged in time? If you republish the messages, they might also be routed to the offline consumer. It would be nice to have the groups be dynamic, especially when scaling workers up and down on Kubernetes (like the normal NATS QueueSubscribe).
Hi @riaan53, Liftbridge currently doesn't have something that maps exactly to Kafka consumer groups. As you point out, streams can join a load-balance group, which simply means messages from the NATS subject are distributed between streams in the group. If you need a fault-tolerant consumer of a stream, right now you have to manage that manually (e.g. by tracking your position in the stream, handling failover, etc.). I do intend to add support for higher-level consumer functionality, which would include something equivalent to consumer groups. Step one of that, however, is first implementing consumer-offset checkpointing in the log, which is on the roadmap.
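For context, the manual approach described above (track your own position, resume on restart) can be sketched roughly like this with the Go client. This is an illustration only: the stream name and server address are placeholders, and `loadOffset`/`saveOffset` are hypothetical helpers you would back with your own durable storage; they are not part of the Liftbridge API.

```go
package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Load the last processed offset from your own storage (a file,
	// a database row, etc.). Hypothetical helper, not a Liftbridge API.
	lastOffset := loadOffset()

	// Resume the subscription just past the last offset we processed.
	err = client.Subscribe(context.Background(), "foo", func(msg *lift.Message, err error) {
		if err != nil {
			// On error (e.g. partition failover) your own failover logic
			// would re-subscribe from the saved position.
			panic(err)
		}
		fmt.Println(msg.Offset(), string(msg.Value()))
		saveOffset(msg.Offset()) // hypothetical persistence helper
	}, lift.StartAtOffset(lastOffset+1))
	if err != nil {
		panic(err)
	}
	select {}
}

// Stand-ins for whatever durable storage you choose.
func loadOffset() int64  { return -1 }
func saveOffset(o int64) {}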
Awesome! Thank you for the feedback. This project is heavily needed. I did try something like this in the past but it got way too complex. You took the correct approach. Looking forward to the future of this project and to contributing as well. You are welcome to close this issue or leave it open to track the feature request.
I will leave it open for now since it is a need that should be addressed.
Hi @tylertreat Thanks for all the cool work you are doing on Liftbridge, very interested to use it. The one feature I am waiting on is consumer groups. I know it was planned to be released in H1 2021 (I was hoping for it to be in 1.7.0), but I was wondering if there were any rough predictions on when it might get implemented? Thanks!
@danthegoodman1 I'm pretty close to having consumer groups implemented. There is a branch here if you want to follow along. My hope is to have a beta API and accompanying Go client support available by the end of November. The bulk of the work left is implementing group coordinator failure detection and failover and finishing some work on the Go client side.
FYI the implementation of consumer groups is mostly completed. If anyone is interested in trying out consumer groups, I'd love to get some early feedback before it's included in a release. Let me know if you'd be interested.
@tylertreat Is the Go client ready as well? I might have some time this weekend if that's not blocking.
Yep, the Go client will have support. The only thing left to implement (outside of more robust testing) is group coordinator failover, i.e. high availability for consumer groups. This will follow a similar pattern as partition leader failover. But I don't think this is strictly needed to just try the consumer group API out; it's more of a production-readiness item. I will get branches pushed up and write up some docs on using the API for anyone interested in giving it a spin.
@tylertreat let me know if you get those docs ready, I'll be happy to test.
Will do. I was meaning to get that done today. I'll try to get to it this weekend.
Here are the branches to use:

liftbridge: https://github.com/liftbridge-io/liftbridge/tree/consumer_groups

Here is a little test program demonstrating the use of consumer groups:

```go
package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}

	if err := client.CreateStream(context.Background(), "foo", "foo"); err != nil {
		if err != lift.ErrStreamExists {
			panic(err)
		}
	}

	consumer, err := client.CreateConsumer("my-group")
	if err != nil {
		panic(err)
	}

	err = consumer.Subscribe(context.Background(), []string{"foo"}, func(msg *lift.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Stream(), msg.Partition(), msg.Offset(), string(msg.Value()))
	})
	if err != nil {
		panic(err)
	}

	fmt.Println("waiting for messages...")
	select {}
}
```

You can run a few instances of this program and Liftbridge will handle assigning the stream partition to a consumer. This example uses a single stream with a single partition, but you could also change the stream to multiple partitions and the partitions would be assigned to the various consumers in the group. You could also have group members subscribe to multiple streams. Publish some messages and you should see that messages on a specific partition only go to a single group member. Then, you can kill that consumer and Liftbridge will reassign the partition (with some delay for failure detection). Consumers always pick up where they left off in the stream. Try killing a consumer and restarting it, for example. Let me know if you have any questions or run into any issues!
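For the multi-partition variant mentioned above, the stream creation call would change as sketched below. This assumes the `lift.Partitions` stream option from the Go client; the stream name and address are the same placeholders as in the example.

```go
package main

import (
	"context"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Same as the example above, but with three partitions so that
	// Liftbridge can spread partition assignments across the
	// consumers in the group.
	err = client.CreateStream(context.Background(), "foo", "foo", lift.Partitions(3))
	if err != nil && err != lift.ErrStreamExists {
		panic(err)
	}
}
```

With more partitions than group members, members receive multiple partitions; with more members than partitions, the extras sit idle until a reassignment.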
Thanks, will try it out either tonight or tomorrow!
@tylertreat to clarify, it looks like consumer groups are implicit? Meaning that a normal consumer is just a group of one?
Consumers always operate in the context of a consumer group. If you don't actually want the mechanics of a group, you would just specify a unique group name for each consumer. Alternatively, you could just use client.Subscribe, which operates outside the context of consumers/groups.
Also, in terms of cursor tracking, is there any way for a consumer to know which partitions it is currently consuming, so it can fetch the offsets for those partitions (to reset to where it was) @tylertreat?
Or I should say, know without waiting for the first message in each partition.
Otherwise, so far everything is great!
The consumer does know its assignments, though that information is not currently exposed. It'd help me to better understand your use case, as the current design is intended to abstract away the need for users to manage cursors directly.
@tylertreat my understanding after reading https://liftbridge.io/docs/cursors.html is that consumers will pretty much restart at the beginning of the stream when they are restarted, since there is no automatic cursor management; I would need to tell it to keep track of its position itself. The question came from wanting to make sure that if a consumer came alive to take over a partition previously handled by another consumer, or picked up an additional partition on scale-down, it could look up the last offset and resume from it. Although the way you mention it makes it sound like that is actually managed? I just want to make sure that when consumers are passing around partitions during consumer group scale up/down, they pick up where another consumer left off.
So I could implement checking the cursor for all partitions a consumer is currently responsible for on startup, and setting them to the last known position.
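For reference, the manual version of this with the cursor API would look roughly like the following. This is a sketch under assumptions: it uses the `FetchCursor`/`SetCursor` client methods described on the cursors docs page, a single known partition (0), and an arbitrary cursor ID; it also requires cursor management to be enabled on the server.

```go
package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	const cursorID = "my-cursor" // arbitrary identifier for this consumer's position

	// Look up the last checkpointed offset for partition 0 of "foo".
	// Returns -1 if no cursor has been stored yet.
	offset, err := client.FetchCursor(ctx, cursorID, "foo", 0)
	if err != nil {
		panic(err)
	}

	// Resume just past the checkpointed position.
	err = client.Subscribe(ctx, "foo", func(msg *lift.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Offset(), string(msg.Value()))
		// Persist our position so a restart (or another consumer
		// taking over this partition) can resume from here.
		if err := client.SetCursor(ctx, cursorID, "foo", 0, msg.Offset()); err != nil {
			panic(err)
		}
	}, lift.StartAtOffset(offset+1))
	if err != nil {
		panic(err)
	}
	select {}
}
```

As the following replies note, consumers handle this bookkeeping automatically, so this manual pattern is only needed outside of consumer groups.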
Yes, actually consumers are designed to handle this for you. By default they will checkpoint their position automatically (every 5 seconds IIRC). You can disable auto checkpointing and manually checkpoint if you prefer more granular control (e.g. to reduce the possibility of duplicate delivery). Consumers will always resume at the last checkpointed position. I can put together an example of manual checkpointing to demonstrate this.
@tylertreat Ah ok, that was not obvious from the docs, maybe something to add in. Or maybe I missed it since I was searching for a different term.
Nope, you're correct. The docs have not been updated yet with the consumer group work. That will get updated before that work is released.
This shows an example of disabling auto checkpointing of cursors. Instead, each cursor is checkpointed after every message is processed.

```go
package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}

	if err := client.CreateStream(context.Background(), "foo", "foo"); err != nil {
		if err != lift.ErrStreamExists {
			panic(err)
		}
	}

	// AutoCheckpoint(0) disables the periodic automatic checkpointing.
	consumer, err := client.CreateConsumer("my-group", lift.AutoCheckpoint(0))
	if err != nil {
		panic(err)
	}

	err = consumer.Subscribe(context.Background(), []string{"foo"}, func(msg *lift.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Stream(), msg.Partition(), msg.Offset(), string(msg.Value()))
		// Manually checkpoint the consumer's position after processing
		// each message.
		if err := consumer.Checkpoint(context.Background()); err != nil {
			panic(err)
		}
	})
	if err != nil {
		panic(err)
	}

	fmt.Println("waiting for messages...")
	select {}
}
```
Got it, thanks for clarifying @tylertreat! Otherwise everything seems super logical about how consumer groups are handled, really love the simplicity. Looking forward to seeing it in master :)
@tylertreat did this get merged in?
Not yet. There ended up being a bit of a long tail of work to get this fully wrapped up, both in the client and server. Estimates are hard. :) Probably another couple of weeks before everything is merged, realistically. I'll be sure to update this issue once it's in.
@tylertreat Yes they are, no worries, just wanted to make sure I wasn't missing it!
Merged in #387, which will be released as part of Liftbridge 1.8.0.
Hi there,
Very interesting project!
I'm busy with an event sourcing microservice design and looking at all the options. One thing that is missing from Liftbridge is consumer groups, which are on your roadmap. They simplify a lot in a microservice design (HA workflows and load balancing). Do you have another way to accomplish this with the current features, or do you plan to add support for something like this? Sorry for the question here.
Regards,
Riaan