Skip to content

Commit

Permalink
Merge pull request #83 from dterei/master
Browse files Browse the repository at this point in the history
Support for prepend, append, touch + cleaning
  • Loading branch information
dterei committed Apr 30, 2016
2 parents 8b06961 + 13de9b5 commit 4edaa5d
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 40 deletions.
2 changes: 0 additions & 2 deletions README.md
Expand Up @@ -131,9 +131,7 @@ can also implement a feature not a list if you think it would be good.
### TODOS ###

* Support flags
* Support prepend, append
* Support multi commands
* Support touch
* Support CAS
* Consistent hashing for keys and/or pluggable hashing algorithm

Expand Down
13 changes: 7 additions & 6 deletions lib/memjs/header.js
@@ -1,4 +1,7 @@
var fromBuffer = function(headerBuf) {
// # MemJS Memcache binary protocol header

// fromBuffer converts a serialized header to a JS object.
exports.fromBuffer = function(headerBuf) {
if (!headerBuf) {
return {};
}
Expand All @@ -15,7 +18,8 @@ var fromBuffer = function(headerBuf) {
};
};

var toBuffer = function(header) {
// toBuffer converts a JS memcache header object to a binary memcache header
exports.toBuffer = function(header) {
var headerBuf = new Buffer(24);
headerBuf.fill();
headerBuf.writeUInt8(header.magic, 0);
Expand All @@ -29,11 +33,8 @@ var toBuffer = function(header) {
if (header.cas) {
header.cas.copy(headerBuf, 16);
} else {
headerBuf.fill('\0', 16);
headerBuf.fill('\x00', 16);
}
return headerBuf;
};

exports.fromBuffer = fromBuffer;
exports.toBuffer = toBuffer;

112 changes: 107 additions & 5 deletions lib/memjs/memjs.js
Expand Up @@ -320,6 +320,103 @@ Client.prototype.decrement = function(key, amount, callback, expires, initial) {
});
};

// APPEND
//
// Append the given _value_ to the value associated with the given _key_ in
// memcache. The operation only succeeds if the key is already present. The
// callback signature is:
//
// callback(err, success)
Client.prototype.append = function(key, value, callback) {
this.seq++;
var request = makeRequestBuffer(0x0E, key, '', value, this.seq);

var logger = this.options.logger;
this.perform(key, request, function(err, response) {
if (err) {
if (callback) { callback(err, null); }
return;
}
switch (response.header.status) {
case 0:
if (callback) { callback(null, true); }
break;
case 1:
if (callback) { callback(null, false); }
break;
default:
var errorMessage = 'MemJS APPEND: ' + errors[response.header.status];
logger.log(errorMessage);
if (callback) { callback(new Error(errorMessage), null); }
}
});
};

// PREPEND
//
// Prepend the given _value_ to the value associated with the given _key_ in
// memcache. The operation only succeeds if the key is already present. The
// callback signature is:
//
// callback(err, success)
Client.prototype.prepend = function(key, value, callback) {
this.seq++;
var request = makeRequestBuffer(0x0E, key, '', value, this.seq);

var logger = this.options.logger;
this.perform(key, request, function(err, response) {
if (err) {
if (callback) { callback(err, null); }
return;
}
switch (response.header.status) {
case 0:
if (callback) { callback(null, true); }
break;
case 1:
if (callback) { callback(null, false); }
break;
default:
var errorMessage = 'MemJS PREPEND: ' + errors[response.header.status];
logger.log(errorMessage);
if (callback) { callback(new Error(errorMessage), null); }
}
});
};

// TOUCH
//
// Touch sets an expiration value, given by _expires_, on the given _key_ in
// memcache. The operation only succeeds if the key is already present. The
// callback signature is:
//
// callback(err, success)
Client.prototype.touch = function(key, expires, callback) {
this.seq++;
var extras = makeExpiration(expires || this.options.expires);
var request = makeRequestBuffer(0x1C, key, extras, '', this.seq);

var logger = this.options.logger;
this.perform(key, request, function(err, response) {
if (err) {
if (callback) { callback(err, null); }
return;
}
switch (response.header.status) {
case 0:
if (callback) { callback(null, true); }
break;
case 1:
if (callback) { callback(null, false); }
break;
default:
var errorMessage = 'MemJS TOUCH: ' + errors[response.header.status];
logger.log(errorMessage);
if (callback) { callback(new Error(errorMessage), null); }
}
});
};

// FLUSH
//
// Flushes the cache on each connected server. The callback signature is:
Expand All @@ -335,6 +432,7 @@ Client.prototype.flush = function(callback) {
var count = this.servers.length;
var result = {};
var lastErr = null;
var i;

var handleFlush = function(seq, serv) {
serv.onResponse(seq, function(/* response */) {
Expand All @@ -355,7 +453,7 @@ Client.prototype.flush = function(callback) {
serv.write(seq, request);
};

for (var i in this.servers) {
for (i = 0; i < this.servers.length; i++) {
handleFlush(this.seq, this.servers[i]);
}
};
Expand All @@ -373,6 +471,7 @@ Client.prototype.statsWithKey = function(key, callback) {
this.seq++;
var request = makeRequestBuffer(0x10, key, '', '', this.seq);
var logger = this.options.logger;
var i;

var handleStats = function(seq, serv) {
var result = {};
Expand Down Expand Up @@ -405,7 +504,7 @@ Client.prototype.statsWithKey = function(key, callback) {
serv.write(seq, request);
};

for (var i in this.servers) {
for (i = 0; i < this.servers.length; i++) {
handleStats(this.seq, this.servers[i]);
}
};
Expand Down Expand Up @@ -447,6 +546,8 @@ Client.prototype.quit = function() {
// TODO: Nicer perhaps to do QUITQ (0x17) but need a new callback for when
// write is done.
var request = makeRequestBuffer(0x07, '', '', '', this.seq); // QUIT
var serv;
var i;

var handleQuit = function(seq, serv) {
serv.onResponse(seq, function(/* response */) {
Expand All @@ -458,8 +559,8 @@ Client.prototype.quit = function() {
serv.write(seq, request);
};

for (var i in this.servers) {
var serv = this.servers[i];
for (i = 0; i < this.servers.length; i++) {
serv = this.servers[i];
handleQuit(this.seq, serv);
}
};
Expand All @@ -468,7 +569,8 @@ Client.prototype.quit = function() {
//
// Closes (abruptly) connections to all the servers.
Client.prototype.close = function() {
for (var i in this.servers) {
var i;
for (i = 0; i < this.servers.length; i++) {
this.servers[i].close();
}
};
Expand Down
2 changes: 2 additions & 0 deletions lib/memjs/protocol.js
@@ -1,3 +1,5 @@
// # MemJS Memcache binary protocol errors

exports.errors = {};
exports.errors[0x0000] = 'No error';
exports.errors[0x0001] = 'Key not found';
Expand Down
2 changes: 1 addition & 1 deletion lib/memjs/server.js
Expand Up @@ -73,7 +73,7 @@ Server.prototype.listSasl = function() {
};

Server.prototype.saslAuth = function() {
var authStr = '\0' + this.username + '\0' + this.password;
var authStr = '\x00' + this.username + '\x00' + this.password;
var buf = makeRequestBuffer(0x21, 'PLAIN', '', authStr);
this.writeSASL(buf);
};
Expand Down
12 changes: 9 additions & 3 deletions lib/memjs/utils.js
@@ -1,3 +1,5 @@
// # MemJS utility functions

var header = require('./header');

var bufferify = function(val) {
Expand Down Expand Up @@ -54,13 +56,17 @@ exports.parseMessage = function(dataBuf) {
return false;
}
var responseHeader = header.fromBuffer(dataBuf);
if (dataBuf.length < responseHeader.totalBodyLength + 24 || responseHeader.totalBodyLength < responseHeader.keyLength + responseHeader.extrasLength) {
if (dataBuf.length < responseHeader.totalBodyLength + 24 ||
responseHeader.totalBodyLength <
responseHeader.keyLength + responseHeader.extrasLength) {
return false;
}

var pointer = 24;
var extras = dataBuf.slice(pointer, (pointer += responseHeader.extrasLength));
var key = dataBuf.slice(pointer, (pointer += responseHeader.keyLength));
var extras = dataBuf.slice(pointer, pointer + responseHeader.extrasLength);
pointer += responseHeader.extrasLength;
var key = dataBuf.slice(pointer, pointer + responseHeader.keyLength);
pointer += responseHeader.keyLength;
var val = dataBuf.slice(pointer, 24 + responseHeader.totalBodyLength);

return {header: responseHeader, key: key, extras: extras, val: val};
Expand Down

0 comments on commit 4edaa5d

Please sign in to comment.