Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: consume per partition #2

Merged
merged 6 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ deps/*
.DS_Store

.vscode
.idea
9 changes: 9 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ so if you feel something is missing feel free to send a pull request.
* [Contributor Agreement](#contributor-agreement)

[How Can I Contribute?](#how-can-i-contribute)
* [Setting up the repository](#setting-up-the-repository)
* [Reporting Bugs](#reporting-bugs)
* [Suggesting Enhancements](#suggesting-enhancements)
* [Pull Requests](#pull-requests)
Expand All @@ -37,6 +38,14 @@ Not currently required.

## How can I contribute?

### Setting up the repository

To set up the library locally, do the following:

1) Clone this repository.
2) Install librdkafka with `git submodule update --init --recursive`
3) Install the dependencies `npm install`

### Reporting Bugs

Please use __Github Issues__ to report bugs. When filling out an issue report,
Expand Down
50 changes: 26 additions & 24 deletions README.md

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions examples/consumer-per-partition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
A consumer that is subscribed to multiple partitions can control the mix of messages consumed from each partition. How this is done is explained [here](https://github.com/confluentinc/librdkafka/wiki/FAQ#what-are-partition-queues-and-why-are-some-partitions-slower-than-others).

The example below simulates a partition 0 which is slow (2s per consume). Other partitions consume at a rate of 0.5s. To use the example, create a topic "test" with two partitions. Produce 500 messages to each partition. This example does not require an active producer. Run the example to see the result. Run multiple instances to see the rebalancing take effect.

```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

var Kafka = require('../');

// Create the consumer. The first object is the global (librdkafka) config,
// the second holds default topic-level config.
var consumer = new Kafka.KafkaConsumer({
//'debug': 'all',
'metadata.broker.list': 'localhost:9092',
// A shared group id lets multiple instances of this example rebalance the
// partitions of the topic among themselves.
'group.id': 'node-rdkafka-consumer-per-partition-example',
// Offsets are committed manually, per message, in the consume callback below.
'enable.auto.commit': false,
// Emit 'rebalance' events so the script can track its partition assignments.
'rebalance_cb': true,
}, {
'auto.offset.reset': 'earliest', // start from the beginning
});

var topicName = 'test';

// Keep track of which partitions are assigned.
var assignments = [];

// Forward librdkafka debug logs to stdout (only fires when 'debug' is set).
consumer.on('event.log', function(logEntry) {
  console.log(logEntry);
});

// Report any errors surfaced by the consumer.
consumer.on('event.error', function(consumerErr) {
  console.error('Error from consumer');
  console.error(consumerErr);
});

consumer.on('ready', function(readyInfo) {
  console.log('consumer ready: ' + JSON.stringify(readyInfo));

  consumer.subscribe([topicName]);

  // Use a zero timeout so individual consume() calls never block.
  consumer.setDefaultConsumeTimeout(0);

  // Kick off a flowing-mode consume loop. It yields no messages, because we
  // will consume from the partitions directly below, but it is required so
  // that rebalance events keep being serviced.
  consumer.consume();
});

// On every rebalance, start a consume loop for each partition we did not
// already own before this event.
consumer.on('rebalance', function(err, currentAssignments) {
  var partitionIds = currentAssignments.map(function(a) {
    return a.partition;
  });
  console.log('rebalancing done, got partitions assigned: ', partitionIds);

  // Partitions present in the new assignment but absent from the old one.
  var addedAssignments = currentAssignments.filter(function(candidate) {
    var alreadyOwned = assignments.some(function(existing) {
      return existing.partition === candidate.partition;
    });
    return !alreadyOwned;
  });

  // Replace the global assignment list before starting the new loops, so the
  // running loops see the updated ownership.
  assignments = currentAssignments;

  addedAssignments.forEach(function(assignment) {
    startConsumeMessages(assignment.partition);
  });
});

/**
 * Start a self-rescheduling consume loop for one partition.
 *
 * The loop stops itself as soon as the partition is no longer present in the
 * global `assignments` array (i.e. it was revoked by a rebalance).
 *
 * @param {number} partition - Identifier of the partition to consume from.
 */
