Skip to content

Commit

Permalink
Factor out exploding in callback API
Browse files Browse the repository at this point in the history
It's not quite as convenient as the promise API, in which errors pop
up at the end of the then-chaining (provided you remember to return
things correctly). This is a lowest-common denominator approach, and
it keeps the programs self-contained.

The repetition indicates there's probably a nicer way to deal with
errors. Or not (because JavaScript).
  • Loading branch information
squaremo committed May 5, 2014
1 parent 8b65b4d commit b4cdf11
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 43 deletions.
9 changes: 7 additions & 2 deletions examples/tutorials/callback_api/emit_log.js
Expand Up @@ -2,13 +2,18 @@

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);

var ex = 'logs';

function on_channel_open(err, ch) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
ch.assertExchange(ex, 'fanout', {durable: false});
var msg = process.argv.slice(2).join(' ') ||
'info: Hello World!';
Expand Down
10 changes: 8 additions & 2 deletions examples/tutorials/callback_api/emit_log_direct.js
Expand Up @@ -6,13 +6,19 @@ var args = process.argv.slice(2);
var severity = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);

var ex = 'direct_logs';
var exopts = {durable: false};

function on_channel_open(err, ch) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
ch.assertExchange(ex, 'direct', exopts, function(err, ok) {
ch.publish(ex, severity, new Buffer(message));
ch.close(function() { conn.close(); });
Expand Down
9 changes: 7 additions & 2 deletions examples/tutorials/callback_api/emit_log_topic.js
Expand Up @@ -6,12 +6,17 @@ var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);
var ex = 'topic_logs', exopts = {durable: false};
conn.createChannel(function(err, ch) {
ch.assertExchange(ex, 'topic', exopts, function(err, ok) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
ch.publish(ex, key, new Buffer(message));
console.log(" [x] Sent %s:'%s'", key, message);
ch.close(function() { conn.close(); });
Expand Down
16 changes: 10 additions & 6 deletions examples/tutorials/callback_api/new_task.js
Expand Up @@ -2,22 +2,26 @@

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);

var q = 'task_queue';

function on_channel_open(err, ch) {
if (err !== null) return console.error(err);
conn.createChannel(function(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertQueue(q, {durable: true}, function(err, _ok) {
if (err !== null) return bail(err, conn);
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
});
}

conn.createChannel(on_channel_open);
});
}

amqp.connect(on_connect);
9 changes: 7 additions & 2 deletions examples/tutorials/callback_api/receive.js
Expand Up @@ -2,15 +2,20 @@

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });

var q = 'hello';

function on_channel_open(err, ch) {
ch.assertQueue(q, {durable: false}, function(err, ok) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
ch.consume(q, function(msg) { // message callback
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true}, function(_consumeOk) { // consume callback
Expand Down
13 changes: 9 additions & 4 deletions examples/tutorials/callback_api/receive_logs.js
Expand Up @@ -2,19 +2,24 @@

var amqp = require('amqplib/callback_api');

function on_connect(err, conn) {
if (err !== null) return console.error(err);
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });

var ex = 'logs';

function on_channel_open(err, ch) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
ch.assertQueue('', {exclusive: true}, function(err, ok) {
var q = ok.queue;
ch.bindQueue(q, ex, '');
ch.consume(q, logMessage, {noAck: true}, function(err, ok) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
console.log(" [*] Waiting for logs. To exit press CTRL+C.");
});
});
Expand Down
16 changes: 11 additions & 5 deletions examples/tutorials/callback_api/receive_logs_direct.js
Expand Up @@ -11,29 +11,35 @@ if (severities.length < 1) {
process.exit(1);
}

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });

conn.createChannel(function(err, ch) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
var ex = 'direct_logs', exopts = {durable: false};

ch.assertExchange(ex, 'direct', exopts);
ch.assertQueue('', {exclusive: true}, function(err, ok) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);

var queue = ok.queue, i = 0;

function sub(err) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
else if (i < severities.length) {
ch.bindQueue(queue, ex, severities[i], {}, sub);
i++;
}
}

ch.consume(queue, logMessage, {noAck: true}, function(err) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
sub(null);
});
Expand Down
16 changes: 11 additions & 5 deletions examples/tutorials/callback_api/receive_logs_topic.js
Expand Up @@ -10,29 +10,35 @@ if (keys.length < 1) {
process.exit(1);
}

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });

conn.createChannel(function(err, ch) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
var ex = 'topic_logs', exopts = {durable: false};

ch.assertExchange(ex, 'topic', exopts);
ch.assertQueue('', {exclusive: true}, function(err, ok) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);

var queue = ok.queue, i = 0;

function sub(err) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
else if (i < keys.length) {
ch.bindQueue(queue, ex, keys[i], {}, sub);
i++;
}
}

ch.consume(queue, logMessage, {noAck: true}, function(err) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
sub(null);
});
Expand Down
12 changes: 9 additions & 3 deletions examples/tutorials/callback_api/rpc_client.js
Expand Up @@ -15,21 +15,27 @@ catch (e) {
process.exit(1);
}

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);
conn.createChannel(function(err, ch) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);

var correlationId = uuid();
function maybeAnswer(msg) {
if (msg.properties.correlationId === correlationId) {
console.log(' [.] Got %d', msg.content.toString());
}
else return bail(new Error('Unexpected message'), conn);
ch.close(function() { conn.close(); });
}

ch.assertQueue('', {exclusive: true}, function(err, ok) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
var queue = ok.queue;
ch.consume(queue, maybeAnswer, {noAck:true});
console.log(' [x] Requesting fib(%d)', n);
Expand Down
9 changes: 7 additions & 2 deletions examples/tutorials/callback_api/rpc_server.js
Expand Up @@ -11,8 +11,13 @@ function fib(n) {
return a;
}

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);

process.once('SIGINT', function() { conn.close(); });

Expand All @@ -22,7 +27,7 @@ function on_connect(err, conn) {
ch.assertQueue(q, {durable: false});
ch.prefetch(1);
ch.consume(q, reply, {noAck:false}, function(err) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err, conn);
console.log(' [x] Awaiting RPC requests');
});

Expand Down
13 changes: 9 additions & 4 deletions examples/tutorials/callback_api/send.js
Expand Up @@ -2,16 +2,21 @@

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);

var q = 'hello';
var msg = 'Hello World!';

function on_channel_open(err, ch) {
if (err !== null) return console.error(err);
ch.assertQueue(q, {durable: false}, function(e, ok) {
if (e !== null) return console.error(err);
if (err !== null) return bail(err, conn);
ch.assertQueue(q, {durable: false}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.sendToQueue(q, new Buffer(msg));
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
Expand Down
15 changes: 9 additions & 6 deletions examples/tutorials/callback_api/worker.js
Expand Up @@ -2,14 +2,19 @@

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
if (err !== null) return console.error(err);
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });

var q = 'task_queue';

function on_channel_open(err, ch) {
if (err !== null) return console.error(err);
conn.createChannel(function(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertQueue(q, {durable: true}, function(err, _ok) {
ch.consume(q, doWork, {no_ack: false});
console.log(" [*] Waiting for messages. To exit press CTRL+C");
Expand All @@ -24,9 +29,7 @@ function on_connect(err, conn) {
ch.ack(msg);
}, secs * 1000);
}
}

conn.createChannel(on_channel_open);
});
}

amqp.connect(on_connect);

0 comments on commit b4cdf11

Please sign in to comment.