Skip to content

Commit

Permalink
Merge pull request JustinTulloss#99 from utvara/master
Browse files Browse the repository at this point in the history
Zmq Device examples written for node
  • Loading branch information
tj committed Mar 15, 2012
2 parents b04e212 + 81fe802 commit c70bbbf
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 3 deletions.
69 changes: 69 additions & 0 deletions examples/devices/forwarder.js
@@ -0,0 +1,69 @@
/*
*
* Forwarder device
*
*/

var zmq = require('../../')
, frontPort = 'tcp://127.0.0.1:12345'
, backPort = 'tcp://127.0.0.1:12346';

function createClient (port) {
var socket = zmq.socket('pub');

socket.identity = 'client' + process.pid;

socket.connect(port);
console.log('client connected!');

setInterval(function() {
var value = Math.floor(Math.random()*100);

console.log(socket.identity + ': broadcasting ' + value);
socket.send(value);
}, 100);
};

function createWorker (port) {
var socket = zmq.socket('sub');

socket.identity = 'worker' + process.pid;

socket.subscribe('');
socket.on('message', function(data) {
console.log(socket.identity + ': got ' + data.toString());
});

socket.connect(port, function(err) {
if (err) throw err;
console.log('worker connected!');
});
};

function createForwarderDevice(frontPort, backPort) {
var frontSocket = zmq.socket('sub'),
backSocket = zmq.socket('pub');

frontSocket.identity = 'sub' + process.pid;
backSocket.identity = 'pub' + process.pid;

frontSocket.subscribe('');
frontSocket.bind(frontPort, function (err) {
console.log('bound', frontPort);
});

frontSocket.on('message', function() {
//pass to back
console.log('forwarder: recasting', arguments[0].toString());
backSocket.send(Array.prototype.slice.call(arguments));
});

backSocket.bind(backPort, function (err) {
console.log('bound', backPort);
});
}

createForwarderDevice(frontPort, backPort);

createClient(frontPort);
createWorker(backPort);
78 changes: 78 additions & 0 deletions examples/devices/queue.js
@@ -0,0 +1,78 @@
/*
*
* Queue device
*
*/

var zmq = require('../../')
, frontPort = 'tcp://127.0.0.1:12345'
, backPort = 'tcp://127.0.0.1:12346';

function createClient (port) {
var socket = zmq.socket('req');

socket.identity = 'client' + process.pid;

socket.on('message', function(data) {
console.log(socket.identity + ': answer data ' + data);
});

socket.connect(port);
console.log('client connected!');

setInterval(function() {
var value = Math.floor(Math.random()*100);

console.log(socket.identity + ': asking ' + value);
socket.send(value);
}, 100);
};

function createServer (port) {
var socket = zmq.socket('rep');

socket.identity = 'server' + process.pid;

socket.on('message', function(data) {
console.log(socket.identity + ': received ' + data.toString());
socket.send(data * 2);
});

socket.connect(port, function(err) {
if (err) throw err;
console.log('server connected!');
});
};

function createQueueDevice(frontPort, backPort) {
var frontSocket = zmq.socket('router'),
backSocket = zmq.socket('dealer');

frontSocket.identity = 'router' + process.pid;
backSocket.identity = 'dealer' + process.pid;

frontSocket.bind(frontPort, function (err) {
console.log('bound', frontPort);
});

frontSocket.on('message', function() {
//pass to back
console.log('router: sending to server', arguments[0].toString(), arguments[2].toString());
backSocket.send(Array.prototype.slice.call(arguments));
});

backSocket.bind(backPort, function (err) {
console.log('bound', backPort);
});

backSocket.on('message', function() {
//pass to front
console.log('dealer: sending to client', arguments[0].toString(), arguments[2].toString());
frontSocket.send(Array.prototype.slice.call(arguments));
});
}

createQueueDevice(frontPort, backPort);

createClient(frontPort);
createServer(backPort);
67 changes: 67 additions & 0 deletions examples/devices/streamer.js
@@ -0,0 +1,67 @@
/*
*
* Forwarder device
*
*/

var zmq = require('../../')
, frontPort = 'tcp://127.0.0.1:12345'
, backPort = 'tcp://127.0.0.1:12346';

function createClient (port) {
var socket = zmq.socket('push');

socket.identity = 'client' + process.pid;

socket.connect(port);
console.log('client connected!');

setInterval(function() {
var value = Math.floor(Math.random()*100);

console.log(socket.identity + ': pushing ' + value);
socket.send(value);
}, 100);
};

function createWorker (port) {
var socket = zmq.socket('pull');

socket.identity = 'worker' + process.pid;

socket.on('message', function(data) {
console.log(socket.identity + ': pulled ' + data.toString());
});

socket.connect(port, function(err) {
if (err) throw err;
console.log('worker connected!');
});
};

function createStreamerDevice(frontPort, backPort) {
var frontSocket = zmq.socket('pull'),
backSocket = zmq.socket('push');

frontSocket.identity = 'sub' + process.pid;
backSocket.identity = 'pub' + process.pid;

frontSocket.bind(frontPort, function (err) {
console.log('bound', frontPort);
});

frontSocket.on('message', function() {
//pass to back
console.log('forwarder: sending downstream', arguments[0].toString());
backSocket.send(Array.prototype.slice.call(arguments));
});

backSocket.bind(backPort, function (err) {
console.log('bound', backPort);
});
}

createStreamerDevice(frontPort, backPort);

createClient(frontPort);
createWorker(backPort);
6 changes: 3 additions & 3 deletions examples/req_rep.js
Expand Up @@ -4,9 +4,9 @@
*
*/

var cluster = require('cluster'),
zeromq = require('zmq'),
port = 'tcp://127.0.0.1:12345';
var cluster = require('cluster')
, zeromq = require('../')
, port = 'tcp://127.0.0.1:12345';

if (cluster.isMaster) {
//Fork servers.
Expand Down

0 comments on commit c70bbbf

Please sign in to comment.