Skip to content

Commit

Permalink
Managing all fail overs correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
sreeix committed Feb 26, 2012
1 parent b45781d commit 7ac73b1
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 21 deletions.
2 changes: 1 addition & 1 deletion config/config.json
Expand Up @@ -9,5 +9,5 @@
} }
] ]
,"listen_port": 9999 ,"listen_port": 9999
,"check_period": 5 ,"check_period": 5000
} }
55 changes: 39 additions & 16 deletions lib/redis_proxy.js
Expand Up @@ -23,23 +23,49 @@ var RedisProxy = module.exports = function(o){
} }


this.allServers = _.map(o.servers, function(server){ this.allServers = _.map(o.servers, function(server){
var s =new Server(server); return new Server(server).on('up', function() {
s.on('up', function(){
console.log("We have a server that went up"); console.log("We have a server that went up");
console.log(this); if(_.isNull(self._active) || _.isUndefined(self._active)){
}); self._active = this;
s.on('down', function(){ console.log("setting up the active "+ self._active.options.port);
console.log("We have a server that went DOWN"); self.readyup(this);
console.log(this); }
}).on('down', function(){
if(_.isEqual(self._active.options, this.options)){
console.log("Main server down PANIC");
console.log("finding next active server.");
self.nextActive();
}
}); });
}); });
}; };

RedisProxy.prototype.readyup = function(server){ RedisProxy.prototype.readyup = function(server){
console.log("Creating the pool for active server"+ server.options.host); console.log("Creating the pool for active server"+ server.options.port);
this.connections = new Pool({create: function(){ this.connections = new Pool({create: function(){
return redis.createClient(server.options.port, server.options.host); try {
var client = redis.createClient(server.options.port, server.options.host, {max_attempts: 1});
client.on('error', redis.print);
} catch(err) {
console.log(err);
console.log(">>>>>>>>>>>>>>>");
}
}}); }});
}; };

RedisProxy.prototype.nextActive = function() {
this._active = _.find(this.allServers, function(server) {
return server.isUp()
});

if(this._active){
this.readyup(this._active);
console.log("Setting up as active "+ this._active.options.port);
} else {
throw new Error("Expected to have atleast one redis to proxy");
}
}

RedisProxy.prototype.active = function(command, callback) { RedisProxy.prototype.active = function(command, callback) {
return this._active; return this._active;
} }
Expand All @@ -48,16 +74,13 @@ RedisProxy.prototype.sendCommand = function(command, callback) {
return this.connections.take().sendRaw(command, callback); return this.connections.take().sendRaw(command, callback);
}; };


RedisProxy.prototype.findByClient = function(client){
return _.find(this.allServers, function(server){
return (server.client.port == client.port && server.client.host === client.host);
});
};

RedisProxy.prototype.watch = function(){ RedisProxy.prototype.watch = function(){
var self = this; var self = this;
setInterval(function(){ setInterval(function(){
_.each(this.allServers, function(server){ console.log("Timeout.....");
_.each(self.allServers, function(server){
console.log("Pinging ");
console.log(server.options.port);
server.ping(); server.ping();
}); });
}, this.options.check_period); }, this.options.check_period);
Expand Down
13 changes: 10 additions & 3 deletions lib/server.js
Expand Up @@ -22,6 +22,8 @@ Server.prototype.setupClient = function(serverInfo){
self.up(); self.up();
}); });
this.client.on('error', function(data){ this.client.on('error', function(data){
console.log("EEEEEEEERRRRRRRRRRRRROOOOOOOOOORRRRRRRR");
console.log(data);
self.down(); self.down();
}); });


Expand All @@ -30,6 +32,7 @@ Server.prototype.setupClient = function(serverInfo){
self.down(); self.down();
}); });
} catch(err) { } catch(err) {
console.log("-------------");
console.log(err); console.log(err);
self.down(); self.down();
// Its ok... we will bring this guy up some point in time // Its ok... we will bring this guy up some point in time
Expand All @@ -49,13 +52,17 @@ Server.prototype.ping = function(){
if(!this.client){ if(!this.client){
this.setupClient(this.options); this.setupClient(this.options);
} else { } else {
this.client.ping(function(){ this.client.ping(function(err, data){
console.log("xxxxxxxxxx"); if(err){
return self.down();
}
self.up(); self.up();
}); });
} }
}; };

Server.prototype.isUp = function(){
return this.status === 'up';
}
Server.prototype.down = function(){ Server.prototype.down = function(){
this.status = 'down'; this.status = 'down';
this.emit('down'); this.emit('down');
Expand Down
2 changes: 1 addition & 1 deletion server.js
Expand Up @@ -26,6 +26,6 @@ var server = net.createServer(function (socket) {
}); });
}); });
}); });

redis_proxy.watch();
server.listen(config.listen_port, "127.0.0.1"); server.listen(config.listen_port, "127.0.0.1");
console.log("server is listening on 127.0.0.1:"+ config.listen_port); console.log("server is listening on 127.0.0.1:"+ config.listen_port);

0 comments on commit 7ac73b1

Please sign in to comment.