diff --git a/bin/pump b/bin/pump index c141ff026..7e96190f4 100755 --- a/bin/pump +++ b/bin/pump @@ -95,9 +95,9 @@ var launchApps = function(config) { } // Useful for some processes in each worker - config.workers = cnt; + config.workers = cnt > 0 ? 1 : cnt; - if (cluster.isMaster) { + if (cluster.isMaster && config.children !== 0) { Dispatch.start(log); diff --git a/lib/pumpsocket.js b/lib/pumpsocket.js index 4dce1c072..8cc19bce0 100644 --- a/lib/pumpsocket.js +++ b/lib/pumpsocket.js @@ -52,7 +52,12 @@ var connect = function(app, log) { id2conn = {}, follow = function(url, id) { if (!_.has(url2id, url)) { - cluster.worker.send({cmd: "follow", url: url}); + var msg = {cmd: "follow", url: url}; + if (app.config.children !== 0) { + cluster.worker.send(msg); + } else { + handleWorkerMessage(msg); + } url2id[url] = [id]; } if (!_.contains(url2id[url], id)) { @@ -66,7 +71,12 @@ var connect = function(app, log) { if (_.has(url2id, url) && _.contains(url2id[url], id)) { url2id[url].splice(url2id[url].indexOf(id), 1); if (url2id[url].length === 0) { - cluster.worker.send({cmd: "unfollow", url: url}); + var msg = {cmd: "unfollow", url: url}; + if (app.config.children !== 0) { + cluster.worker.send(msg); + } else { + handleWorkerMessage(msg); + } delete url2id[url]; } } @@ -254,56 +264,57 @@ var connect = function(app, log) { } } ); - }; - - cluster.worker.on("message", function(msg) { - var ids; - if (msg.cmd == "update") { - ids = url2id[msg.url]; - slog.info({activity: msg.activity.id, connections: (ids) ? ids.length : 0}, "Delivering activity to connections"); - if (ids && ids.length) { - _.each(ids, function(id) { - var act, profile, conn = id2conn[id]; - if (!conn) { - return; - } - act = new Activity(msg.activity); - Step( - function() { - var profile = (conn.user) ? conn.user.profile : null; - act.checkRecipient(profile, this); - }, - function(err, ok) { - if (err) throw err; - if (!ok) { - conn.log.info({activity: msg.activity.id}, "No access; not delivering activity"); - return; - } - addLiked(profile, [act.object], this.parallel()); - addLikers(profile, [act.object], this.parallel()); - addShared(profile, [act.object], this.parallel()); - firstFewReplies(profile, [act.object], this.parallel()); - firstFewShares(profile, [act.object], this.parallel()); - if (act.object.isFollowable()) { - addFollowed(profile, [act.object], this.parallel()); - } - }, - function(err) { - var tosend; - if (err) { - conn.log.error({err: err}, "Error finishing object"); - } else { - tosend = _.pick(msg, "cmd", "url"); - tosend.activity = act; - conn.log.info({activity: msg.activity.id}, "Delivering activity"); - conn.write(JSON.stringify(tosend)); - } + }, + handleWorkerMessage = function(msg) { + var ids; + if (msg.cmd == "update") { + ids = url2id[msg.url]; + slog.info({activity: msg.activity.id, connections: (ids) ? ids.length : 0}, "Delivering activity to connections"); + if (ids && ids.length) { + _.each(ids, function(id) { + var act, profile, conn = id2conn[id]; + if (!conn) { + return; } - ); - }); + act = new Activity(msg.activity); + Step( + function() { + var profile = (conn.user) ? conn.user.profile : null; + act.checkRecipient(profile, this); + }, + function(err, ok) { + if (err) throw err; + if (!ok) { + conn.log.info({activity: msg.activity.id}, "No access; not delivering activity"); + return; + } + addLiked(profile, [act.object], this.parallel()); + addLikers(profile, [act.object], this.parallel()); + addShared(profile, [act.object], this.parallel()); + firstFewReplies(profile, [act.object], this.parallel()); + firstFewShares(profile, [act.object], this.parallel()); + if (act.object.isFollowable()) { + addFollowed(profile, [act.object], this.parallel()); + } + }, + function(err) { + var tosend; + if (err) { + conn.log.error({err: err}, "Error finishing object"); + } else { + tosend = _.pick(msg, "cmd", "url"); + tosend.activity = act; + conn.log.info({activity: msg.activity.id}, "Delivering activity"); + conn.write(JSON.stringify(tosend)); + } + } + ); + }); + } } - } - }); + }; + + cluster.worker.on("message", handleWorkerMessage); server = sockjs.createServer(options);