Skip to content

Commit

Permalink
feat: manual commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Yankov committed Apr 8, 2024
1 parent 1bf2588 commit c1959a5
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
17 changes: 17 additions & 0 deletions packages/kafka-client/src/events/provider/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,20 @@ export class Topic {
});
}

/**
* Manually commit the current offset.
*/
async commitOffset(): Promise<void> {
try {
// Commit the current offset
await this.commitCurrentOffsets();
this.provider.logger.verbose('Offset committed manually');
} catch (error) {
this.provider.logger.error('Failed to commit offset manually', { code: error.code, message: error.message, stack: error.stack });
throw error;
}
}

/**
* Internal function for receiving event messages from Kafka and
* forwarding them to local listeners.
Expand All @@ -439,6 +453,9 @@ export class Topic {
this.provider.logger.debug(`kafka received event with topic ${context.topic} and event name ${eventName}`, { decodedMsg });
this.emitter.emit(eventName, decodedMsg, context,
this.config, eventName);

// Manual offset commit
this.commitOffset();
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions packages/kafka-client/test/kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ describe('Kafka provider test', () => {
const client: Events = new Events(kafkaConfig.events.kafka, logger);
const topicName = 'com.example.test';
const eventName = 'exampleEvent';
let initialOffset: number;
before(async () => {
// start the client
await client.start();
initialOffset = await (await client.topic(topicName)).$offset(-1);
});
after(async function() {
// stop the client
Expand Down Expand Up @@ -113,4 +115,38 @@ describe('Kafka provider test', () => {
countArr.length.should.equal(5);
});
});

describe('Manual Commit', () => {
it('should manually commit offset after processing message', async () => {
// Create topic object
const topic: Topic = await client.topic(topicName);
let offset: number;

// Subscribe to topic for example-event with listener as callback.
await topic.on(eventName, async (message, context) => {
// Ensure that message is processed
should.exist(message);
// Simulate processing time
await new Promise(resolve => setTimeout(resolve, 1000));

// Manually commit offset after processing the message
await topic.commitOffset();
});

// Get the current offset
offset = await topic.$offset(-1);

// Emit the message to Kafka
await topic.emit(eventName, { value: 'value', count: 1 });

// Wait for processing to complete
await new Promise(resolve => setTimeout(resolve, 2000));

// Get the latest offset after processing
const finalOffset = await topic.$offset(-1);

// Verify that offset has been manually committed and updated accordingly
should(finalOffset).be.above(initialOffset);
});
});
});

0 comments on commit c1959a5

Please sign in to comment.