Skip to content

Commit

Permalink
AMF serialization works with the NetConnection.connect() response.
Browse files Browse the repository at this point in the history
Still needs to do splitting in chunks.
  • Loading branch information
nalply committed Oct 31, 2011
1 parent 7b97145 commit 8746aca
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 34 deletions.
12 changes: 8 additions & 4 deletions examples/connector.js
Expand Up @@ -3,6 +3,7 @@
var net = require('net');
var mtrude = require('mtrude');
var rtmp = mtrude.rtmp;
var AMF = rtmp.AMF;
var ChunkStream = rtmp.ChunkStream;
var MessageStream = rtmp.MessageStream;
var Application = rtmp.Application;
Expand All @@ -21,11 +22,13 @@ function workWithChunkStream(chunkStream) {

function main() {
var optimist = require('optimist')
.usage('Usage: $0 [--debug] [in [out]]')
.boolean('debug')
.usage('Usage: $0 [--debugamf] [--debugmessage] [in [out]]')
.boolean('debugamf')
.boolean('debugmessage')
.boolean('help')
.alias('h', 'help')
.describe('debug', 'Set MessageStream.DBG = true')
.describe('debugamf', 'Set AMF.DBG = true')
.describe('debugmessage', 'Set MessageStream.DBG = true')
.describe('nocolor', 'Don\'t use colors')
;
var argv = optimist.argv;
Expand All @@ -35,7 +38,8 @@ function main() {
return;
}

if (argv.debug) MessageStream.DBG = true;
MessageStream.DBG = !!argv.debugmessage;
AMF.DBG = !!argv.debugamf;

if (argv._.length == 0) {
var server = net.createServer();
Expand Down
2 changes: 1 addition & 1 deletion examples/dumpAmf.js
Expand Up @@ -35,7 +35,7 @@ function main() {
return;
}

if (argv.nocolor) dumpTools.dontColor();
if (argv.nocolor) utils.dontColor();

if (argv.debug) MessageStream.DBG = true;

Expand Down
13 changes: 8 additions & 5 deletions examples/dumpMessages.js
Expand Up @@ -28,7 +28,10 @@ function main() {
return;
}

if (argv.nocolor) dumpTools.dontColor();
if (argv.nocolor) utils.dontColor();

// todo PING shoulg also dump ti (it's always the same but it is nice to show
// that).

console.log('Dump format:\n'
+ 'MSG : %s %s %s %s %s:%s %s\n'
Expand Down Expand Up @@ -66,10 +69,10 @@ function main() {
dumpMessageStream(messageStream);
}

var dump8 = dumpTools.dump8;
var hex2 = dumpTools.hex2;
var hex6 = dumpTools.hex6;
var ascii8 = dumpTools.ascii8;
var dump8 = utils.dump8;
var hex2 = utils.hex2;
var hex6 = utils.hex6;
var ascii8 = utils.ascii8;

function dumpMessageStream(messageStream) {
messageStream.on('error', function(errorMessage) {
Expand Down
35 changes: 22 additions & 13 deletions lib/rtmp/AMF.js
Expand Up @@ -2,10 +2,13 @@

var util = require('util');
var assert = require('assert');
var utils = require('../utils');
var BufferChain = require('../BufferChain');

var AMF = module.exports = {};

AMF.DBG = false;

AMF.deserializeZ = function(data, emitter) {
if (data.ptr == null) data.ptr = 0;

Expand All @@ -22,7 +25,7 @@ AMF.deserializeZ = function(data, emitter) {
return values;
}

AMF.amfZMarkers = {
var a0 = AMF.amfZMarkers = {
NUMBER: 0x00,
BOOLEAN: 0x01,
STRING: 0x02,
Expand All @@ -41,9 +44,7 @@ AMF.amfZMarkers = {
XML_DOCUMENT: 0x0f,
TYPED_OBJECT: 0x10,
AVMPLUS: 0x11, // switch to AMF3
}

var a0 = AMF.amfZMarkers;
};

function amfZAny(data) {
var marker = data.readUInt8(data.ptr++);
Expand Down Expand Up @@ -77,7 +78,7 @@ function amfZObject(data) {
var key = amfZString(data);
if (key == '') {
var marker = data.readUInt8(data.ptr++);
if (marker != AMF.amfZMarkers.OBJECT_END) throw new Error(
if (marker != a0.OBJECT_END) throw new Error(
'Bad end-of-object marker 0x%s != 0x9', marker.toString(16));
break;
}
Expand Down Expand Up @@ -111,20 +112,24 @@ AMF.serializeZ = function() {
var serBuffer = { buffer: new Buffer(200), ptr: 0 };
for (var i = 0; i < arguments.length; i++)
serZAny(serBuffer, arguments[i]);
var buffer = serBuffer.buffer.slice(0, serBuffer.ptr);

dbg(hexdump(serBuffer.buffer));
dbg(utils.hexDump(buffer));

return serBuffer.buffer;
return buffer;
}

function dbg() { if (AMF.DBG) console.log.apply(console, arguments) }


function serZAny(serBuffer, value) {
dbgSerBuffer('Any', serBuffer);
if (value === null) return serZNull(serBuffer);
if (value === void 0) return serZUndefined(serBuffer);
if (value === true) return serZTrue(serBuffer);
if (value === false) return serZFalse(serBuffer);
if (typeof value == 'number') return serZNumber(serBuffer, value);
if (typeof value == 'string') return serZString(serBuffer, value);
if (typeof value == 'string') return serZString(serBuffer, value, true);
if (Array.isArray(value)) return serZArray(serBuffer, value);
if (typeof value == 'object') return serZObject(serBuffer, value);
throw new Error('value type not supported');
Expand All @@ -138,7 +143,7 @@ function dbgSerBuffer(text, serBuffer, value) {
return s;
}
var at = ' @ ' + serBuffer.ptr + ' / ' + serBuffer.buffer.length;
console.log('serZ' + text + spaces(20 - text.length)
dbg('serZ' + text + spaces(20 - text.length)
+ at + spaces(16 - at.length)
+ (value == null ? '' : util.inspect(value)));
}
Expand Down Expand Up @@ -173,8 +178,8 @@ function serZNumber(serBuffer, value) {
dbgSerBuffer('Number', serBuffer, value);
maybeExpand(serBuffer, 9);

serBuffer.buffer.writeUInt8(a0.NUMBER, serBuffer.ptr);
serBuffer.buffer.writeDoubleBE(value, serBuffer.ptr + 1);
serBuffer.buffer.writeUInt8(a0.NUMBER, serBuffer.ptr++);
serBuffer.buffer.writeDoubleBE(value, serBuffer.ptr);
serBuffer.ptr += 8;
}

Expand Down Expand Up @@ -204,6 +209,9 @@ function serZArray(serBuffer, value) {

function serZObject(serBuffer, value) {
dbgSerBuffer('Object', serBuffer, {keys: Object.keys(value).join(' ')});
maybeExpand(serBuffer, 1);
serBuffer.buffer.writeUInt8(a0.OBJECT, serBuffer.ptr++);

for (var key in value) {
serZString(serBuffer, key, false);
serZAny(serBuffer, value[key]);
Expand Down Expand Up @@ -240,7 +248,7 @@ AMF.deserialize3 = function(data, emitter) {
}


AMF.amf3Markers = {
var a3 = AMF.amf3Markers = {
UNDEFINED: 0x00,
NULL: 0x01,
FALSE: 0x02,
Expand All @@ -257,7 +265,6 @@ AMF.amf3Markers = {
};

function amf3Any(data) {
var a3 = AMF.amf3Markers;
var marker = data.readUInt8(data.ptr++);
var ptr = data.ptr;

Expand Down Expand Up @@ -345,6 +352,8 @@ function amf3Object(data) {
}
}

Object.seal(AMF);




Expand Down
6 changes: 2 additions & 4 deletions lib/rtmp/Application.js
Expand Up @@ -31,12 +31,10 @@ p.handleMessage = function(message) {
if (command[0] == 'connect') {
var ok = this.connect ? this.connect(message) : true;
if (ok) this.messageStream.send(types.INVOKE, message.msid, message.csid,
new Date().getTime(), AMF.serializeZ(['_result', 1,
new Date().getTime(), AMF.serializeZ('_result', 1,
{ fmsVer: 'FMS/3,0,1,123', capabilities: 31 },
{ level: 'status', code: 'NetConnection.Connect.Success',
description: 'Connection succeeded', objectEncoding: 3 }
])
);
description: 'Connection succeeded', objectEncoding: 3 }));
return;
}
}
Expand Down
46 changes: 39 additions & 7 deletions lib/utils.js
@@ -1,5 +1,6 @@
"use strict";

require('colors');
var fs = require('fs');

var asSocket = exports.asSocket = function(incoming, outgoing, icb, ocb) {
Expand Down Expand Up @@ -40,21 +41,52 @@ var dump8 = exports.dump8 = function(data) {
return b(0) + b(1) + b(2) + b(3) + ' ' + b(4) + b(5) + b(6) + b(7);
}

var hex2 = exports.hex2 = function(byte) {
function hex1(nybble) { return "0123456789abcdef"[nybble & 0xf]; }
return hex1(byte >> 4) + hex1(byte);
var hex1 = exports.hex1 = function(nybble) {
return "0123456789abcdef"[nybble & 0xf];
}

var hex2 = exports.hex2 = function(int8) {
return hex1(int8 >> 4) + hex1(int8);
}

var hex3 = exports.hex3 = function(int12) {
return hex2(int12 >> 4) + hex1(int12);
}

var hex6 = exports.hex6 = function(int24) {
return hex2(int24 >> 16) + hex2(int24 >> 8) + hex2(int24);
}

var ascii8 = exports.ascii8 = function(data) {
var ascii8 = exports.ascii8 = function(data, ptr) {
if (ptr == null) ptr = 0;
function asChar(b) { return String.fromCharCode(b); }
function ascii(b) { return b > 31 && b < 128 ? asChar(b).yellow : '·'.black; }
function b(i) { return i < data.length ? ascii(data.readUInt8(i)) : '·'.white; }
function ascii(b) {
return b > 31 && b < 128 ? asChar(b).yellow : '·'.white;
}
function b(i) {
return i + ptr < data.length ? ascii(data.readUInt8(i + ptr)) : ' ';
}

return b(0) + b(1) + b(2) + b(3) + ' ' + b(4) + b(5) + b(6) + b(7);
return b(0) + b(1) + b(2) + b(3) + b(4) + b(5) + b(6) + b(7);
}

var hexDump = exports.hexDump = function(data, indent, max) {
function b(i) { return i < data.length ? hex2(data.readUInt8(i)) : ' '; }

indent = indent || '';
max = max || 4096;
var length = Math.min(max, data.length);
var s = '';
for (var i = 0; i < length; i += 16) {
s += indent + hex6(i) + ' ';
for (var j = 0; j < 16; j += 4) {
s += (b(i + j) + ' ' + b(i + j + 1) + ' '
+ b(i + j + 2) + ' ' + b(i + j + 3)).blue + ' ';
}
s += '|' + ascii8(data, i) + ascii8(data, i + 8) + '|\n';
}

return s;
}

var dontColor = exports.dontColor = function() {
Expand Down

0 comments on commit 8746aca

Please sign in to comment.