Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

removed async.queue to fix memory leak, more optimizing

  • Loading branch information...
commit c6fc0be42610f85065f406c470cf504bfd746f46 1 parent e5b5991
Nathan LaFreniere authored
Showing with 72 additions and 31 deletions.
  1. +1 −0  .jshintignore
  2. +26 −0 .jshintrc
  3. +44 −29 index.js
  4. +1 −2  package.json
1  .jshintignore
View
@@ -0,0 +1 @@
+node_modules
26 .jshintrc
View
@@ -0,0 +1,26 @@
+{
+ "asi": false,
+ "expr": true,
+ "loopfunc": true,
+ "curly": false,
+ "evil": true,
+ "white": true,
+ "undef": true,
+ "browser": true,
+ "es5": true,
+ "predef": [
+ "app",
+ "$",
+ "FormBot",
+ "socket",
+ "confirm",
+ "alert",
+ "require",
+ "__dirname",
+ "process",
+ "exports",
+ "console",
+ "Buffer",
+ "module"
+ ]
+}
73 index.js
View
@@ -1,6 +1,5 @@
var net = require('net'),
protobuf = require('protobuf.js'),
- async = require('async'),
butils = require('butils');
var messageCodes = {
@@ -49,33 +48,46 @@ function RiakPBC(options) {
self.connected = false;
self.client.on('end', self.disconnect);
self.client.on('error', self.disconnect);
- self.queue = async.queue(function (task, callback) {
- var mc, reply = {};
- var checkReply = function (chunk) {
- splitPacket(chunk).forEach(function (packet) {
- mc = messageCodes['' + packet[0]];
- reply = _merge(reply, self.translator.decode(mc, packet.slice(1)));
- if (!task.expectMultiple || reply.done || mc === 'RpbErrorResp') {
- self.client.removeListener('data', checkReply);
- task.callback(reply);
- callback();
- }
- });
- }
- self.client.on('data', checkReply);
- self.client.write(task.message);
- }, 1);
+ self.paused = false;
+ self.queue = [];
+ var mc, reply = {};
function splitPacket(pkt) {
- var ret = [];
- while (pkt.length > 0) {
- var len = pkt.readUInt32BE(0);
- ret.push(pkt.slice(4, len + 4));
- pkt = pkt.slice(len + 4);
+ var ret = [], len, pos = 0, end = pkt.length;
+ while (pos < end) {
+ len = butils.readInt32(pkt, pos);
+ ret.push(pkt.slice(pos + 4, pos + len + 4));
+ pos += len + 4;
}
return ret;
}
-};
+
+ self.client.on('data', function (chunk) {
+ splitPacket(chunk).forEach(function (packet) {
+ mc = messageCodes['' + packet[0]];
+ reply = _merge(reply, self.translator.decode(mc, packet.slice(1)));
+ if (!self.task.expectMultiple || reply.done || mc === 'RpbErrorResp') {
+ self.task.callback(reply);
+ mc = undefined;
+ self.task = undefined;
+ reply = {};
+ self.processNext();
+ }
+ });
+ });
+
+ self.processNext = function () {
+ if (self.queue.length > 0) {
+ self.connect(function () {
+ self.task = self.queue.shift();
+ self.client.write(self.task.message);
+ self.paused = true;
+ });
+ } else {
+ self.paused = false;
+ }
+ };
+}
function _merge(obj1, obj2) {
var obj = {};
@@ -91,7 +103,7 @@ function _merge(obj1, obj2) {
});
});
return obj;
-};
+}
RiakPBC.prototype.makeRequest = function (type, data, callback, expectMultiple) {
var self = this,
@@ -102,9 +114,8 @@ RiakPBC.prototype.makeRequest = function (type, data, callback, expectMultiple)
butils.writeInt32(message, buffer.length + 1);
butils.writeInt(message, messageCodes[type], 4);
message = message.concat(buffer);
- this.connect(function () {
- self.queue.push({ message: new Buffer(message), callback: callback, expectMultiple: expectMultiple });
- });
+ self.queue.push({ message: new Buffer(message), callback: callback, expectMultiple: expectMultiple });
+ if (!self.paused) self.processNext();
};
RiakPBC.prototype.getBuckets = function (callback) {
@@ -166,7 +177,7 @@ RiakPBC.prototype.ping = function (callback) {
RiakPBC.prototype.connect = function (callback) {
if (this.connected) return callback();
var self = this;
- self.client = net.connect(self.port, self.host, function () {
+ self.client.connect(self.port, self.host, function () {
self.connected = true;
callback();
});
@@ -174,8 +185,12 @@ RiakPBC.prototype.connect = function (callback) {
RiakPBC.prototype.disconnect = function () {
if (!this.connected) return;
- this.connected = false;
this.client.end();
+ this.connected = false;
+ if (this.task) {
+ this.queue.unshift(this.task);
+ this.task = undefined;
+ }
};
exports.createClient = function (options) {
3  package.json
View
@@ -1,10 +1,9 @@
{
"name": "riakpbc",
- "version": "0.0.1",
+ "version": "0.0.2",
"description": "a very rough and basic riak protobuf client",
"main": "index.js",
"dependencies": {
- "async": "~0.1.22",
"protobuf.js": "",
"butils": ""
},
Please sign in to comment.
Something went wrong with that request. Please try again.