Cursor ReadStream support #458

Merged
merged 2 commits into from Dec 22, 2011
Jump to file or symbol
Failed to load files and symbols.
+395 −1
Diff settings

Always

Just for now

View
@@ -32,6 +32,9 @@ test_nodeunit_pure:
@echo "\n == Execute Test Suite using Pure JS BSON Parser == "
@$(NODEUNIT) test/ test/gridstore test/bson
+test_js:
+ @$(NODEUNIT) $(TESTS)
+
test_nodeunit_replicaset_pure:
@echo "\n == Execute Test Suite using Pure JS BSON Parser == "
@$(NODEUNIT) test/replicaset
@@ -56,4 +59,4 @@ clean:
rm ./external-libs/bson/bson.node
rm -r ./external-libs/bson/build
-.PHONY: total
+.PHONY: total
View
@@ -2,6 +2,7 @@ var QueryCommand = require('./commands/query_command').QueryCommand,
GetMoreCommand = require('./commands/get_more_command').GetMoreCommand,
KillCursorCommand = require('./commands/kill_cursor_command').KillCursorCommand,
Long = require('./goog/math/long').Long,
+ CursorStream = require('./cursorstream'),
debug = require('util').debug,
inspect = require('util').inspect;
@@ -652,6 +653,14 @@ Cursor.prototype.streamRecords = function(options) {
return stream;
};
+/**
+ * Returns a Node ReadStream interface for this cursor.
+ */
+
+Cursor.prototype.stream = function stream () {
+ return new CursorStream(this);
+}
+
/**
* Close this cursor.
*
View
@@ -0,0 +1,135 @@
+
+/**
+ * Module dependecies.
+ */
+
+var Stream = require('stream').Stream;
+
+/**
+ * CursorStream
+ *
+ * Returns a stream interface for the `cursor`.
+ *
+ * @param {Cursor} cursor
+ * @return {Stream}
+ */
+
+function CursorStream (cursor) {
+ Stream.call(this);
+
+ this.readable = true;
+ this.paused = false;
+ this._cursor = cursor;
+ this._destroyed = null;
+
+ // give time to hook up events
+ var self = this;
+ process.nextTick(function () {
+ self._init();
+ });
+}
+
+/**
+ * Inherit from Stream
+ * @private
+ */
+
+CursorStream.prototype.__proto__ = Stream.prototype;
+
+/**
+ * Flag stating whether or not this stream is readable.
+ */
+
+CursorStream.prototype.readable;
+
+/**
+ * Flag stating whether or not this stream is paused.
+ */
+
+CursorStream.prototype.paused;
+
+/**
+ * Initialize the cursor.
+ * @private
+ */
+
+CursorStream.prototype._init = function () {
+ if (this._destroyed) return;
+ this._next();
+}
+
+/**
+ * Pull the next document from the cursor.
+ * @private
+ */
+
+CursorStream.prototype._next = function () {
+ if (this.paused || this._destroyed) return;
+
+ var self = this;
+
+ // nextTick is necessary to avoid stack overflows when
+ // dealing with large result sets.
+ process.nextTick(function () {
+ self._cursor.nextObject(function (err, doc) {
+ self._onNextObject(err, doc);
+ });
+ });
+}
+
+/**
+ * Handle each document as its returned from the cursor.
+ * @private
+ */
+
+CursorStream.prototype._onNextObject = function (err, doc) {
+ if (err) return this.destroy(err);
+
+ // when doc is null we hit the end of the cursor
+ if (!doc) return this.destroy();
+
+ this.emit('data', doc);
+ this._next();
+}
+
+/**
+ * Pauses this stream.
+ */
+
+CursorStream.prototype.pause = function () {
+ this.paused = true;
+}
+
+/**
+ * Resumes this stream.
+ */
+
+CursorStream.prototype.resume = function () {
+ this.paused = false;
+ this._next();
+}
+
+/**
+ * Destroys the stream, closing the underlying
+ * cursor. No more events will be emitted.
+ */
+
+CursorStream.prototype.destroy = function (err) {
+ if (this._destroyed) return;
+ this._destroyed = true;
+ this.readable = false;
+
+ this._cursor.close();
+
+ if (err) {
+ this.emit('error', err);
+ }
+
+ this.emit('close');
+}
+
+// TODO - maybe implement the raw option to pass binary?
+//CursorStream.prototype.setEncoding = function () {
+//}
+
+module.exports = exports = CursorStream;
Oops, something went wrong.