Add custom offset committing interface for consumer/runner #122

Closed

bowendeng opened this issue Aug 27, 2018 · 16 comments

@bowendeng

Feature

Currently, the consumer only supports committing the resolved offset. It would be nice to support custom offset committing.

Use case

We are trying to do aggregation based on kafkajs, but the size of the working set is not fixed. For example, an input stream would look like this:

X0 -- X1 -- X2 -- X3 -- Y0 -- Y1 -- Z0 -- Z1 -- Z2 -- ...

We try to aggregate events of the same kind (agg([X0, X1, X2, X3]), agg([Y0, Y1]), agg([Z0, Z1, Z2, ...])), and only commit the offset once the aggregated result has been saved. The problem is that until we see a Y event, we don't know whether all the X events have arrived. By the time we see Y0, however, the resolved offset has already passed X3.
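
To make the aggregation logic concrete, here is a minimal sketch (kindOf, agg, and saveAggregate are hypothetical helpers):

// Minimal sketch: buffer events until the kind changes, then save the aggregate.
// kindOf, agg, and saveAggregate are hypothetical helpers.
let currentKind = null
let buffer = []

async function onEvent(event) {
  if (currentKind !== null && kindOf(event) !== currentKind) {
    // only now do we know the previous group is complete, several offsets later
    await saveAggregate(agg(buffer))
    buffer = []
  }
  currentKind = kindOf(event)
  buffer.push(event)
}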

Having the ability to commit custom offsets would allow us to properly track the working progress. Thanks!

@bowendeng bowendeng changed the title Add custom offset commit interface for consumer/runner Add custom offset committing interface for consumer/runner Aug 27, 2018
@tulios
Owner

tulios commented Aug 28, 2018

Hi @bowendeng, I think you can solve your problem with the current feature set. If you switch to eachBatch you can use two properties in your favor. Set eachBatchAutoResolve to false so the consumer won't auto-resolve the offsets, and set autoCommitThreshold to 1 so you can control the commit flow. Example:

{
  eachBatchAutoResolve: false,
  autoCommitThreshold: 1,
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    let lastOffset
    for (const message of batch.messages) {
      if (aggregate(message)) {
        // still inside the same aggregation; remember the offset but don't resolve yet
        lastOffset = message.offset
        await heartbeat()
      } else {
        // aggregation boundary: resolve up to the last aggregated message and force a commit
        resolveOffset(lastOffset)
        await commitOffsetsIfNecessary()
      }
    }
  }
}

This is pseudo-code and I'm assuming a lot, but let's go through it. You get the whole batch and can detect whether you have the full set or only a partial one. You can force the commit with commitOffsetsIfNecessary. WDYT?

@bowendeng
Author

bowendeng commented Aug 28, 2018

@tulios Thanks for the reply. I like the solution you proposed, but I think it doesn't work well if the events of the same kind are fetched across multiple batches. For example,

Ideally
[Batch 1]                   | [Batch 2]
... -- W3 -- X0 -- X1 -- X2 | X3 -- Y0 -- Y1 -- Y2 --...

Actually
[Batch 1]                   | [Batch 2]
... -- W3 -- X0 -- X1 -- X2 | X0 -- X1 -- X2 -- X3 -- Y0 -- Y1 -- Y2 --...

The last offset resolved in Batch 1 is W3. When we fetch the next batch, the new batch actually starts from X0, not X3, so we receive X0, X1, and X2 twice here. Those duplicate events can be filtered out at a small performance cost, but the problem persists if there's a full batch of "intermediate" events, for instance:

Ideally
[Batch 1]                   | [Batch 2]             | [Batch 3]
... -- W3 -- X0 -- X1 -- X2 | X3 -- X4 -- ... -- Xn | Xn+1 -- Xn+2 -- ...

Actually
[Batch 1]                   | [Batch 2]             | [Batch 3]
... -- W3 -- X0 -- X1 -- X2 | X0 -- X1 -- ... -- Xm | X0 -- X1 -- ... -- Xm ...

In that case, we never resolve any offset in batch 2, so the consumer keeps fetching the same batch over and over again without making any actual progress.
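
For completeness, the duplicate filtering mentioned above could look roughly like this per partition (a sketch; nothing kafkajs-specific):

// Rough sketch: drop refetched messages by tracking the highest offset
// already processed for the partition.
const Long = require("long")

let lastProcessed = Long.fromValue(-1)

function isDuplicate(message) {
  const offset = Long.fromValue(message.offset)
  if (offset.lte(lastProcessed)) return true // already seen in an earlier fetch
  lastProcessed = offset
  return false
}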

I think the fundamental problem is that there's a mismatch between resolved offsets and "aggregation" offsets: the resolved offset tracks reading progress, while the "aggregation" offset tracks processing progress. For some tasks, we need to read ahead in order to process. Having an interface for custom offset committing would let us decouple the updates of these two kinds of progress.

@tulios
Owner

tulios commented Aug 28, 2018

I see, but are you planning to change the offset outside of the consumer loop? You can call resolveOffset with any offset, and with autoCommitThreshold set to 1 commitOffsetsIfNecessary will always commit. I'm just making sure that we can't cover this scenario with the current feature set.

@bowendeng
Author

Exactly. I was hoping there would be a way to commit the offset of a message outside of its original batch callback.

At first, I tried updating the offset through the admin interface, but got a "The coordinator is not aware of this member" error. It seems like only an active member can update the offsets of its assigned partitions.
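
The attempt looked roughly like this (a sketch using the admin client's setOffsets; the groupId, topic, and offset values are placeholders):

// A sketch of the admin-client attempt described above; values are placeholders.
// While the consumer group still has active members, this fails (in my case with
// "The coordinator is not aware of this member").
const admin = kafka.admin()
await admin.connect()
await admin.setOffsets({
  groupId: "test-group",
  topic: "test-topic",
  partitions: [{ partition: 0, offset: "100" }],
})
await admin.disconnect()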

Currently, I have a flaky patch which exposes the underlying ConsumerGroup used by the Runner and uses it to update offsets:

public async commit(topic: string, partition: number, offset: string): Promise<boolean> {
    const { groupId, generationId, memberId, coordinator } = this.consumerGroup; // the underlying consumer group
    if (coordinator && coordinator.isConnected()) {
        await coordinator.offsetCommit({
            groupId,
            memberId,
            groupGenerationId: generationId,
            topics: [
                {
                    topic,
                    partitions: [{ partition, offset }],
                },
            ],
        });
        return true;
    }
    return false;
}

But I don't think this is the proper way to do it. Also, I had to patch the fetch function (its commitOffsetsIfNecessary call) to disable the automatic offset committing after each fetch.

So do you suggest that we should use something like:

async function commit(offset) {
    const currentOffset = getResolvedOffset() // back up the current offset (hypothetical helper)
    await resolveOffset(offset) // resolve the offset to commit
    await commitOffsetsIfNecessary() // commit it
    await resolveOffset(currentOffset) // restore the original resolved offset
}

Thanks!

@tulios
Owner

tulios commented Aug 28, 2018

You have an interesting use case. The admin client can change offsets outside of the consumer group, but the group must be empty, which doesn't help you much. Can you try the approach we've discussed first? Just remember that resolveOffset will resolve offsets for the partition of the batch.

I think you can go a long way with eachBatch but if you can't solve the problem we can discuss a proper solution.

@bowendeng
Author

bowendeng commented Aug 30, 2018

@tulios I implemented a custom offset commit function based on the approach we discussed:

import Long from "long";
// Kafka and PartitionAssigners come from kafkajs; the Batch type may come from
// your own typings depending on the kafkajs version
import { Batch, Kafka, PartitionAssigners } from "kafkajs";

const resolvedOffsets: { [topic: string]: string[] } = {};
const commitFns: { [topic: string]: Array<(offset: string) => Promise<void>> } = {};

async function commit(topic: string, partition: number, offset: string): Promise<boolean> {
    const commitFn = (commitFns[topic] || [])[partition];
    if (commitFn) {
        await commitFn(offset);
        return true;
    }
    return false;
}

function updateCommitFn(
    batch: Batch,
    resolveOffset: (offset: string) => void,
    commitOffsetsIfNecessary: () => Promise<void>,
) {
    const { topic, partition } = batch;
    resolvedOffsets[topic] = resolvedOffsets[topic] || [];
    commitFns[topic] = commitFns[topic] || [];
    // For an empty batch, firstOffset() will return null, and lastOffset()
    // will return HIGH_WATERMARK - 1
    const currentOffset = batch.firstOffset() || batch.lastOffset();
    resolvedOffsets[topic][partition] = currentOffset;
    commitFns[topic][partition] = async (offset: string) => {
        // resolve the target offset minus one (the committed offset is the
        // resolved offset plus one) and commit it
        resolveOffset(
            Long.fromValue(offset)
                .sub(1)
                .toString(),
        );
        await commitOffsetsIfNecessary();
        // restore the previously resolved offset
        resolveOffset(resolvedOffsets[topic][partition]);
    };
}

async function main() {
    const kafka = new Kafka({ brokers: ["kafka:9092"], clientId: "test-client" });
    const consumer = kafka.consumer({ groupId: "test-group", partitionAssigners: [PartitionAssigners.roundRobin] });
    await consumer.connect();
    await consumer.subscribe({ topic: "test-topic", fromBeginning: true });
    await consumer.run({
        autoCommitThreshold: 1,
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
            const { topic, partition, messages } = batch;

            // update the commit function for this topic-partition
            updateCommitFn(batch, resolveOffset, commitOffsetsIfNecessary);

            // consume messages
            for (const { offset, key, value, timestamp } of messages) {
                // process() may call commit() with a custom offset
                await process({ topic, partition, offset, key, value, timestamp });
                await heartbeat();
            }

            // resolve the batch offset manually
            const offset = batch.lastOffset();
            resolveOffset(offset);
            resolvedOffsets[topic][partition] = offset;
        },
    });
}

Also, I need to patch commitOffsets to disable the automatic committing in fetch.

// Patch kafkajs to disable automatic offset committing after each fetch
const ConsumerGroup = require("kafkajs/src/consumer/consumerGroup");
ConsumerGroup.prototype.commitOffsets = async () => undefined;

It works, sort of. Frankly, I still think it would be nice to have an interface which can commit custom offsets directly. I suggest the following (a rough sketch appears after the list):

  • Pass a commitOffsets function (which doesn't change the resolved offset and doesn't check the auto-commit threshold) to the eachBatch callback.
  • Make the offset committing in fetch configurable.
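
To make the first suggestion concrete, it could look something like this (a hypothetical sketch only; these names don't exist in kafkajs today):

// Hypothetical sketch of the proposed interface, not an existing kafkajs API:
// commitOffsets would commit the given offsets directly, without touching the
// resolved offset and without checking the auto-commit threshold.
await consumer.run({
  autoCommit: false, // second suggestion: make the post-fetch committing configurable
  eachBatch: async ({ batch, commitOffsets }) => {
    for (const message of batch.messages) {
      await process(message) // our handler; it may trigger a custom commit
    }
    // commit the offset of the next message to read, per Kafka convention
    await commitOffsets([
      {
        topic: batch.topic,
        partition: batch.partition,
        offset: Long.fromValue(batch.lastOffset()).add(1).toString(),
      },
    ])
  },
})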

What do you think?

@tulios
Owner

tulios commented Aug 30, 2018

Yeah, that's a lot of work. Bear with me while I fully understand the problem 😄
I gave your problem some thought, and I think you have a legitimate use case for keeping the offsets outside of Kafka. The idea is that you track the offsets based on your aggregation logic (similar to what you have in the example) and resolve all messages as you process them; once you figure out that the aggregation is over, you call seek with the correct offset and start processing the new aggregation. Have you thought about that?
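
A rough sketch of what I mean (loadAggregationOffset is a hypothetical store you maintain yourself):

// Rough sketch: keep the aggregation boundary outside Kafka and rewind to it
// with seek when a new aggregation starts. loadAggregationOffset is hypothetical.
const offset = await loadAggregationOffset("test-topic", 0)
consumer.seek({ topic: "test-topic", partition: 0, offset })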

Another note: roundRobin is the default partition assigner, so you don't need to configure it.

I'm going on vacation for a week and a half, so I'll be a bit silent here. @Nevon can take it from here.

I'm still not sure about a generic commitOffsets on the each* handlers; I will give that some more thought and get back to you soon.

@bowendeng
Author

bowendeng commented Aug 30, 2018

@tulios Thanks again for replying. Yes, we thought about saving offsets outside Kafka. For us, reducing dependencies is the main reason for preferring Kafka's offset management: we don't need to maintain another service or storage medium for offsets. Besides, it would probably take extra effort to handle the rebalancing cases.

We can pick up the discussion later. Have a nice vacation!

@sklose
Contributor

sklose commented Sep 11, 2018

I have a different use case for which it would be useful to be able to manually commit offsets. I have a few services that consume a mixture of compacted topics and regular topics. I am using the compacted topics to recover the state of the service, meaning those topics should always be read from the beginning and I do not want to commit any offsets for them at all, whereas for the regular topics I want to use normal offset commits for checkpointing.

An example to illustrate the use case:

  • topic = 'customers' ... this contains the latest version of each customer, I keep an in-memory cache of them in the service for fast access, rare updates are flowing in here in real-time
  • topic = 'impressions' ... some sort of high throughput transaction related to a customer ... the service performs an in-memory join of this data with the customer profile that is cached and then produces some output

So I would want to commit offsets for 'impressions', but not for 'customers'.

@tulios
Owner

tulios commented Sep 11, 2018

Hi @bowendeng and @sklose, I just came back from vacation. I will make some suggestions soon.

@sklose I think you can achieve what you want in a different way: 1) you could use different group ids whenever you need to read the topic from the beginning, something like group-id-${Date.now()}; 2) you can use seek to change the offset, or 3) you can use the admin client to reset the offsets after you load the data in memory.
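
For example, option 1 could look like this (topic name taken from your example; updateCache is hypothetical):

// Option 1: a throwaway group id, so 'customers' is always read from the
// beginning and its committed offsets never matter.
const stateConsumer = kafka.consumer({ groupId: `customers-cache-${Date.now()}` })
await stateConsumer.connect()
await stateConsumer.subscribe({ topic: "customers", fromBeginning: true })
await stateConsumer.run({
  eachMessage: async ({ message }) => updateCache(message), // updateCache is hypothetical
})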

I don't see the use case for manual commits on the each* callbacks; I think something like the Java API (poll, commit, etc) is better for it. I don't want to make any suggestions yet, but I will go through the thread again and see if I can find something that you can work with. Thanks.

@sklose
Contributor

sklose commented Sep 11, 2018

Unfortunately, all three options have downsides I'd like to avoid:

  1. different group ids: I used that approach to simulate the behavior. The problem is that I end up with a long list of group ids in kafka manager, and it becomes really hard to find what I am looking for during troubleshooting.
  2. seek seems to have a race condition: the documentation says I can only call seek after I have called run, which means I might already be receiving messages from behind the stored offset before the seek completes.
     Also, seek requires me to know the partition I am subscribed to, which I can't seem to query; I only have access to it when I receive a message/batch, which means I need to wait for the first update on my compacted topic/partition before I can rewind it back to the beginning.
  3. the admin client only works if there is no active consumer, but I might have multiple services running for scaling purposes (with purpose-built partitioning so the right data flows to the right instance and the joins work).

I feel like not committing the offsets in the first place would be the cleanest solution and would avoid any unwanted side effects.
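
Something like a per-run switch would be enough (purely hypothetical; not in kafkajs at the time of writing):

// Hypothetical: opt out of offset committing entirely for the state-recovery
// consumer, while other consumers keep committing normally.
await consumer.run({
  autoCommit: false, // never commit; replay the compacted topic on every start
  eachMessage: async ({ message }) => updateCache(message), // updateCache is hypothetical
})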

@ianwsperber
Contributor

As @tulios pointed out, this is at least partially addressed by #232. I'm doing some related work now and will see if there's still a need to expose or change a commit method to accomplish this.

@Pingze-github

(Quoting @sklose's comment above in full.)

You are right. In Python, I can use the code below to implement this:

def getMsgs(consumer, num):
  msgs = []
  for i in range(num):
    msgs.append(consumer.consume())
  # commit the offset manually
  consumer.commit_offsets()
  return msgs

msgs = getMsgs(consumer, 10)

It is very easy to do this when I can commit the offset myself.

@tulios
Owner

tulios commented Mar 26, 2019

@Pingze-github the pull API usually adds a lot of overhead to userland code; if you have all the primitives as individual methods, you have to cater for rebalances and so on. The code I usually see around follows the same pattern: it processes messages as fast as it can, committing successful offsets at a certain threshold; on errors, the offset is not committed and the whole process is retried. KafkaJS implements this pattern: the eachMessage handler is where you implement your processing logic, and if the handler doesn't throw, the library will checkpoint the offset and commit according to a threshold defined by the user (time, number of messages, etc). In case of any error, the offset won't be committed, and the library will make sure the previous offsets are committed.

If you need more flexibility you can always use eachBatch, which provides all the primitives for that. The API is different from the pull API seen in the Java and Python libraries, but it allows us to handle Kafka-related errors automatically, simplifying the userland code, and it also helps keep the library future-proof, since we don't leak the internals of how the Kafka API works.
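
For reference, the pattern looks roughly like this (the handler body is illustrative):

// Process in eachMessage and let the library checkpoint and commit on a
// time/count threshold; throwing prevents the commit and triggers retries.
await consumer.run({
  autoCommitInterval: 5000, // commit at most every 5 seconds...
  autoCommitThreshold: 100, // ...or after 100 resolved messages
  eachMessage: async ({ topic, partition, message }) => {
    await handle(message) // hypothetical handler; throw to prevent the commit
  },
})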

I find it extremely valuable to understand the use cases, and I'm always open to changing or adding new APIs. If you have the time, I would love to know why this API doesn't work for you and how we can improve it. Thanks.

@Pingze-github

(Quoting @tulios's previous comment in full.)

I want to create a function that returns a certain number of messages and also updates the offset, so that next time I can start from where I left off.
I have tried using eachMessage and eachBatch, but found no way to meet this need. It looks like I cannot control the offset precisely (I can only adjust the autoCommit settings).
Am I wrong? Or is there a better way to do this? Thanks for your reply.

@tulios
Owner

tulios commented Jul 23, 2019

Fixed by #436

@tulios tulios closed this as completed Jul 23, 2019