Permalink
Browse files

Clusters: implemented cache reseting in nodes.

  • Loading branch information...
1 parent 6bb5395 commit f1f8932710d1e23ba00911431a56e97bc814af03 Pedro Franceschi committed Jan 9, 2011
Showing with 81 additions and 73 deletions.
  1. +1 −0 README.markdown
  2. +13 −0 controllers/admin.js
  3. +2 −2 controllers/pages.js
  4. +6 −6 controllers/posts.js
  5. +1 −1 servers/cluster_config.json
  6. +39 −54 servers/cluster_server.js
  7. +19 −10 servers/server.js
View
@@ -19,6 +19,7 @@ TODO
* Refactor with Tests
* Add other databases
* Cluster data balance
+* Wordpress: import tags too.
* Check for what plugins we should run for the request
* Enable/disable plugins
* Make everything RESTful
View
@@ -108,6 +108,8 @@ exports.createPost = function(req, res) {
posts.createPost(req.param('title'), req.param('body'), req.session.user_id, 0, function(postId) {
tags.createTagsForPost(postId, tagsArr, function(){
+ global.sendCommandToClusterServer("DESTROY_CACHE_POSTS");
+ global.sendCommandToClusterServer("DESTROY_CACHE_TAGS");
postsController.destroyCache();
postsController.updateTagsCache();
return res.redirect('/admin/posts/' + postId);
@@ -136,6 +138,8 @@ exports.updatePost = function(req, res) {
posts.updatePost(req.param('id'), req.param('title'), req.param('body'), function() {
tags.deletePostTags(req.param('id'), function(){
tags.createTagsForPost(req.param('id'), tagsArr, function(){
+ global.sendCommandToClusterServer("DESTROY_CACHE_POSTS");
+ global.sendCommandToClusterServer("DESTROY_CACHE_TAGS");
postsController.destroyCache();
postsController.updateTagsCache();
return res.redirect('/admin/posts/' + req.param('id'));
@@ -152,6 +156,8 @@ exports.destroyPost = function(req, res) {
}
posts.destroyPost(req.param('id'), function () {
tags.deletePostTags(req.param('id'), function(){
+ global.sendCommandToClusterServer("DESTROY_CACHE_POSTS");
+ global.sendCommandToClusterServer("DESTROY_CACHE_TAGS");
postsController.destroyCache();
postsController.updateTagsCache();
return res.redirect('/admin/posts/')
@@ -454,6 +460,10 @@ exports.saveImport = function(req, res) {
if(!req.param('xml')) {
return res.send("Missing parameters.");
}
+
+ global.sendCommandToClusterServer("DESTROY_CACHE_POSTS");
+ global.sendCommandToClusterServer("DESTROY_CACHE_TAGS");
+
postsController.destroyCache();
importer.importXMLDump(req.param('xml'), function(){
return res.redirect("/admin/posts");
@@ -508,6 +518,7 @@ exports.createPage = function(req, res) {
return res.redirect("/admin/pages/new");
}
pages.createPage(req.param('title'), req.param('body'), req.session.user_id, 0, function(postId) {
+ global.sendCommandToClusterServer("DESTROY_CACHE_PAGES");
pagesController.updatePagesCache();
return res.redirect('/admin/pages/' + postId);
});
@@ -518,6 +529,7 @@ exports.updatePage = function(req, res) {
return res.redirect("/admin/pages/new");
}
pages.updatePage(req.param('id'), req.param('title'), req.param('body'), function() {
+ global.sendCommandToClusterServer("DESTROY_CACHE_PAGES");
pagesController.updatePagesCache();
return res.redirect('/admin/pages/' + req.param('id'));
});
@@ -529,6 +541,7 @@ exports.destroyPage = function(req, res) {
}
pages.destroyPage(req.param('id'), function () {
pagesController.updatePagesCache();
+ global.sendCommandToClusterServer("DESTROY_CACHE_PAGES");
return res.redirect('/admin/pages/')
});
};
@@ -6,12 +6,12 @@ var pagesCache;
exports.updatePagesCache = function() {
pages.getPages(function(pages){
- pagesCache = pages;
+ global.pagesCache = pages;
});
}
exports.getPagesCache = function() {
- return pagesCache;
+ return global.pagesCache;
}
// exports.index = function(req, res) {
View
@@ -17,12 +17,12 @@ var tagsCache;
exports.updateTagsCache = function() {
tags.getAllTags(function(tags){
- tagsCache = tags;
+ global.tagsCache = tags;
});
}
exports.getTagsCache = function() {
- return tagsCache;
+ return global.tagsCache;
}
exports.index = function(req, res){
@@ -45,18 +45,18 @@ exports.index = function(req, res){
// cache functions
exports._getPostsUsingCache = function(callback) {
- if(postsCache == undefined) {
+ if(global.postsCache == undefined) {
posts.getPosts(0, postsPerPage, function (err, posts_){
- postsCache = posts_;
+ global.postsCache = posts_;
callback(posts_);
});
} else {
- callback(postsCache);
+ callback(global.postsCache);
}
};
exports.destroyCache = function() {
- postsCache = undefined;
+ global.postsCache = undefined;
}
// normal functions
@@ -6,6 +6,6 @@
{
"host": "127.0.0.1",
"port": "3001",
- "socket_port": "6110"
+ "socket_port": "6111"
}
]
@@ -32,41 +32,19 @@ exports.startClusterServer = function(serverPort) {
function sendCommandToNode(node, command, callback) {
var serverResponded = false;
- var server = dgram.createSocket("udp4");
+ var client = dgram.createSocket("udp4");
- server.on("message", function (msg, rinfo) {
- serverResponded = true;
- server.close();
- callback(msg.toString("ascii"));
- });
- server.on("listening", function () {
- var client = dgram.createSocket("udp4");
- var message = new Buffer(command);
-
- var port = 6108;
-
- if(cluster.socket_port != undefined) {
- if(!isNaN(parseInt(cluster.socket_port))) {
- port = parseInt(cluster.socket_port);
- }
- }
-
- client.send(message, 0, message.length, port, node.host, function (err, bytes) {
- client.close();
- if(err) {
- server.close();
- callback(null);
- } else {
- setTimeout(function(){
- if(serverResponded == false) {
- server.close();
- callback(null);
- }
- }, 500);
- }
- });
- });
- server.bind(6109);
+ var port = 0;
+ if(node.socket_port) {
+ port = parseInt(node.socket_port);
+ } else {
+ port = 6108;
+ }
+
+ var message = new Buffer(command);
+
+ client.send(message, 0, message.length, port, node.host);
+ client.close();
}
var updateClusterStatus = function(callback) {
@@ -137,6 +115,7 @@ exports.startClusterServer = function(serverPort) {
function runNextClusterClientCheck() {
setTimeout(function(){
loadClusters(function(){
+ console.log('Clusters: ' + sys.inspect(clusterInstances));
runNextClusterClientCheck();
});
}, 5000);
@@ -229,37 +208,43 @@ exports.startClusterServer = function(serverPort) {
});
});
- // proxy_client.addListener("error", function (error) {
- // for(var i=0; i<_cluster.length; i++) {
- // if(node.host == _cluster[i].host && node.port == _cluster[i].port) {
- // sys.puts('error, deactivating: '+node.host+':'+node.port);
- // _cluster[i].active = false;
- // _updateActives();
- // }
- //
- // clearTimeout(_checkTimeout[_cluster[i].host + ':' + _cluster[i].port]);
- // _clusterNodeCheck(_cluster[i]);
- // }
- //
- // setTimeout(function() {
- // _requestHandler(request, response);
- // }, 200);
- // });
+ proxy_client.addListener("error", function (error) {
+ for(var i=0; i < clusterInstances.length; i++) {
+ if(node.host == clusterInstances[i].host && node.port == clusterInstances[i].port) {
+ sys.puts('[CLUSTER SERVER] [ERROR] Deactivating cluster ' + node.host + ':' + node.port);
+ clusterInstances[i].isAvailable = false;
+ }
+ }
+ });
proxy_request.end();
});
}
};
+ function verifyCommandOrigin(rinfo) {
+ sys.inspect(rinfo);
+ return true;
+ for(var i=0; i < clusterInstances.length; i++) {
+ // if(rinfo.)
+ }
+ }
+
function initializeNodeCommandsListener(port, callback) {
var server = dgram.createSocket("udp4");
server.on("message", function (msg, rinfo) {
- var message = msg.toString("ascii");
- var activeNodes = getAvailableNodes();
- if(message == "DESTROY_CACHE_POSTS") {
+ if(verifyCommandOrigin(rinfo)) {
+ var message = msg.toString("ascii");
+ var activeNodes = getAvailableNodes();
+ if(message.indexOf("DESTROY_CACHE") >= 0) {
+ console.log('[CLUSTER SERVER] Got destroy cache command from node.')
+ for(var i=0; i < activeNodes.length; i++) {
+ sendCommandToNode(activeNodes[i], message, function(){})
+ }
+ }
}
- });
+ });
server.on("listening", function () {
console.log('[CLUSTER SERVER] Initialized node command listener on port ' + port);
});
View
@@ -17,28 +17,37 @@ exports.startServer = function(serverPort, clusterServerIp, clusterServerPort, c
, adminFilter = require('../filters/admin');
// Configure cluster comunication
+
+ global.sendCommandToClusterServer = function(command) {
+ var message = new Buffer(command);
+ var client = dgram.createSocket("udp4");
+ client.send(message, 0, message.length, 6110, clusterServerIp);
+ client.close();
+ };
if(clusterServerIp != 0) {
var server = dgram.createSocket("udp4");
- // var client = dgram.createSocket("udp4");
- // client.send(message, 0, message.length, clusterServerPort, clusterServerIp.toString());
- // client.close();
-
- // var server = dgram.createSocket("udp4");
server.on("message", function (msg, rinfo) {
if((rinfo.address == clusterServerIp || (rinfo.address == "127.0.0.1" && clusterServerIp == "0.0.0.0"))) {
- // console.log("[CLUSTER INSTANCE] Received commands from cluster server. Executing... ");
- if(msg.toString("ascii") == "$_is_online_$") {
+ var message = msg.toString("ascii");
+ if(message == "$_is_online_$") {
var client = dgram.createSocket("udp4");
var message = new Buffer("OK");
client.send(message, 0, message.length, 6109, clusterServerIp.toString());
client.close();
- }
+ } else if(message == "DESTROY_CACHE_POSTS") {
+ console.log('[CLUSTER INSTANCE] Destroying posts cache...')
+ postsController.destroyCache();
+ } else if(message == "DESTROY_CACHE_TAGS") {
+ console.log('[CLUSTER INSTANCE] Destroying tags cache...')
+ postsController.updateTagsCache();
+ } else if(message == "DESTROY_CACHE_PAGES") {
+ console.log('[CLUSTER INSTANCE] Destroying pages cache...')
+ pagesController.updatePagesCache();
+ }
} else {
console.log("[CLUSTER INSTANCE] [ERROR] Command origin is invalid. ");
}
- // console.log("server got: " + msg + " from " +
- // rinfo.address + ":" + rinfo.port);
});
server.on("listening", function () {
var address = server.address();

0 comments on commit f1f8932

Please sign in to comment.