Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'refactoring-read-write-slaves' into unstable

  • Loading branch information...
commit afe9f5a72ab0f88a2c4083905ad74a33ca93d3d2 2 parents b0adf8a + 2392535
@sreeix authored
View
3  config/config.json
@@ -8,9 +8,10 @@
, "port": 6389
}
]
+ ,"mode": "readsToSlaves"
,"listen_port": 9999
,"check_period": 5000
,"pool_size": 50
, "debug": false
- , "slave_balance": "none"
+ , "slave_balance": "roundrobin"
}
View
68 lib/redis_proxy.js
@@ -22,14 +22,25 @@ var RedisProxy = module.exports = function(o){
this._activeSlaves = [];
this.slaveIndex = 0;
- this.options = {listen_port: 6379, softErrorCount: 5, pool_size: 10};
+ this.options = {listen_port: 6379, softErrorCount: 5, pool_size: 10, mode: "allToMaster"};
_.extend(this.options, o);
- if(o.servers && o.servers.size === 0 ) {
- throw new Error("Expected to have atleast one redis to proxy");
- }
+ if(o.servers && o.servers.size === 0)
+ throw new Error("Expected to have at least one redis to proxy");
+ this.sendCommand = this[this.options.mode];
+ logger.info("Using the "+ this.options.mode +" mode.");
this.allServers = _.map(o.servers, function(server){
- return new Server(_.defaults(server, {pool_size: self.options.pool_size, softErrorCount: self.options.softErrorCount})).on('up', function() {
+ var onDown = function(){
+ if(_.isEqual(self._active.options, this.options)){
+ logger.error("Main server down PANIC");
+ logger.info("finding next active server.");
+ self.nextActive();
+ } else {
+ self._activeSlaves = _.without(self._activeSlaves, this);
+ }
+ };
+
+ var onUp = function() {
logger.debug("We have a server that went up");
if(_.isNull(self._active) && this.client.server_info.role === 'master'){
self._active = this;
@@ -41,19 +52,17 @@ var RedisProxy = module.exports = function(o){
if(self._active){
this.slave(self._active);
}
- if(! _.include(self._activeSlaves, this)) self._activeSlaves.push(this);
- }
- }).on('down', function(){
- if(_.isEqual(self._active.options, this.options)){
- logger.error("Main server down PANIC");
- logger.info("finding next active server.");
- self.nextActive();
- } else {
- self._activeSlaves = _.without(self._activeSlaves, this);
+ if(! _.include(self._activeSlaves, this)) {
+ self._activeSlaves.push(this);
+ }
}
- });
+ };
+
+ return new Server(_.defaults(server, {pool_size: self.options.pool_size, softErrorCount: self.options.softErrorCount}))
+ .on('up', onUp)
+ .on('down', onDown);
});
-};
+}
RedisProxy.prototype.readyup = function(active){
logger.debug("Creating the pool for active server"+ active.options.port);
@@ -61,11 +70,13 @@ RedisProxy.prototype.readyup = function(active){
active.setMaster();
_.each(this.allServers, function(s){
if(!_.isEqual(s, active)){
- s.slave(active);
- if(! _.include(self._activeSlaves, this)) self._activeSlaves.push(s);
+ if(!_.include(self._activeSlaves, this)){
+ s.slave(active);
+ self._activeSlaves.push(s);
+ }
}
});
-};
+}
RedisProxy.prototype.nextActive = function() {
this._active = _.find(this.allServers, function(server) {
@@ -78,13 +89,14 @@ RedisProxy.prototype.nextActive = function() {
} else {
throw new Error("Expected to have atleast one redis to proxy");
}
-};
+}
Object.defineProperty(RedisProxy.prototype, 'active', {
get: function() { return this._active;}
-});
+})
-RedisProxy.prototype.sendCommand = function(command, id, callback) {
+// balancing strategies
+RedisProxy.prototype.readsToSlaves = function(command, id, callback) {
if(redisCommand.readOnly(command)) {
logger.debug('Read only command sending to the slave');
this.nextSlave().sendCommand(command, id, callback);
@@ -92,17 +104,21 @@ RedisProxy.prototype.sendCommand = function(command, id, callback) {
logger.debug('mutating command sending to the active master');
this._active.sendCommand(command, id, callback);
}
-};
+}
+
+RedisProxy.prototype.allToMaster = function(command, id, callback) {
+ this._active.sendCommand(command, id, callback);
+}
RedisProxy.prototype.nextSlave = function() {
var slave = this._activeSlaves[this.slaveIndex];
this.slaveIndex = (this.slaveIndex + 1) % this._activeSlaves.length;
return slave;
-};
+}
RedisProxy.prototype.quit = function(id) {
return this.active.close(id);
-};
+}
RedisProxy.prototype.watch = function(){
var self = this;
@@ -112,4 +128,4 @@ RedisProxy.prototype.watch = function(){
server.ping();
});
}, this.options.check_period);
-};
+}
View
73 lib/server.js
@@ -15,6 +15,11 @@ var Server = module.exports = function(serverInfo){
this.errorCount = 0;
this.options = _.defaults(serverInfo, {softErrorCount: 5});
};
+// Following events are fired as appropriate
+// up: when the server comes up.
+// down: when an existing server goes down.
+// slave: when the server becomes a slave of another server( Needs to be fleshed)
+// master: When a server becomes master.
util.inherits(Server, EventEmitter);
@@ -25,11 +30,13 @@ Server.prototype._attachHandlers = function(client){
});
client.on('error', function(data){
logger.error("error happened " + data);
- self.incrErrorCount();
+ self._incrErrorCount();
});
client.on('end', function(data){
logger.info("end happened "+ data);
- self.connections.release(client);
+ if(self.connections){
+ self.connections.release(client);
+ }
self.down();
});
};
@@ -38,37 +45,23 @@ Server.prototype.up = function(){
if( this.status !== 'up'){
this.status = 'up';
this.errorCount = 0;
+ if(_.isNull(this.connections) || _.isEmpty(this.connections) ){
+ this.connections = this._createConnections();
+ }
this.emit('up');
}
return this;
};
Server.prototype.setMaster = function(){
- var self = this;
- this.connections = new Pool({
- create: function(cb){
- try {
- var client = redis.createClient(self.options.port, self.options.host, {max_attempts: 1});
- self._attachHandlers(client);
- return cb(null, client);
- } catch(err) {
- self.incrErrorCount();
- logger.error('Creating Connection to redis server failed with error '+ err);
- return cb(err);
- }
- }
- , maxSize: this.options.pool_size
- , startSize: this.options.pool_size
- , delayCreation: false
- });
- this.slavenone();
+ this._master();
};
Server.prototype.sendCommand = function(command, id, cb){
var self = this;
this.connections.take(id, function(err, conn){
if(err){
- self.incrErrorCount();
+ self._incrErrorCount();
return cb(err);
}
return conn.sendRaw(command, cb);
@@ -78,7 +71,8 @@ Server.prototype.sendCommand = function(command, id, cb){
Server.prototype.close = function(id){
this.connections.close(id);
};
-Server.prototype.setupControlClient = function(serverInfo){
+
+Server.prototype._setupControlClient = function(serverInfo){
try{
this.client = redis.createClient(serverInfo.port, serverInfo.host);
this._attachHandlers(this.client);
@@ -89,17 +83,16 @@ Server.prototype.setupControlClient = function(serverInfo){
}
};
-Server.prototype.slave = function(server){
- logger.info('Marking '+ this.host + ':' + this.port + ' as slave of '+ server.host+': '+ server.port);
+Server.prototype._createConnections = function(){
var self = this;
- this.connections = new Pool({
+ return new Pool({
create: function(cb){
try {
var client = redis.createClient(self.options.port, self.options.host, {max_attempts: 1});
self._attachHandlers(client);
return cb(null, client);
} catch(err) {
- self.incrErrorCount();
+ self._incrErrorCount();
logger.error('Creating Connection to redis server failed with error '+ err);
return cb(err);
}
@@ -107,21 +100,33 @@ Server.prototype.slave = function(server){
, maxSize: this.options.pool_size
, startSize: this.options.pool_size
, delayCreation: false
- })
+ });
+}
+
+Server.prototype.slave = function(server){
+ var self = this;
+ logger.info('Marking '+ this.host + ':' + this.port + ' as slave of '+ server.host+': '+ server.port);
this.client.slaveof(server.client.host, server.client.port, function(err, message){
if(err){
return logger.error(err);
}
+ self.emit('slave');
logger.debug(message);
});
};
-Server.prototype.slavenone = function (){
+Server.prototype._master = function (){
+ var self = this;
logger.info(this.options.host+":"+this.options.port+ " is slave of no one");
- this.client.slaveof('no', 'one');
+ this.client.slaveof('no', 'one', function(err, message){
+ if(err){
+ return logger.error(err);
+ }
+ return self.emit('master');
+ });
};
-Server.prototype.incrErrorCount = function(){
+Server.prototype._incrErrorCount = function(){
this.errorCount++;
if(this.errorCount > this.options.softErrorCount){
return this.down();
@@ -131,7 +136,7 @@ Server.prototype.incrErrorCount = function(){
Server.prototype.ping = function(){
var self = this;
if(!this.client){
- this.setupControlClient(this.options);
+ this._setupControlClient(this.options);
} else {
this.client.ping(function(err, data){
if(err){
@@ -151,10 +156,16 @@ Server.prototype.down = function(){
if( this.status !== 'down'){
this.status = 'down';
this.emit('down');
+ this._clearConnections();
}
return this;
};
+Server.prototype._clearConnections = function clearConnections(){
+ this.connections.closeAll();
+ this.connections = null;
+};
+
Object.defineProperty(Server.prototype, 'host', {
get: function() {
return this.options.host;
View
18 lib/slaves.js
@@ -1,18 +0,0 @@
-var _ = require('underscore');
-
-module.exports = function Slaves (){
- this.slaveIndex = 0;
- this._activeSlaves = [];
-};
-
-Slaves.prototype.up = function(item){
- if(! _.include(self._activeSlaves, item)) self._activeSlaves.push(item);
-};
-
-Slaves.prototype.down = function(item){
- self._activeSlaves = _.without(self._activeSlaves, item);
-};
-
-Slaves.prototype.empty = function(){
- return (this._activeSlaves.length === 0);
-}
Please sign in to comment.
Something went wrong with that request. Please try again.