Permalink
Browse files

trying out message queues

  • Loading branch information...
1 parent 44c3123 commit 95e87867cded00b35fbb6b2fb6af06626c7ba660 @remy committed Feb 11, 2012
Showing with 45 additions and 41 deletions.
  1. +27 −16 app.js
  2. +18 −25 prefix.js
View
43 app.js
@@ -5,10 +5,14 @@
var express = require('express'),
md = require('node-markdown').Markdown,
- Prefix = require('./prefix'),
fs = require('fs'),
path = require('path'),
- cp = require('child_process');
+ // cp = require('child_process'),
+ zmq = require('zmq'),
+ sock = zmq.socket('req');
+
+sock.bindSync('tcp://127.0.0.1:3000');
+console.log('Producer bound to port 3000');
path.exists(__dirname + '/jobs', function (exists) {
if (!exists) {
@@ -61,8 +65,9 @@ app.get(/favicon.ico|humans.txt/i, function (req, res) {
app.get('/check.json', function (req, res) {
var url = Object.keys(req.query)[0];
- var prefix = cp.fork(__dirname + '/prefix.js');
- prefix.on('message', function(event) {
+
+ sock.once('message', function(event) {
+ event = JSON.parse(event);
console.log(event);
if (event.type == 'dirty') {
res.send({ pass: false, lint: event.lint });
@@ -71,7 +76,7 @@ app.get('/check.json', function (req, res) {
}
});
- prefix.send({ type: 'start', url: url, dirtyExit: true });
+ sock.send(JSON.stringify({ type: 'start', url: url, dirtyExit: true }));
});
app.get('/check', function (req, res) {
@@ -83,19 +88,25 @@ app.get('/check', function (req, res) {
res.send('<a href="/jobs/' + job + '.zip">' + job + '.zip</a>');
};
- if (!exists) {
- var prefix = cp.fork(__dirname + '/prefix.js');
-
- prefix.on('message', function(event) {
- // console.log(event);
- if (event.type == 'end') {
- ready();
- }
- // res.writeHead(200, { 'content-type': 'text/css' });
- // res.end('');
+ if (true || !exists) {
+ sock.send(JSON.stringify({ type: 'start', url: url }));
+ sock.once('message', function (data) {
+ console.log(JSON.parse(data));
+ ready();
});
- prefix.send({ type: 'start', url: url });
+ // var prefix = cp.fork(__dirname + '/prefix.js');
+
+ // prefix.on('message', function(event) {
+ // // console.log(event);
+ // if (event.type == 'end') {
+ // ready();
+ // }
+ // // res.writeHead(200, { 'content-type': 'text/css' });
+ // // res.end('');
+ // });
+
+ // prefix.send({ type: 'start', url: url });
} else {
ready();
}
View
@@ -8,16 +8,10 @@ var CSSLint = require("csslint").CSSLint,
events = require('events'),
jsdom = require('jsdom'),
request = require('request'),
- exec = require('child_process').exec;
-
-if (!process.send) {
- process.send = function (data) {
- console.log(data);
- // var event = data.event;
- // delete data.event;
- // process.emit(event, data);
- }
-}
+ exec = require('child_process').exec,
+ zmq = require('zmq'),
+ sock = zmq.socket('rep');
+
/**
* TODO:
@@ -49,7 +43,7 @@ Prefix.prototype.end = function () {
console.log('exec error: ' + error);
}
- process.send({ type: 'end', path: self.dir, job: self.job });
+ sock.send(JSON.stringify({ type: 'end', path: self.dir, job: self.job }));
self.complete && self.complete();
});
};
@@ -64,7 +58,7 @@ Prefix.prototype.parseURL = function (url) {
} catch (e) {}
- process.send({ type: 'message', job: self.job });
+ // process.send({ type: 'message', job: self.job });
if (url.indexOf('http') === -1) {
url = 'http://' + url;
@@ -132,9 +126,9 @@ Prefix.prototype.parseHTML = function (html, url) {
// this tells the parent process to show a fail or success whilst we continue with the processing
self.dirty = true;
- process.send({ type: 'dirty', lint: message });
+ sock.send(JSON.stringify({ type: 'dirty', lint: message }));
if (self.dirtyExit) {
- process.exit();
+ // process.exit();
}
}
});
@@ -320,28 +314,27 @@ function urlAsPath(url) {
return url.toLowerCase().replace(/.*?:\/\//, '').replace(/\?/, '-').replace(/\//g, '_');
}
-
if (!module.parent && process.argv[2]) {
(new Prefix()).parseURL(process.argv[2]);
-} else {
+} else if (module.parent) {
module.exports = Prefix;
-}
+} else {
+ sock.connect('tcp://127.0.0.1:3000');
+ console.log('Worker connected to port 3000');
-process.on('message', function (data) {
- console.log(data);
- if (data.type == 'start') {
+ sock.on('message', function(data) {
+ console.log('work: %s', data);
+ data = JSON.parse(data);
var prefix = new Prefix(function () {
// process.send()
console.log('all done');
+ sock.send(JSON.stringify({ type: 'done ' }));
});
prefix.dirtyExit = data.dirtyExit;
console.log('>>' + data.url);
prefix.parseURL(data.url);
- }
-});
-
-
-
+ });
+}

0 comments on commit 95e8786

Please sign in to comment.