Skip to content

Commit

Permalink
Subscribe now works with assigned offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
ottomata committed Aug 23, 2016
1 parent 0331fee commit 47ddc14
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 8 deletions.
4 changes: 1 addition & 3 deletions README.md
Expand Up @@ -79,11 +79,9 @@ socket.disconnect();

## TODO

- Use blizzard's flowing consumer for start() interface
- use generic config properties as Kasocki constructor arg
- websocket tests, integration tests.
- return topic, partition offset outside of event/message?
- figure out proper logging
- figure out error responses
- figure out kafka group non assigment
- docker with kafka

Expand Down
42 changes: 42 additions & 0 deletions consume.js
Expand Up @@ -174,4 +174,46 @@ var tests = {
.delay(2000).then(emit.bind(null, 'disconnect'));
},

subscribeAllowed: function() {
emit('subscribe', ['test']);
emit('subscribe', ['test', 'test4']);
},

subscribeNotAllowed: function() {
emit('subscribe', ['test', 'nopeee']);
},

assignAllowed: function() {
emit('subscribe', [{'topic': 'test4', 'partition': 0, 'offset': 33326 }]);
},

assignNotAllowed: function() {
emit('subscribe', [{'topic': 'nopers', 'partition': 0, 'offset': 33326 }]);
},

assign: function() {
var assignments = [{'topic': 'test4', 'partition': 0, 'offset': 33326 }];

emit('subscribe',assignments);
emit('start');
},

assignEarliest: function() {
emit('subscribe', [{'topic': 'test4', 'partition': 0, offset: 'earliest'}]);
emit('start');
},

assignNoOffset: function() {
emit('subscribe', [{'topic': 'test4', 'partition': 0}]);
emit('start');
},

assignMulti: function() {
emit('subscribe', [
{'topic': 'test4', 'partition': 0, offset: 33326},
{'topic': 'test', 'partition': 0, offset: 1027270},
]);
emit('start');
},

}
47 changes: 44 additions & 3 deletions lib/Kasocki.js
Expand Up @@ -36,6 +36,7 @@ class Kasocki {
constructor(
socket,
kafkaConfig,
allowedTopics,
logger
) {
this.socket = socket;
Expand All @@ -45,6 +46,8 @@ class Kasocki {
this.running = false;
this.closing = false;

this.allowedTopics = allowedTopics;

var bunyanConfig = {
'name': 'kasocki',
'socket': this.name,
Expand Down Expand Up @@ -239,7 +242,7 @@ class Kasocki {
* the previously subscribed topics.
* TODO: documentation about regexes. See:
* https://github.com/edenhill/librdkafka/blob/master/src-cpp/rdkafkacpp.h#L1212
* // TODO: provide starting offsets, maybe offset reset.
* TODO: if we configure allowedTopics, maybe topic regexes shouldn't be allowed.
* @param {Array} topics
*/
on_subscribe(topics) {
Expand All @@ -249,10 +252,48 @@ class Kasocki {
}

this.kafkaConsumer.unsubscribe();
this.log.info('Subscribing to topics.', {'topics': topics})
this.kafkaConsumer.subscribe(topics);

if (!topics || topics.constructor.name != 'Array') {
throw new Error(
'Must provide either an array topic names, or ' +
' an array of objects with topic and partition set.'
);
}

// If we are given an array of objects, assume we are trying to assign
// at a particular offset. Note that this does not check that the topic
// partition assignment makes any sense. E.g. it is possible to
// subscribe to non existent topic-partition this way. In that case,
// nothing will happen.
// TODO: Better validation for topics
if (topics[0].constructor.name == 'Object') {
this._checkTopicsAllowed(topics.map(e => e.topic))
this.log.info({topics: topics}, 'Subscribing to topics, starting at assigned partition offsets.')
this.kafkaConsumer.assign(topics);
}
else {
this._checkTopicsAllowed(topics);
this.log.info({topics: topics}, 'Subscribing to topics, starting at latest in each partition.');
this.kafkaConsumer.subscribe(topics);
}
}

/**
* Throws an Error if this.allowedTopics is configured and
* any of the topics are not in this list of allowed topics.
*
* @param {Array} topics
* @throws Error if any of the topics are not allowed.
*/
_checkTopicsAllowed(topics) {
if (this.allowedTopics) {
topics.map((topic) => {
if (this.allowedTopics.indexOf(topic) < 0) {
throw new Error(`Topic '${topic}' is not available for consumption.`);
}
});
}
}

/**
* Iterates through filters. If any filter looks like
Expand Down
3 changes: 1 addition & 2 deletions server.js
Expand Up @@ -21,10 +21,9 @@ class KasockiServer {
// You could alternatively pass a socket.io namespace.
console.log(socket.id + ' connected');
// Kafka broker should be running at localhost:9092
this.kasocki = new Kasocki(socket);
this.kasocki = new Kasocki(socket, {}, ['test', 'test4']);
});


}

listen() {
Expand Down

0 comments on commit 47ddc14

Please sign in to comment.