Permalink
Browse files

data-type support from Vasili Sviridov's patches

  • Loading branch information...
1 parent 4dfea19 commit 45c1dbe65186f58f470eca0cba7c4090783d2824 @postwait committed Jun 1, 2011
Showing with 610 additions and 27 deletions.
  1. +128 −27 amqp.js
  2. +481 −0 jspack.js
  3. +1 −0 package.json
View
155 amqp.js
@@ -2,6 +2,7 @@ var events = require('events'),
sys = require('sys'),
net = require('net'),
protocol,
+ jspack = require('./jspack').jspack,
Buffer = require('buffer').Buffer,
Promise = require('./promise').Promise;
@@ -271,7 +272,7 @@ function parseInt (buffer, size) {
function parseShortString (buffer) {
var length = buffer[buffer.read++];
- var s = buffer.toString('utf-8', buffer.read, buffer.read+length);
+ var s = buffer.toString('utf8', buffer.read, buffer.read+length);
buffer.read += length;
return s;
}
@@ -310,18 +311,31 @@ function parseTable (buffer) {
break;
case 'D'.charCodeAt(0):
- var decimals = buffer[buffer.read++];
- var int = parseInt(buffer, 4);
- // TODO make into float...?
- // FIXME this isn't correct
- table[field] = '?';
+ var dec = parseInt(buffer, 1);
+ var num = parseInt(buffer, 4);
+ table[field] = num / (dec * 10);
break;
+ case 'd'.charCodeAt(0):
+ var b = [];
+ for (var i = 0; i < 8; ++i)
+ b[i] = buffer[buffer.read++];
+
+ table[field] = (new jspack(true)).Unpack('d', b);
+ break;
+
+ case 'f'.charCodeAt(0):
+ var b = [];
+ for (var i = 0; i < 4; ++i)
+ b[i] = buffer[buffer.read++];
+
+ table[field] = (new jspack(true)).Unpack('f', b);
+ break;
+
case 'T'.charCodeAt(0):
- // 64bit time stamps. Awesome.
var int = parseInt(buffer, 8);
- // TODO FIXME this isn't correct
- table[field] = '?';
+ table[field] = new Date();
+ table[field].setTime(int * 1000);
break;
case 'F'.charCodeAt(0):
@@ -331,11 +345,16 @@ function parseTable (buffer) {
case 'l'.charCodeAt(0):
table[field] = parseInt(buffer, 8);
break;
-
+
case 't'.charCodeAt(0):
table[field] = (parseInt(buffer, 1) > 0);
break;
+ case 'a'.charCodeAt(0):
+ var len = parseInt(buffer, 4);
+ table[field] = buffer.splice(buffer.read, buffer.read + len);
+ break;
+
default:
throw new Error("Unknown field value type " + buffer[buffer.read-1]);
}
@@ -466,9 +485,27 @@ AMQPParser.prototype._parseHeaderFrame = function (channel, buffer) {
}
};
+function serializeFloat(b, size, value, bigEndian) {
+ var jp = new jspack(bigEndian);
+
+ switch(size) {
+ case 4:
+ var x = jp.pack('f', [value]);
+ for (var i = 0; i < x.length; ++i)
+ b[b.used++] = x[i];
+ break;
+
+ case 8:
+ var x = jp.pack('d', [value]);
+ for (var i = 0; i < x.length; ++i)
+ b[b.used++] = x[i];
+ break;
+
+ default:
+ throw new Error("Unknown floating point size");
+ }
+}
-// Network byte order serialization
-// (NOTE: javascript always uses network byte order for its ints.)
function serializeInt (b, size, int) {
if (b.used + size > b.length) {
throw new Error("write out of bounds");
@@ -538,7 +575,7 @@ function serializeLongString (b, string) {
if (typeof(string) == 'string') {
var byteLength = Buffer.byteLength(string, 'utf8');
serializeInt(b, 4, byteLength);
- b.write(string, b.used, 'utf-8');
+ b.write(string, b.used, 'utf8');
b.used += byteLength;
} else if (typeof(string) == 'object') {
serializeTable(b, string);
@@ -551,14 +588,48 @@ function serializeLongString (b, string) {
}
}
+function serializeDate(b, date) {
+ serializeInt(b, 8, date.valueOf() / 1000);
+}
+
+function serializeBuffer(b, buffer) {
+ serializeInt(b, 4, buffer.length);
+
+ for (var i = 0; i < buffer.length; ++i) {
+ b[b.used++] = buffer[i];
+ }
+}
+
+function serializeBase64(b, buffer) {
+ serializeLongString(b, buffer.toString('base64'));
+}
+
+function isBigInt(value) {
+ return value > 0xffffffff;
+}
+
+function getCode(dec) {
+ var hexArray = "0123456789ABCDEF".split('');
+
+ var code1 = Math.floor(dec / 16);
+ var code2 = dec - code1 * 16;
+ return hexArray[code2];
+}
+
+function isFloat(value)
+{
+ return value === +value && value !== (value|0);
+}
function serializeTable (b, object) {
if (typeof(object) != "object") {
throw new Error("param must be an object");
}
+ // Save our position so that we can go back and write the length of this table
+ // at the beginning of the packet (once we know how many entries there are).
var lengthIndex = b.used;
- b.used += 4; // for the long
+ b.used += 4; // sizeof long
var startIndex = b.used;
@@ -576,22 +647,43 @@ function serializeTable (b, object) {
break;
case 'number':
- if (value <= 0xFFFFFFFF) {
- b[b.used++] = 'I'.charCodeAt(0);
- serializeInt(b, 4, value);
- } else if (value > 0xFFFFFFFF) {
- b[b.used++] = 'T'.charCodeAt(0);
- serializeInt(b, 8, value);
- }
- // TODO decimal? meh.
+ if (!isFloat(value)) {
+ if (isBigInt(value)) {
+ // 64-bit uint
+ b[b.used++] = 'l'.charCodeAt(0);
+ serializeInt(b, 8, value);
+ } else {
+ //32-bit uint
+ b[b.used++] = 'I'.charCodeAt(0);
+ serializeInt(b, 4, value);
+ }
+ } else {
+ //64-bit float
+ b[b.used++] = 'd'.charCodeAt(0);
+ serializeFloat(b, 8, value);
+ }
break;
- case 'object':
- serializeTable(b, value);
+ case 'boolean':
+ b[b.used++] = 't'.charCodeAt(0);
+ b[b.used++] = value;
break;
default:
- throw new Error("unsupported type in amqp table");
+ if(value instanceof Date) {
+ b[b.used++] = 'T'.charCodeAt(0);
+ serializeDate(b.value);
+ } else if (value instanceof Buffer) {
+ b[b.used++] = 'a'.charCodeAt(0);
+ serializeBuffer(b, value);
+ } else {
+ if(typeof(value) === 'object') {
+ b[b.used++] = 'F'.charCodeAt(0);
+ serializeTable(b, value);
+ } else {
+ this.throwError("unsupported type in amqp table: " + typeof(value));
+ }
+ }
}
}
@@ -1359,8 +1451,17 @@ Queue.prototype.subscribe = function (/* options, messageListener */) {
json._routingKey = m.routingKey;
json._deliveryTag = m.deliveryTag;
- if(messageListener) messageListener(json);
- self.emit('message', json, this.headers);
+ var headers = {};
+ for (var i in this.headers) {
+ if(this.headers.hasOwnProperty(i)) {
+ if(this.headers[i] instanceof Buffer)
+ headers[i] = this.headers[i].toString();
+ else
+ headers[i] = this.headers[i];
+ }
+ }
+ if (messageListener) messageListener(json, headers);
+ self.emit('message', json, headers);
});
});
};
Oops, something went wrong.

0 comments on commit 45c1dbe

Please sign in to comment.