Permalink
Browse files

Supporting read write fork on the connection pool

  • Loading branch information...
1 parent 77af298 commit fd4d2e0c620550dbda0d0976299650639ae684e2 @sreeix committed Jun 5, 2012
Showing with 80 additions and 31 deletions.
  1. +1 −0 config/config.json
  2. +4 −0 lib/redis_command.js
  3. +8 −3 lib/redis_proxy.js
  4. +49 −28 lib/server.js
  5. +18 −0 lib/slaves.js
View
@@ -12,4 +12,5 @@
,"check_period": 5000
,"pool_size": 50
, "debug": false
+ , "slave_balance": "none"
}
View
@@ -6,9 +6,13 @@ var readOnlyCommands = ['smembers', 'hlen', 'hmget', 'srandmember', 'hvals', 'ra
'zcount', 'exists', 'sdiff', 'zrange', 'mget', 'zrank', 'get', 'getbit', 'getrange',
'zrevrange', 'zrevrangebyscore', 'hexists', 'object', 'sinter', 'zrevrank', 'hget',
'zscore', 'hgetall', 'sismember'];
+var connectionCommands = ['auth', 'select'];
module.exports = redisCommand = {
readOnly: function(command){
return _.include(readOnlyCommands, command.split("\r\n")[2]);
}
+ , connection: function(command){
+ return _.include(connectionCommands, command.split("\r\n")[2]);
+ }
};
View
@@ -59,7 +59,7 @@ RedisProxy.prototype.readyup = function(active){
logger.debug("Creating the pool for active server"+ active.options.port);
var self = this;
active.setMaster();
- _.each(this.allServers, function (s){
+ _.each(this.allServers, function(s){
if(!_.isEqual(s, active)){
s.slave(active);
if(! _.include(self._activeSlaves, this)) self._activeSlaves.push(s);
@@ -87,14 +87,19 @@ Object.defineProperty(RedisProxy.prototype, 'active', {
RedisProxy.prototype.sendCommand = function(command, id, callback) {
if(redisCommand.readOnly(command)) {
logger.debug('Read only command sending to the slave');
- this._activeSlaves[this.slaveIndex].sendCommand(command, id, callback);
- this.slaveIndex = (this.slaveIndex + 1) % this._activeSlaves.length;
+ this.nextSlave().sendCommand(command, id, callback);
} else {
logger.debug('mutating command sending to the active master');
this._active.sendCommand(command, id, callback);
}
};
+RedisProxy.prototype.nextSlave = function() {
+ var slave = this._activeSlaves[this.slaveIndex];
+ this.slaveIndex = (this.slaveIndex + 1) % this._activeSlaves.length;
+ return slave;
+};
+
RedisProxy.prototype.quit = function(id) {
return this.active.close(id);
};
View
@@ -14,23 +14,10 @@ var Server = module.exports = function(serverInfo){
this.status = 'down';
this.errorCount = 0;
this.options = _.defaults(serverInfo, {softErrorCount: 5});
- this.client = null;
- this.setupControlClient(serverInfo);
};
util.inherits(Server, EventEmitter);
-Server.prototype.setupControlClient = function(serverInfo){
- try{
- this.client = redis.createClient(serverInfo.port, serverInfo.host);
- this._attachHandlers(this.client);
- } catch(err) {
- logger.error(err);
- this.down();
- // Its ok... we will bring this guy up some point in time
- }
-};
-
Server.prototype._attachHandlers = function(client){
var self = this;
client.on('ready', function(data){
@@ -79,26 +66,48 @@ Server.prototype.setMaster = function(){
Server.prototype.sendCommand = function(command, id, cb){
var self = this;
- // Temporary hack. Could be bad for performance
- if(!this.connections){
- return this.client.sendRaw(command, cb);
- } else {
- this.connections.take(id, function(err, conn){
- if(err){
- self.incrErrorCount();
- return cb(err);
- }
- return conn.sendRaw(command, cb);
- });
- }
+ this.connections.take(id, function(err, conn){
+ if(err){
+ self.incrErrorCount();
+ return cb(err);
+ }
+ return conn.sendRaw(command, cb);
+ });
};
Server.prototype.close = function(id){
this.connections.close(id);
};
+Server.prototype.setupControlClient = function(serverInfo){
+ try{
+ this.client = redis.createClient(serverInfo.port, serverInfo.host);
+ this._attachHandlers(this.client);
+ } catch(err) {
+ logger.error(err);
+ this.down();
+ // Its ok... we will bring this guy up some point in time
+ }
+};
Server.prototype.slave = function(server){
- logger.info('Marking '+ this.client.host + ':' + this.client.port + ' as slave of '+ server.client.host+': '+ server.client.port);
+ logger.info('Marking '+ this.host + ':' + this.port + ' as slave of '+ server.host+': '+ server.port);
+ var self = this;
+ this.connections = new Pool({
+ create: function(cb){
+ try {
+ var client = redis.createClient(self.options.port, self.options.host, {max_attempts: 1});
+ self._attachHandlers(client);
+ return cb(null, client);
+ } catch(err) {
+ self.incrErrorCount();
+ logger.error('Creating Connection to redis server failed with error '+ err);
+ return cb(err);
+ }
+ }
+ , maxSize: this.options.pool_size
+ , startSize: this.options.pool_size
+ , delayCreation: false
+ })
this.client.slaveof(server.client.host, server.client.port, function(err, message){
if(err){
return logger.error(err);
@@ -110,7 +119,7 @@ Server.prototype.slave = function(server){
Server.prototype.slavenone = function (){
logger.info(this.options.host+":"+this.options.port+ " is slave of no one");
this.client.slaveof('no', 'one');
-};
+};
Server.prototype.incrErrorCount = function(){
this.errorCount++;
@@ -144,4 +153,16 @@ Server.prototype.down = function(){
this.emit('down');
}
return this;
-};
+};
+
+Object.defineProperty(Server.prototype, 'host', {
+ get: function() {
+ return this.options.host;
+ }
+});
+
+Object.defineProperty(Server.prototype, 'port', {
+ get: function() {
+ return this.options.port;
+ }
+});
View
@@ -0,0 +1,18 @@
+var _ = require('underscore');
+
+module.exports = function Slaves (){
+ this.slaveIndex = 0;
+ this._activeSlaves = [];
+};
+
+Slaves.prototype.up = function(item){
+ if(! _.include(self._activeSlaves, item)) self._activeSlaves.push(item);
+};
+
+Slaves.prototype.down = function(item){
+ self._activeSlaves = _.without(self._activeSlaves, item);
+};
+
+Slaves.prototype.empty = function(){
+ return (this._activeSlaves.length === 0);
+}

0 comments on commit fd4d2e0

Please sign in to comment.