function startConsumeMessages(partition) {
  console.log('partition: ' + partition + ' starting to consume');

  function consume() {
    var isPartitionAssigned = assignments.some(function(assignment) {
      return assignment.partition === partition;
    });

    // Stop the loop once the partition has been revoked by a rebalance.
    if (!isPartitionAssigned) {
      console.log('partition: ' + partition + ' stop consuming');
      return;
    }

    // consume per 5 messages
    consumer.consume(5, topicName, partition, callback);
  }

  function callback(err, messages) {
    if (err) {
      // Without this guard, `messages` is undefined on error and the
      // forEach below throws. Log and retry on the regular schedule.
      console.error(err);
      setTimeout(consume, partition === 0 ? 2000 : 500);
      return;
    }

    messages.forEach(function(message) {
      // consume the message, committing its offset as we go. Committing the
      // last message again afterwards (as the original example did) would be
      // a redundant duplicate commit of an already-committed offset.
      console.log('partition ' + message.partition + ' value ' + message.value.toString());
      consumer.commitMessage(message);
    });

    // simulate performance
    setTimeout(consume, partition === 0 ? 2000 : 500);
  }

  // kick-off recursive consume loop
  consume();
}

// Log once the consumer has fully disconnected.
consumer.on('disconnected', function(info) {
  console.log('consumer disconnected. ' + JSON.stringify(info));
});

// Open the broker connection; the 'ready' event fires once it succeeds.
consumer.connect();

// Shut this example down after 30 seconds.
setTimeout(function() {
  consumer.disconnect();
}, 30000);

```
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;

consume(number: number, topic: string, partition: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(): void;
Expand Down
74 changes: 72 additions & 2 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
};

/**
* Read a number of messages from a specific topic and partition.
*
* Can be useful if the consume performance differs per partition. Consuming
* per partition could prevent slow performance on one partition from affecting
* the consumption of other partitions.
*
* To select the right partition it is required to set a topic param, because a
* consumer can be subscribed to multiple topics.
*
* @param {number} size - Number of messages to read
* @param {string} topic - Name of topic to read
* @param {number} partition - Identifier of partition to read
* @param {KafkaConsumer~readCallback} cb - Callback to return when work is done.
*//**
* Read a number of messages from Kafka.
*
* This method is similar to the main one, except that it reads a number
Expand All @@ -384,11 +398,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
* @param {KafkaConsumer~readCallback} cb - Callback to return when a message
* is fetched.
*/
KafkaConsumer.prototype.consume = function(number, cb) {
KafkaConsumer.prototype.consume = function(number, topic, partition, cb) {
var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
var self = this;

if ((number && typeof number === 'number') || (number && cb)) {
if ((number && typeof number === 'number') && typeof topic === 'string' && typeof partition === 'number') {

if (cb === undefined) {
cb = function() {};
} else if (typeof cb !== 'function') {
throw new TypeError('Callback must be a function');
}

this._consumeNumOfPartition(timeoutMs, number, topic, partition, cb);
} else if ((number && typeof number === 'number') || (number && cb)) {

if (cb === undefined) {
cb = function() {};
Expand Down Expand Up @@ -499,6 +522,53 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {

};

/**
 * Consume a number of messages from a specific topic and partition.
 * Wrapped in a try catch with proper error reporting. Should not be
 * called directly, and instead should be called using consume.
 *
 * @private
 * @param {number} timeoutMs - Milliseconds to wait for messages.
 * @param {number} numMessages - Maximum number of messages to fetch.
 * @param {string} topic - Name of the topic to read from.
 * @param {number} partition - Identifier of the partition to read from.
 * @param {KafkaConsumer~readCallback} cb - Callback to return when work is done.
 * @see consume
 */
KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb) {
  var self = this;

  this._client.consume(timeoutMs, numMessages, topic, partition, function(err, messages, eofEvents) {
    if (err) {
      err = LibrdKafkaError.create(err);
      if (cb) {
        cb(err);
      }
      return;
    }

    var currentEofEventsIndex = 0;

    // Emit every queued 'partition.eof' event recorded at messageIndex
    // (-1 means "before the first message"). The messageIndex property is
    // internal bookkeeping and is stripped before emitting.
    function emitEofEventsFor(messageIndex) {
      while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) {
        delete eofEvents[currentEofEventsIndex].messageIndex;
        self.emit('partition.eof', eofEvents[currentEofEventsIndex]);
        ++currentEofEventsIndex;
      }
    }

    emitEofEventsFor(-1);

    // Interleave 'data' emissions with any EOF events that occurred between
    // the fetched messages.
    for (var i = 0; i < messages.length; i++) {
      self.emit('data', messages[i]);
      emitEofEventsFor(i);
    }

    emitEofEventsFor(messages.length);

    if (cb) {
      cb(null, messages);
    }

  });

};

/**
* This callback returns the message read from Kafka.
*
Expand Down
Loading