Permalink
Browse files

Implemented multiple topics

Api is changed and consumer is now
required to call "subscribeTopic(topic)" before being able to
receive messages for the given topic
  • Loading branch information...
1 parent d683e3c commit a432ed5945152a2e595a1f7d1c0db4f7ff7a5a97 Taylor Gautier committed Aug 21, 2011
Showing with 68 additions and 29 deletions.
  1. +68 −29 lib/Consumer.js
View
@@ -20,45 +20,83 @@ module.exports = std.Class(Client, function(supr) {
supr(this, '_init', arguments)
opts = std.extend(opts, defaults)
this._pollInterval = opts.pollInterval
- this._topic = opts.topic
this._partition = opts.partition
- this._offset = opts.offset
this._buffer = new Buffer(opts.maxSize)
+ this._topics = []
+ this._pendingTopicAdds = [];
+ this._pendingTopicDeletes = {};
+ this._topicsIndex = 0
+ this._remainingTopics = 0
+ this._shouldPoll = false
+
+ this.on('pollForMessages', std.bind(this, '_pollForMessages'))
}
this.connect = function() {
supr(this, 'connect', arguments)
this._connection.on('connect', std.bind(this, '_onConnect'))
return this
}
-
+
this.close = function() {
supr(this, 'close')
clearTimeout(this._timeoutID)
delete this._timeoutID
}
+ // TODO allow for optional offset to be specified
+ this.subscribeTopic = function(name) {
+ var topic = {name:name, offset:0, partition: 0}
+ this._pendingTopicAdds.push(topic)
+ if (this._topics.length == 0) {
+ this._schedulePoll()
+ this.emit('pollForMessages')
+ }
+ }
+
+ this.unsubscribeTopic = function(name) {
+ this._pendingTopicDeletes[name] = true
+ }
+
this._onConnect = function(callback) {
if (callback) { callback() }
this._connection.on('data', std.bind(this, '_onData'))
- this._pollForMessages()
}
this._pollForMessages = function() {
- delete this._timeoutID
- var encodedRequest = this._encodeFetchRequest()
- this._connection.write(encodedRequest)
+ if (this._topicsIndex < this._topics.length || !this._shouldPoll) return
+ this._shouldPoll = false
+
+ // add any pending additions
+ for (i in this._pendingTopicAdds) this._topics.push(this._pendingTopicAdds[i])
+ this._pendingTopicAdds = [];
+
+ // remove any pending deletions
+ var deletes = this._pendingTopicDeletes;
+ this._topics = this._topics.filter(function(x) { return deletes[x.name] == undefined})
+ this._pendingTopicDeletes = {}
+ if (this._topics.length == 0) {
+ this._unschedulePoll()
+ return
+ }
+
+ this._topicsIndex = 0;
+ for (i in this._topics) this._writeFetchRequest(this._topics[i])
+ }
+
+ this._writeFetchRequest = function(t) {
+ this._connection.write(this._encodeFetchRequest(t))
}
- this._encodeFetchRequest = function() {
+ this._encodeFetchRequest = function(t) {
var request = std.pack('n', this._requestType)
- + std.pack('n', this._topic.length) + this._topic
- + std.pack('N', this._partition)
+ + std.pack('n', t.name.length) + t.name
+ + std.pack('N', t.partition)
// TODO: need to store a 64bit integer (bigendian). For now, set first 32 bits to 0
- + std.pack('N2', 0, this._offset)
+ + std.pack('N2', 0, t.offset)
+ std.pack('N', this._buffer.length)
- var requestSize = 2 + 2 + this._topic.length + 4 + 8 + 4
+ var requestSize = 2 + 2 + t.name.length + 4 + 8 + 4
return this._bufferPacket(std.pack('N', requestSize) + request)
}
@@ -75,16 +113,17 @@ module.exports = std.Class(Client, function(supr) {
index += 6
}
- if (index == buf.length) {
- this._schedulePoll()
+ // check for 0 messages
+ if (this.remainingBytes == 0) {
+ this._topicsIndex++
return
}
var dataSize = buf.length - index
buf.copy(this._buffer, this._readBytes, index, index + dataSize)
this._remainingBytes -= dataSize
this._readBytes += dataSize
-
+
if (this._remainingBytes == 0) { this._parseBuffer() }
}
@@ -97,24 +136,24 @@ module.exports = std.Class(Client, function(supr) {
var payload = this._buffer.toString('utf8', index, index + payloadLength)
index += payloadLength
try {
- this.emit('message', payload)
+ this.emit('message', this._topics[this._topicsIndex].name, payload)
} catch(e) { console.log("Message handler threw", e) }
}
- this._offset += this._readBytes
- this._schedulePoll()
- }
+ this._topics[this._topicsIndex].offset += this._readBytes
+ if (++this._topicsIndex == this._topicsIndex.length) {
+ this.emit('pollForMessages')
+ }
+ }
+
this._schedulePoll = function() {
- if (this._timeoutID) { return }
- this._timeoutID = setTimeout(std.bind(this, '_pollForMessages'), this._pollInterval)
+ this._timeoutID = setTimeout(std.bind(this, '_schedulePoll'), this._pollInterval)
+ this._shouldPoll = true
+ this.emit('pollForMessages')
}
-
- this._debufferPacket = function(buf) {
- var len = buf.length,
- result = ''
- for (var i=0; i<len; i++) {
- result += String.fromCharCode(buf[i])
- }
- return result
+
+ this._unschedulePoll = function() {
+ clearTimeout(this._timeoutID)
+ this._shouldPoll = false
}
})

0 comments on commit a432ed5

Please sign in to comment.