Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

routine commit of rtmp dev

  • Loading branch information...
commit e79bd2cbf859d425b5085f8407f6e705765ca169 1 parent 3fe87a3
Tim Whitlock authored
23 node-rtmp/RtmpChunk.js
View
@@ -26,6 +26,7 @@ var leParser = bin.parser( false, true ); // little endian binary parser
* Constructor
*/
function RtmpChunk( data ){
+ this.payload = '';
// 6.1.1 chunk basic header - defines stream id and chunk type
var b1 = data.charCodeAt(0);
this.chunkStreamId = b1 & 63; // <- (0-5) 6 bit mask
@@ -68,9 +69,7 @@ function RtmpChunk( data ){
* Inherit properties from an existing Chunk
* @var Array existing chunks in our stream
*/
-RtmpChunk.prototype.inheritStream = function( Stream ){
- // latest chunk in stream should hold all parameters we need
- var Previous = Stream[ Stream.length -1 ];
+RtmpChunk.prototype.inheritPrevious = function( Previous ){
// format:0 should only be the first chunk in a stream
if( this.format === 0 || ! Previous ){
return;
@@ -86,6 +85,9 @@ RtmpChunk.prototype.inheritStream = function( Stream ){
this.messageLen = Previous.messageLen;
this.messageType = Previous.messageType;
}
+ // inherit payload and save memory in previous chunk
+ this.payload += Previous.payload;
+ Previous.payload = null;
}
@@ -128,15 +130,12 @@ RtmpChunk.prototype.parse = function( data ){
if( this.timestamp === 0x00ffffff ){
throw new Error('@todo extended timestamp');
}
-
- // Snip off the message data, and return the next chunk
- //this.message = data.substr( this.offset + this.headerLen, this.messageLen );
- //return data.slice( this.offset + this.headerLen + this.messageLen );
-
- // @todo really unsure about this, I'm getting message length falling short of bytes available, should be the opposite?
- this.message = data.slice( this.offset + this.headerLen );
- return '';
-}
+ // Snip off the payload, @todo 128 bytes at a time
+ this.payload += data.slice( this.offset + this.headerLen );
+};
+
+
+
67 node-rtmp/RtmpConnection.js
View
@@ -22,6 +22,8 @@ function RtmpConnection( socket ){
this.handShake = null;
this.messageStreams = [];
this.chunkStreams = [];
+ // temporary buffer for incomplete packets
+ this.buffer = '';
// listen for data
var Conn = this;
socket.addListener( 'data', function(data){
@@ -55,7 +57,7 @@ function RtmpConnection( socket ){
* Common socket writing function
*/
RtmpConnection.prototype.write = function( data ){
- this.socket.write( data, 'binary' );
+ return this.socket.write( data, 'binary' );
}
@@ -70,52 +72,53 @@ RtmpConnection.prototype.onSocketData = function( data ){
sys.puts('socket.data [length:'+data.length+']');
// complete handshake if not already
if( ! this.handShake ){
+ // wait for full 1537 bytes
+ this.buffer += data;
+ if( this.buffer.length < 1537 ){
+ return;
+ }
sys.puts('# handshake 1');
this.handShake = new RtmpHandshake();
//sys.puts( sys.inspect(this.handShake) );
- response = this.handShake.initialize(data);
+ response = this.handShake.initialize( this.buffer );
+ this.buffer = '';
return this.write( response );
}
if( ! this.handShake.acknowledged ){
+ // wait for at least 1536 bytes
+ this.buffer += data;
+ if( this.buffer.length < 1536 ){
+ return;
+ }
sys.puts('# handshake 2');
- response = this.handShake.acknowledge(data)
+ response = this.handShake.acknowledge( this.buffer );
//sys.puts( sys.inspect(this.handShake) );
this.write( response );
- data = data.slice(1536);
+ data = this.buffer.slice(1536);
+ this.buffer = '';
}
- while( data ){
- sys.puts('# processing chunk, have '+data.length+' bytes');
- sys.puts( utils.hex(data,16) );
- // process a chunk
- var Chunk = new RtmpChunk( data );
- // add to stream and inherit any known properties
- var Stream = this.chunkStreams[Chunk.chunkStreamId];
- if( Stream ){
- Chunk.inheritStream( Stream );
- }
- else {
- Stream = this.chunkStreams[Chunk.chunkStreamId] = [];
- }
- Stream.push( Chunk );
- // parse chunk returning any leftover chunk data
- data = Chunk.parse( data );
- sys.puts( sys.inspect(Chunk) );
+
+ sys.puts('# processing chunk, have '+data.length+' bytes');
+ //sys.puts( utils.hex(data,16) );
+ // process a chunk
+ var Chunk = new RtmpChunk( data );
+ // add to stream and inherit any known properties
+ var Previous = this.chunkStreams[Chunk.chunkStreamId];
+ if( Previous ){
+ Chunk.inheritPrevious( Previous );
+ }
+ else {
+ this.chunkStreams[Chunk.chunkStreamId] = Previous;
+ }
+ Chunk.parse( data );
+ if( Chunk.payload.length >= Chunk.messageLen ){
+ sys.puts('# processing message, have '+Chunk.payload.length+'/'+Chunk.messageLen+' bytes');
+ var Msg = new RtmpMessage( Chunk.payload, Chunk.messageLen );
}
}
catch( Er ){
sys.puts( 'Error onSocketData: '+Er.message );
}
- // @todo I AM HERE - testing message parsing with first single message chunk
-
- // @todo getting rogue 0xC3 bytes - something to do with byte allignment?
- if( Chunk.messageLen < Chunk.message.length ){
- sys.puts( Chunk.messageLen +' < '+ Chunk.message.length );
- // hack out 0xC3s?????
- Chunk.message = Chunk.message.split('\xC3').join('');
- }
-
-
- var Msg = new RtmpMessage( Chunk.message );
}
16 node-rtmp/RtmpMessage.js
View
@@ -22,7 +22,7 @@ var leParser = bin.parser( false, true );
/**
* Constructor
*/
-function RtmpMessage( data ){
+function RtmpMessage( data, messageLen ){
// 4.1. messages begin with a type - dictates payload structure
this.type = beParser.toByte( data.slice(0,1) );
/*
@@ -35,12 +35,18 @@ function RtmpMessage( data ){
// type 2: message is an AMF encoded command from the client
// @todo test message type from chunk to determine AMF0/AMF3
if( this.type === 2 ){
- var str = data.slice(1);
- var des = amf.deserializer( str );
+ // AMF payload is separated every 128 bytes by "0xC3"
+ // todo optimize this, and check if needed for other types
+ var message = '', i = 0;
+ while( message.length < messageLen ){
+ message += data.substr( i, 128 );
+ i += 129;
+ }
+ var des = amf.deserializer( message.slice(1) );
var cmd = des.readUTF8( amf.AMF0 );
sys.puts('command = ' + cmd );
- var unknown = des.shiftBytes( 9 );
- sys.puts( utils.hex(unknown) );
+ var unknown = des.shiftBytes( 9 ); // <- ?
+ sys.puts( 'unknown = '+utils.hex(unknown) );
var obj = des.readValue( amf.AMF0 );
sys.puts( sys.inspect(obj) );
}
Please sign in to comment.
Something went wrong with that request. Please try again.