Skip to content

Commit

Permalink
Passing current cursor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Jul 2, 2014
1 parent 61a1dcd commit d399f7d
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 181 deletions.
119 changes: 73 additions & 46 deletions lib/cursor.js
Expand Up @@ -23,6 +23,12 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
var tailableRetryInterval = options.tailableRetryInterval || 500;
var currentNumberOfRetries = numberOfRetries;

// connection.once('close', function() {
// console.log("++++++++++++++++++++++++++++++++++++++++++++++ CONNECTION CLOSE")
// console.log("++++++++++++++++++++++++++++++++++++++++++++++ CONNECTION CLOSE")
// console.log("++++++++++++++++++++++++++++++++++++++++++++++ CONNECTION CLOSE")
// })

// Set up
Readable.call(this, {objectMode: true});

Expand All @@ -32,26 +38,23 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
get: function() { return cmd.orderby; }
});

// Db close listener
var closeListener = function() {
state = Cursor.CLOSED;
self.kill();
// // If we have an end event emit close for backward comp
// self.once('end', function() {
// self.emit('close')
// });

if(self.listeners('close').length > 0) {
self.emit('close');
}
}
// try {
// self.once('readable', function() {
// // self._read(0);
// });

// If we have an end event emit close for backward comp
self.once('end', function() {
self.emit('close')
});

// Handle a connection close
options.db.once('close', closeListener);
// } catch (err) {
// console.dir(err)
// }

this.nextObject = function(options, callback) {
if('function' === typeof options) callback = options, options = {};
if(state == Cursor.CLOSED || self.isDead()) return callback(new MongoError("Cursor is closed"));
if(state == Cursor.INIT && cmd.orderby) {
try {
cmd.orderby = formattedOrderClause(cmd.orderby);
Expand All @@ -76,17 +79,11 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
});
}

var eachObject = function(callback) {
return function() {
return eachObject
}
}

