Skip to content
This repository

Fix #84 - slice outgoing messages according to frameMax #127

Closed
wants to merge 1 commit into from

2 participants

Marek Theo Schlossnagle
Marek

Content frames are ignoring the frame max setting, and just send the entire body as one frame. (see #84)

This commit does three things:
1) amends maxFrameBuffer to the value advertised by the server
2) extracts main logic from _sendBody function to a _bodyToBuffer
3) _sendBody is rewritten in a simpler fashion, assuming it only deals with buffers (non-buffers are being converted to a buffer in _bodyToBuffer)

I don't know how to run the tests from test directory, I verified this code manually.

Theo Schlossnagle postwait commented on the diff September 18, 2012
@@ -983,6 +983,10 @@ Connection.prototype._onMethod = function (channel, method, args) {
983 983
 
984 984
     // 4. The server responds with a connectionTune request
985 985
     case methods.connectionTune:
  986
+      if (args.frameMax) {
  987
+          debug("tweaking maxFrameBuffer to " + args.frameMax);
  988
+          maxFrameBuffer = args.frameMax;
  989
+      }
2
Theo Schlossnagle Owner

Please split this change into its own commit and pull request.

Marek
majek added a note September 26, 2012

The whole point of this pull request is to adhere to frameMax advertised by the server. Additionally frameMax advertised by the server by default is equal to the predefined maxFrameBuffer (ie: 128KiB), so this line is effectively NOP for rabbit with default settings.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Theo Schlossnagle
Owner

Have you benchmarked the code there to make sure you had no regressions in performance?

Marek

Simple benchmark: https://gist.github.com/3787602

On default (80fc049):
$ time node t.js 1 > 0.408s
$ time node t.js 2 > 0.348s
$ time node t.js 3 > 0.349s

On my branch (1b322a5) + hardcoded max frame size for outgoing messages to 1024 bytes (ie: very small frame):
$ time node t.js 1 > 0.458s
$ time node t.js 2 > 0.391s
$ time node t.js 3 > 0.395s

I can't benchmark it for larger messages / larger number of messages because node-amqp breaks when the outgoing buffer is large (for my benchmark - it looks like all messages are being sent to meta queue rather than test)

Theo Schlossnagle
Owner

merged

Theo Schlossnagle postwait closed this January 08, 2013
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 1 unique commit by 1 author.

Sep 18, 2012
Marek Fix #84 - slice outgoing messages according to frameMax advertised by…
… the server
1b322a5
This page is out of date. Refresh to see the latest.

Showing 1 changed file with 34 additions and 46 deletions. Show diff stats Hide diff stats

  1. 80  amqp.js
80  amqp.js
@@ -983,6 +983,10 @@ Connection.prototype._onMethod = function (channel, method, args) {
983 983
 
984 984
     // 4. The server responds with a connectionTune request
985 985
     case methods.connectionTune:
  986
+      if (args.frameMax) {
  987
+          debug("tweaking maxFrameBuffer to " + args.frameMax);
  988
+          maxFrameBuffer = args.frameMax;
  989
+      }
986 990
       // 5. We respond with connectionTuneOk
987 991
       this._sendMethod(0, methods.connectionTuneOk,
988 992
           { channelMax: 0
@@ -1142,67 +1146,51 @@ function sendHeader (connection, channel, size, properties) {
1142 1146
 
1143 1147
 
1144 1148
 Connection.prototype._sendBody = function (channel, body, properties) {
1145  
-  // Handles 3 cases
1146  
-  // - body is utf8 string
1147  
-  // - body is instance of Buffer
1148  
-  // - body is an object and its JSON representation is sent
1149  
-  // Does not handle the case for streaming bodies.
1150  
-  if (typeof(body) == 'string') {
1151  
-    var length = Buffer.byteLength(body);
1152  
-    //debug('send message length ' + length);
  1149
+  var r = this._bodyToBuffer(body);
  1150
+  var props = r[0], buffer = r[1];
1153 1151
 
1154  
-    sendHeader(this, channel, length, properties);
  1152
+  properties = mixin(props, properties);
1155 1153
 
1156  
-    //debug('header sent');
  1154
+  sendHeader(this, channel, buffer.length, properties);
1157 1155
 
1158  
-    var b = new Buffer(7+length+1);
  1156
+  var pos = 0, len = buffer.length;
  1157
+  while (len > 0) {
  1158
+    var sz = len < maxFrameBuffer ? len : maxFrameBuffer;
  1159
+
  1160
+    var b = new Buffer(7 + sz + 1);
1159 1161
     b.used = 0;
1160 1162
     b[b.used++] = 3; // constants.frameBody
1161 1163
     serializeInt(b, 2, channel);
1162  
-    serializeInt(b, 4, length);
1163  
-
1164  
-    b.write(body, b.used, 'utf8');
1165  
-    b.used += length;
1166  
-
  1164
+    serializeInt(b, 4, sz);
  1165
+    buffer.copy(b, b.used, pos, pos+sz);
  1166
+    b.used += sz;
1167 1167
     b[b.used++] = 206; // constants.frameEnd;
1168  
-    return this.write(b);
  1168
+    this.write(b);
1169 1169
 
1170  
-    //debug('body sent: ' + JSON.stringify(b));
  1170
+    len -= sz;
  1171
+    pos += sz;
  1172
+  }
  1173
+  return;
  1174
+}
1171 1175
 
  1176
+Connection.prototype._bodyToBuffer = function (body) {
  1177
+  // Handles 3 cases
  1178
+  // - body is utf8 string
  1179
+  // - body is instance of Buffer
  1180
+  // - body is an object and its JSON representation is sent
  1181
+  // Does not handle the case for streaming bodies.
  1182
+  // Returns buffer.
  1183
+  if (typeof(body) == 'string') {
  1184
+    return [null, new Buffer(body, 'utf8')];
1172 1185
   } else if (body instanceof Buffer) {
1173  
-    sendHeader(this, channel, body.length, properties);
1174  
-
1175  
-    var b = new Buffer(7);
1176  
-    b.used = 0;
1177  
-    b[b.used++] = 3; // constants.frameBody
1178  
-    serializeInt(b, 2, channel);
1179  
-    serializeInt(b, 4, body.length);
1180  
-    this.write(b);
1181  
-    this.write(body);
1182  
-
1183  
-    return this.write(new Buffer([206])); // frameEnd
  1186
+    return [null, body];
1184 1187
   } else {
1185 1188
     var jsonBody = JSON.stringify(body);
1186  
-    var length = Buffer.byteLength(jsonBody);
1187 1189
 
1188 1190
     debug('sending json: ' + jsonBody);
1189 1191
 
1190  
-    properties = mixin({contentType: 'application/json' }, properties);
1191  
-
1192  
-    sendHeader(this, channel, length, properties);
1193  
-
1194  
-    var b = new Buffer(7+length+1);
1195  
-    b.used = 0;
1196  
-
1197  
-    b[b.used++] = 3; // constants.frameBody
1198  
-    serializeInt(b, 2, channel);
1199  
-    serializeInt(b, 4, length);
1200  
-
1201  
-    b.write(jsonBody, b.used, 'utf8');
1202  
-    b.used += length;
1203  
-
1204  
-    b[b.used++] = 206; // constants.frameEnd;
1205  
-    return this.write(b);
  1192
+    var props = {contentType: 'application/json'};
  1193
+    return [props, new Buffer(jsonBody, 'utf8')];
1206 1194
   }
1207 1195
 };
1208 1196
 
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.