Skip to content
Browse files

Use Buffer#copy instead of inspecting every byte, and make the code a…

… bit easier otherwise by using closures to encapsulate the parser state.
  • Loading branch information...
1 parent c136497 commit 8e6c757632cacca4e61ac5a877c5c8f3a16653c4 @squaremo squaremo committed with
Showing with 81 additions and 93 deletions.
  1. +81 −93 amqp.js
View
174 amqp.js
@@ -136,8 +136,85 @@ function AMQPParser (version, type) {
}
})(); // end anon scope
- this.frameHeader = new Buffer(7);
- this.frameHeader.used = 0;
+ var frameHeader = new Buffer(7);
+ frameHeader.used = 0;
+ var frameBuffer, frameType, frameChannel;
+
+ var self = this;
+
+ function header(data) {
+ var fh = frameHeader;
+ var needed = fh.length - fh.used;
+ data.copy(fh, fh.used, 0, fh.length);
+ fh.used += data.length; // sloppy
+ if (fh.used >= fh.length) {
+ fh.read = 0;
+ frameType = fh[fh.read++];
+ frameChannel = parseInt(fh, 2);
+ var frameSize = parseInt(fh, 4);
+ fh.used = 0; // for reuse
+ if (frameSize > maxFrameBuffer) {
+ self.throwError("Oversized frame " + frameSize);
+ }
+ frameBuffer = new Buffer(frameSize);
+ frameBuffer.used = 0;
+ return frame(data.slice(needed));
+ }
+ else { // need more!
+ return header;
+ }
+ }
+
+ function frame(data) {
+ var fb = frameBuffer;
+ var needed = fb.length - fb.used;
+ data.copy(fb, 0, fb.used, fb.length);
+ if (data.length > needed) {
+ return frameEnd(data.slice(needed));
+ }
+ else if (data.length == needed) {
+ return frameEnd;
+ }
+ else {
+ return frame;
+ }
+ }
+
+ function frameEnd(data) {
+ if (data.length > 0) {
+ if (data[0] === 206) {
+ switch (frameType) {
+ case 1:
+ self._parseMethodFrame(frameChannel, frameBuffer);
+ break;
+ case 2:
+ self._parseHeaderFrame(frameChannel, frameBuffer);
+ break;
+ case 3:
+ if (self.onContent) {
+ self.onContent(frameChannel, frameBuffer);
+ }
+ break;
+ case 8:
+ debug("hearbeat");
+ if (self.onHeartBeat) self.onHeartBeat();
+ break;
+ default:
+ self.throwError("Unhandled frame type " + frameType);
+ break;
+ }
+ return header(data.slice(1));
+ }
+ else {
+ self.throwError("Missing frame end marker");
+ }
+ }
+ else {
+ return frameEnd;
+ }
+ }
+
+ self.parse = header;
}
// If there's an error in the parser, call the onError handler or throw
@@ -150,98 +227,9 @@ AMQPParser.prototype.throwError = function (error) {
// parsing.
AMQPParser.prototype.execute = function (data) {
// This function only deals with dismantling and buffering the frames.
- // It delegats to other functions for parsing the frame-body.
+ // It delegates to other functions for parsing the frame-body.
debug('execute: ' + data.toString());
- for (var i = 0; i < data.length; i++) {
- switch (this.state) {
- case 'frameHeader':
- // Here we buffer the frame header. Remember, this is a fully
- // interruptible parser - it could be (although unlikely)
- // that we receive only several octets of the frame header
- // in one packet.
- this.frameHeader[this.frameHeader.used++] = data[i];
-
- if (this.frameHeader.used == this.frameHeader.length) {
- // Finished buffering the frame header - parse it
- //var h = this.frameHeader.unpack("oonN", 0);
-
- this.frameHeader.read = 0;
- this.frameType = this.frameHeader[this.frameHeader.read++];
- this.frameChannel = parseInt(this.frameHeader, 2);
- this.frameSize = parseInt(this.frameHeader, 4);
-
- this.frameHeader.used = 0; // for reuse
-
- debug("got frame: " + JSON.stringify([ this.frameType
- , this.frameChannel
- , this.frameSize
- ]));
-
- if (this.frameSize > maxFrameBuffer) {
- this.throwError("Oversized frame " + this.frameSize);
- }
-
- // TODO use a free list and keep a bunch of 8k buffers around
- this.frameBuffer = new Buffer(this.frameSize);
- this.frameBuffer.used = 0;
- this.state = 'bufferFrame';
- }
- break;
-
- case 'bufferFrame':
- // Buffer the entire frame. I would love to avoid this, but doing
- // otherwise seems to be extremely painful.
-
- // Copy the incoming data byte-by-byte to the buffer.
- // FIXME This is slow! Can be improved with a memcpy binding.
- if(this.frameSize > 0)
- this.frameBuffer[this.frameBuffer.used++] = data[i];
- else
- i--; // the frame ending is actuall this frame (rewind 1)
-
- if (this.frameBuffer.used == this.frameSize) {
- // Finished buffering the frame. Parse the frame.
- switch (this.frameType) {
- case 1:
- this._parseMethodFrame(this.frameChannel, this.frameBuffer);
- break;
-
- case 2:
- this._parseHeaderFrame(this.frameChannel, this.frameBuffer);
- break;
-
- case 3:
- if (this.onContent) {
- this.onContent(this.frameChannel, this.frameBuffer);
- }
- break;
-
- case 8:
- debug("hearbeat");
- if (this.onHeartBeat) this.onHeartBeat();
- break;
-
- default:
- this.throwError("Unhandled frame type " + this.frameType);
- break;
- }
- this.state = 'frameEnd';
- }
- break;
-
- case 'frameEnd':
- // Frames are terminated by a single octet.
- if (data[i] != 206 /* constants.frameEnd */) {
- debug('data[' + i + '] = ' + data[i].toString(16));
- debug('data = ' + data.toString());
- debug('frameHeader: ' + this.frameHeader.toString());
- debug('frameBuffer: ' + this.frameBuffer.toString());
- this.throwError("Oversized frame");
- }
- this.state = 'frameHeader';
- break;
- }
- }
+ this.parse = this.parse(data);
};

0 comments on commit 8e6c757

Please sign in to comment.
Something went wrong with that request. Please try again.