Skip to content
This repository has been archived by the owner on Oct 31, 2021. It is now read-only.

Commit

Permalink
consuming more responsibly
Browse files Browse the repository at this point in the history
  • Loading branch information
tenorviol committed Jul 2, 2011
1 parent ae457c9 commit 36837b6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
13 changes: 8 additions & 5 deletions lib/mongodb/MongoStream.js
@@ -1,5 +1,7 @@
var Stream = require('stream').Stream;
var util = require('util');
var BinaryParser = require('mongodb').BinaryParser;
var BSON = require('mongodb').BSONPure.BSON;

module.exports = MongoStream;

Expand All @@ -16,18 +18,19 @@ MongoStream.prototype.write = function (buffer) {
try {
this.parse();
} catch (err) {
console.log(err.stack);
console.log('waiting');
if (err !== 'waiting') {
// if this is not just a 'wait for it' throw, rethrow
throw err;
}
}
};

MongoStream.prototype.consume = function(size) {
var result = this.current;
var needed = size - result.length;
console.log('ho');
while (needed && this.buffers.length) {
var buffer = this.buffers[0];
if (buffer.length - this.marker > needed) {
if (buffer.length - this.marker < needed) {
result += buffer.toString('binary', this.marker);
this.buffers.shift();
this.marker = 0;
Expand All @@ -40,7 +43,7 @@ MongoStream.prototype.consume = function(size) {

if (needed) {
this.current = result;
//throw new Error('waiting');
throw 'waiting';
}

this.current = '';
Expand Down
9 changes: 8 additions & 1 deletion test/stream.test.js
Expand Up @@ -6,5 +6,12 @@ stream.on('message', function(msg) {
console.log(msg);
});

var buffer = new Buffer(['f','o','o','d','f','o','o','d','f','o','o','d','f','o','o','d','f','o','o','d'])
var buffer = new Buffer([
0x39, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0xd4, 0x07, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x74, 0x65,
0x73, 0x74, 0x2e, 0x24, 0x63, 0x6d, 0x64, 0x00, 0x00, 0x00, 0x00,
0x00, 0xff, 0xff, 0xff, 0xff, 0x13, 0x00, 0x00, 0x00, 0x10, 0x69,
0x73, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x00, 0x01, 0x00, 0x00,
0x00, 0x00
]);
stream.write(buffer);

0 comments on commit 36837b6

Please sign in to comment.