This repository has been archived by the owner on May 13, 2019. It is now read-only.

The best practice of handling offsets of asynchronous consuming? #106

Closed
asaarashi opened this issue Sep 29, 2016 · 6 comments

asaarashi commented Sep 29, 2016

Hi, thanks for sharing the great library!
I have a case like the following: the consumer processes messages asynchronously, so offsets may not be committed in order. What is the best practice for handling offsets to guarantee they are committed correctly?

for {
    select {
    case m, ok := <-consumer.Messages():
        if ok {
            go func(m *sarama.ConsumerMessage) {
                exportMessage := worker.Ingest(m)

                // TODO: Commit the offsets
            }(m)
        }
    }
}
@wvanbergen
Owner

You could use a channel to tell the main goroutine about messages that have been completely processed.

However, you should be careful with asynchronously processing messages. Because not all messages will take equally long to process, a message with offset 3 could be done before a message with offset 2. What do you want to do in this case?
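One way to handle that out-of-order completion is to track finished offsets and only ever commit the end of the unbroken run of finished offsets. The tracker below is an illustrative sketch, not part of this library; the type and method names are hypothetical, and the wiring to `consumer.CommitUpto` is omitted.

```go
package main

import "fmt"

// offsetTracker records which offsets have finished processing and
// reports the highest offset that is safe to commit: the end of the
// unbroken run of finished offsets starting at the first tracked one.
type offsetTracker struct {
	start int64          // first offset this tracker is responsible for
	next  int64          // lowest offset not yet finished
	done  map[int64]bool // finished offsets beyond next
}

func newOffsetTracker(start int64) *offsetTracker {
	return &offsetTracker{start: start, next: start, done: map[int64]bool{}}
}

// markDone records that a message's offset finished processing and
// advances the contiguous frontier as far as possible.
func (t *offsetTracker) markDone(offset int64) {
	t.done[offset] = true
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
	}
}

// committable returns the highest contiguous finished offset, and
// false while no offset can be committed yet.
func (t *offsetTracker) committable() (int64, bool) {
	if t.next == t.start {
		return 0, false
	}
	return t.next - 1, true
}

func main() {
	t := newOffsetTracker(2)
	t.markDone(3) // offset 3 finishes before offset 2
	if _, ok := t.committable(); !ok {
		fmt.Println("nothing committable yet: gap at offset 2")
	}
	t.markDone(2)
	if off, ok := t.committable(); ok {
		fmt.Println("commit up to offset", off)
	}
}
```

Each worker goroutine would send its message's offset to a channel; the main goroutine calls `markDone` for each received offset and commits whenever `committable` advances.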


@asaarashi
Author

Thanks for the quick reply.

I need to consume messages with high concurrency, so I spawn many goroutines to do that.

To avoid the risks of offset management with asynchronous consumption, I figured out a safer way: commit the offset immediately, and if uploading the message fails, save it for re-consumption.

    for {
        select {
        case m, ok := <- consumer.Messages():
            if ok {
                if err := consumer.CommitUpto(m); err == nil {
                    go func(m *sarama.ConsumerMessage) {
                        exportMessage := worker.Ingest(m)

                        // TODO: Upload the consumed message. If the upload fails, save the message for re-consumption.
                    }(m)
                }
            }
        }
    }

Is this a possible solution?

@wvanbergen
Owner

The standard solution to consume faster is by having more partitions and more consumer instances.
If you can get that to work, that would be by far my preferred solution because you can let go of all the complexity, and you get to keep all the guarantees Kafka offers.

If that doesn’t work for you, it really depends on what kind of guarantees your application needs.
Saving failed messages for re-consumption could work, but it really depends on your application.
You will lose Kafka's ordering guarantee with this approach, for instance.

Willem
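The scaling model described above can be sketched with a toy assignment function: with more partitions than instances, each instance owns several partitions and consumes them in parallel, while ordering is preserved within each partition. This is an illustrative round-robin assignment only, not the library's actual rebalancing logic.

```go
package main

import "fmt"

// assignPartitions sketches how a consumer group spreads partitions
// across member instances, round-robin style. Each partition is owned
// by exactly one member, so adding members increases parallelism
// without breaking per-partition ordering.
func assignPartitions(partitions int, members []string) map[string][]int {
	out := make(map[string][]int)
	for p := 0; p < partitions; p++ {
		m := members[p%len(members)]
		out[m] = append(out[m], p)
	}
	return out
}

func main() {
	// With 4 partitions and 2 instances, each instance owns 2
	// partitions; a third instance would shrink each share further.
	fmt.Println(assignPartitions(4, []string{"instance-1", "instance-2"}))
}
```

The real rebalance is coordinated through ZooKeeper by this library, but the capacity arithmetic is the same: throughput scales with the number of partitions, up to one instance per partition.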


@asaarashi
Author

Yes, that may be a better approach: more standard and less complex.

Assuming there are 2 partitions in topic1, would it look like the following?

consumer1, consumerErr1 := consumergroup.JoinConsumerGroup("main-group", []string{"topic1"}, zookeeperNodes, kafkaConfig)
consumer2, consumerErr2 := consumergroup.JoinConsumerGroup("main-group", []string{"topic1"}, zookeeperNodes, kafkaConfig)
// (check consumerErr1 / consumerErr2 here)

go func() {
    for {
        select {
        case m := <-consumer1.Messages():
            // Process message m...
        }
    }
}()

go func() {
    for {
        select {
        case m := <-consumer2.Messages():
            // Process message m...
        }
    }
}()
// ......

@wvanbergen
Owner

No, you just call JoinConsumerGroup once. Then you simply start the application several times simultaneously. An added benefit is that you can run the instances on multiple servers.

@asaarashi
Author

That clears it up. Thank you!
