Browse files

initial commit: extracted from genericore-mailsrc

  • Loading branch information...
0 parents commit 6dff5d909006b7fdb0a570dbdc3d0f96cbf5e6c3 @4z3 4z3 committed Jan 18, 2011
Showing with 209 additions and 0 deletions.
  1. +70 −0 README.md
  2. +6 −0 package.json
  3. +128 −0 src/lib/auto-amqp.js
  4. +5 −0 src/main.js
70 README.md
@@ -0,0 +1,70 @@
+# node-genericore
+
+This package contains an utility library for
+[Genericore](https://shackspace.de/wiki/doku.php?id=project:genericore)
+modules written for [node](http://nodejs.org/).
+
+*Please note that this is early development software
+ that lacks proper documentation, tests, and a lot of useful or even necessary features.*
+
+## System requirements
+
+- [node](http://nodejs.org/) v0.2.5 or compatible
+- [node-amqp](https://github.com/ry/node-amqp) v0.0.2 or compatible
+
+## API documentation
+
+### genericore.connect(config, callbacks)
+
+Connect to Genericore (using AMQP).
+Possible config object properties:
+
+* ``connection``:
+ The options object of [node-amqp](https://github.com/ry/node-amqp)'s
+ ``amqp.createConnection()``
+
+* ``reconnect_timeout``:
+ If set to a number then trigger a reconnect that number of milliseconds
+ after a connection error occured.
+
+* ``input``:
+ If set to an AMQP exchange name then subscribe to that exchange
+ (using an auto-generated, exclusive queue).
+
+* ``output``:
+ If set to an AMQP exchange name then publish to that exchange
+ (using type = fanout).
+
+
+Possible callbacks object properties
+(if any of the properties is undefined when the event occurs,
+then the default (*no op* if not specified else) gets called.
+I. e. callbacks may be modified at any time):
+
+
+* ``ready``:
+ Called when the ``output`` exchange was opened successfully with an
+ object parameter with following properties:
+
+ * ``publish``:
+ A function taking a single string parameter to publish to the
+ ``output`` exchange.
+ Undefined if no ``input`` exchange was named.
+ * ``end``:
+ A function taking no parameters to shutdown the connection to Genericore.
+
+* ``message``:
+ Called when a message gets published at the ``input`` exchange
+ with the message as a string parameter.
+
+* ``end``:
+ Called when the connection to Genericore is closed.
+
+* ``error``:
+ (default: *reconnect* or, if ``reconnect_timeout`` is not set, *throw error*)
+ Called when a connection error occurs.
+
+* ``debug``:
+ Called when a debug message occurs.
+ Defaults to ``console.log``.
+
6 package.json
@@ -0,0 +1,6 @@
+{ "name": "genericore"
+, "version": "0.0.1"
+, "description": "Utilities for Genericore modules"
+, "author": "tv"
+, "main": "./src/main"
+}
128 src/lib/auto-amqp.js
@@ -0,0 +1,128 @@
+
+var amqp = require('amqp');
+var inspect = require('sys').inspect;
+var nop = function () {};
+
+// Return keys of an object as array.
+var keys = function (object) {
+ var keys = [];
+ for (key in object) {
+ if (object.hasOwnProperty(key)) {
+ keys.push(key);
+ };
+ };
+ return keys;
+};
+
+var connect = function (config, callbacks) {
+
+ var log_debug = function (message) {
+ (callbacks.debug || nop)(message);
+ };
+
+ // 'reconnect()' is 'connect()' initially.
+ (function reconnect () {
+ var connection = amqp.createConnection(config.connection);
+
+ if (config.debug_connection_events === true) {
+ [
+ 'connect', 'secure', 'data', 'end', 'timeout', 'drain', 'close',
+ 'error'
+ ].forEach(function (event) {
+ connection.on(event, function () {
+ log_debug('event ' + inspect(event) + ' ' + inspect(arguments));
+ });
+ });
+ }
+
+ var capabilities = {
+ end: function () {
+ connection.end();
+ }
+ };
+ connection.on('close', function (has_error) {
+ (callbacks.end || nop)(has_error);
+ });
+
+ var ready_debit = 0;
+ var ready = function () {
+ if (--ready_debit === 0) {
+ log_debug('ready, capabilities: ' + inspect(keys(capabilities)));
+ (callbacks.ready || nop)(capabilities);
+ }
+ };
+
+ connection.on('ready', function () {
+ log_debug(
+ 'connected to '
+ + connection.serverProperties.product
+ + ' ' + connection.serverProperties.version
+ + ' ' + connection.serverProperties.platform
+ );
+ });
+
+ if (config.input) {
+ ++ready_debit;
+ connection.on('ready', function () {
+ var name = config.input;
+ var options = { exclusive: true };
+ log_debug('input exchange: ' + inspect(name) + ' ' + inspect(options));
+ var queue = connection.queue('', options,
+ function (messageCount, consumerCount) {
+ //log_debug('queue: ' + inspect(queue));
+ //log_debug('messageCount: ' + messageCount);
+ //log_debug('consumerCount: ' + consumerCount);
+ //log_debug('bind ' + inspect(name));
+ queue.bind(name, '').addCallback(function () {
+ queue.subscribe(function (message) {
+ (callbacks.message || nop)(message.data);
+ });
+ ready();
+ });
+ });
+ });
+ }
+
+ if (config.output) {
+ ++ready_debit;
+ connection.on('ready', function () {
+ var name = config.output;
+ var options = { type: 'fanout' };
+ log_debug('output exchange: ' + inspect(name) + ' ' + inspect(options));
+ var exchange = connection.exchange(name, options);
+ exchange.on('open', function () {
+ capabilities.publish = function(message) {
+ message = String(message);
+ log_debug('publish ' + inspect(message));
+ exchange.publish(name, message);
+ };
+ ready();
+ });
+ });
+ }
+
+ // Call ready() even if neither setup of input nor output was requested.
+ if (ready_debit === 0) {
+ ++ready_debit;
+ connection.on('ready', ready);
+ }
+
+ connection.on('error', function (err) {
+ if (config.reconnect_timeout) {
+ log_debug(err + '; retrying in ' + config.reconnect_timeout + 'ms');
+ // TODO only retry on ETIMEDOUT?
+ // TODO do we have to clean up something here?
+ setTimeout(reconnect, config.reconnect_timeout);
+ }
+ (callbacks.error || nop)(err);
+
+ if (!config.reconnect_timeout || !callbacks.error) {
+ // No one cares about us? Well, node will...
+ throw new Error(err.message);
+ }
+ });
+ })();
+};
+
+exports.connect = connect;
+
5 src/main.js
@@ -0,0 +1,5 @@
+
+var auto_amqp = require('./lib/auto-amqp');
+
+exports.connect = auto_amqp.connect;
+

0 comments on commit 6dff5d9

Please sign in to comment.