// Trampoline emptying the number of retrieved items
// without incurring a nextTick operation
var loop = function(self, callback) {
// No more items we are done
if(self.bufferedDocuments().length == 0) return;
if(self.bufferedCount() == 0) return;
// Get the next document
self.next(callback);
// Loop
Expand All @@ -95,14 +92,15 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {

this.each = function(callback) {
if(!callback) throw new MongoError('callback is mandatory');
if(state == Cursor.CLOSED) return callback(new MongoError("Cursor is closed"), null);
if(state == Cursor.CLOSED || self.isDead()) return callback(new MongoError("Cursor is closed"), null);
// Trampoline all the entries
if(self.bufferedDocuments().length > 0) {
if(self.bufferedCount() > 0) {
while(fn = loop(self, callback)) fn(self, callback);
self.each(callback);
} else {
self.next(function(err, item) {
if(err) return callback(err);
if(item == null) return callback(null, null);
callback(null, item);
self.each(callback);
})
Expand All @@ -123,19 +121,29 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
this.toArray = function(callback) {
if(!callback) throw new MongoError('callback is mandatory');
if(options.tailable) return callback(new MongoError("Tailable cursor cannot be converted to array"), null);
if(state == Cursor.CLOSED) return callback(new MongoError("Cursor is closed"), null);
if(state == Cursor.CLOSED || self.isDead()) return callback(new MongoError("Cursor is closed"), null);
var items = [];

// Fetch all the documents
var fetchDocs = function() {
// Get the first doc
self.next(function(err, doc) {
// console.log("+++++++++++++++++++++++++++++++++++++ " + self.bufferedCount())
// console.log(items.length)
// console.dir(doc)

if(err) return callback(err);
if(doc == null) return callback(null, items);
if(doc == null) {
// console.log(items.length)
state = Cursor.CLOSED;
return callback(null, items);
}

// Add doc to items
items.push(doc)
// Get all buffered objects
if(self.bufferedDocuments().length > 0) {
items = items.concat(self.bufferedDocuments().splice(0));
if(self.bufferedCount() > 0) {
// console.log("+++++++++++++++++++++++++++++++++++++ eee " + self.bufferedCount())
items = items.concat(self.readBufferedDocuments(self.bufferedCount()));
}

// Attempt a fetch
Expand All @@ -146,7 +154,9 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
fetchDocs();
}

this.count = function(applySkipLimit, callback) {
this.count = function(applySkipLimit, options, callback) {
if(typeof options == 'function') callback = options, options = {};
options = options || {};
if(cmd.query == null) callback(new MongoError("count can only be used with find command"));
if(typeof applySkipLimit == 'function') {
callback = applySkipLimit;
Expand All @@ -155,8 +165,8 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {

var options = {};
if(applySkipLimit) {
if(typeof this.skipValue == 'number') options.skip = this.skipValue;
if(typeof this.limitValue == 'number') options.limit = this.limitValue;
if(typeof this.cursorSkip == 'number') options.skip = this.cursorSkip;
if(typeof this.cursorLimit == 'number') options.limit = this.cursorLimit;
}

// If maxTimeMS set
Expand All @@ -168,6 +178,11 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
, 'fields': null
}

// Merge in any options
if(options.skip) command.skip = options.skip;
if(options.limit) command.limit = options.limit;
if(options.hint) command.hint = options.hint;

// Build Query object
var query = new Query(bson, f("%s.$cmd", ns.split('.').shift()), command, {
numberToSkip: 0, numberToReturn: -1
Expand All @@ -186,7 +201,7 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {

this.limit = function(value) {
if(options.tailable) throw new Error("Tailable cursor doesn't support limit");
if(state == Cursor.OPEN || state == Cursor.CLOSED) throw new Error("Cursor is closed");
if(state == Cursor.OPEN || state == Cursor.CLOSED || self.isDead()) throw new Error("Cursor is closed");
if(typeof value != 'number') throw new Error("limit requires an integer");
cmd.limit = value;
this.cursorLimit = value;
Expand All @@ -195,16 +210,18 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {

this.skip = function(value) {
if(options.tailable) throw new Error("Tailable cursor doesn't support skip");
if(state == Cursor.OPEN || state == Cursor.CLOSED) throw new Error("Cursor is closed");
if(state == Cursor.OPEN || state == Cursor.CLOSED || self.isDead()) throw new Error("Cursor is closed");
if(typeof value != 'number') throw new Error("skip requires an integer");
cmd.skip = value;
this.cursorSkip = value;
return self;
}

this.batchSize = function(value) {
// console.log("CURRENT CURSOR STATE :: " + state + " :: " + connection.isConnected())

if(options.tailable) throw new Error("Tailable cursor doesn't support limit");
if(state == Cursor.CLOSED) throw new Error("Cursor is closed");
if(state == Cursor.CLOSED || self.isDead()) throw new Error("Cursor is closed");
if(typeof value != 'number') throw new Error("batchSize requires an integer");
cmd.batchSize = value;
this.cursorBatchSize = value;
Expand All @@ -213,7 +230,7 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {

this.sort = function(keyOrList, direction) {
if(options.tailable) throw new MongoError("Tailable cursor doesn't support sorting");
if(state == Cursor.CLOSED || state == Cursor.OPEN) throw new MongoError("Cursor is closed");
if(state == Cursor.CLOSED || state == Cursor.OPEN || self.isDead()) throw new MongoError("Cursor is closed");
var order = keyOrList;

if(direction != null) {
Expand All @@ -225,13 +242,13 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
}

this.close = function(callback) {
options.db.removeListener('close', closeListener);
state = Cursor.CLOSED;
this.emit('close');
this.kill(function() {
if(callback) return callback(null, self);
return self;
});
// Kill the cursor
this.kill();
// Emit the close event for the cursor
this.emit('close');
// Callback if provided
if(callback) return callback(null, self);
}

this.isClosed = function() {
Expand All @@ -256,23 +273,33 @@ var Cursor = function(bson, ns, cmd, connection, callbacks, options) {
}

this._read = function(n) {
if(state == Cursor.CLOSED) {
options.db.removeListener('close', closeListener);
// console.log("+++++++++++++++++++++++++++++++++++++++++ _READ 0")
if(state == Cursor.CLOSED || self.isDead()) {
// options.db.removeListener('close', closeListener);
return self.push(null);
}

// console.log("+++++++++++++++++++++++++++++++++++++++++ _READ 1")
// // Zero read
// if(n == 0) {
// console.log("ZERO READ")
// }

// Get the next item
self.nextObject(function(err, result) {
// console.log("+++++++++++++++++++++++++++++++++++++++++ NEXT STREAM")
// console.dir(err)
// console.dir(result)
if(err) {
options.db.removeListener('close', closeListener);
self.emit('error', err);
self.destroy();
return self.push(null);
}

// If we provided a transformation method
if(typeof streamOptions.transform == 'function' && result != null) {
return self.push(streamOptions.transform(result));
}

// Return the result
self.push(result);
});
Expand Down

0 comments on commit d399f7d

Please sign in to comment.