Skip to content

Commit

Permalink
Fixed commit
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Dec 16, 2010
2 parents f1063c6 + 9f68287 commit c786a9a
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 131 deletions.
119 changes: 119 additions & 0 deletions examples/oplog.js
@@ -0,0 +1,119 @@
GLOBAL.DEBUG = true;

sys = require("sys");
test = require("assert");

var Db = require('../lib/mongodb').Db,
Connection = require('../lib/mongodb').Connection,
Server = require('../lib/mongodb').Server,
Cursor = require('../lib/mongodb').Cursor;

var host = process.env['MONGO_NODE_DRIVER_HOST'] != null ? process.env['MONGO_NODE_DRIVER_HOST'] : 'localhost';
var port = process.env['MONGO_NODE_DRIVER_PORT'] != null ? process.env['MONGO_NODE_DRIVER_PORT'] : Connection.DEFAULT_PORT;

Slave = function() {
this.running = false;
this.callbacks = [];
//no native_parser right now (because timestamps)
//no strict mode (because system db signed with $ db.js line 189)
//connect without dbName for querying not only "local" db
sys.puts("Connecting to " + host + ":" + port);
this.db = new Db('', new Server(host, port, {}), {});
}

//start watching
Slave.prototype.start = function() {
var self = this;
if (this.running) return;

this.db.open(function(err, db) {
if (err) {
sys.puts('> MongoSlave error' + err);
process.exit(1);
}

db.collection('local.oplog.$main', function(err, collection) {
if (! collection) {
sys.puts('> MongoSlave - local.oplog.$main not found');
self.stop();
return false;
}

process.on('SIGINT', function () {
self.stop(); //tailable cursor should be stopped manually
});

//get last row for init TS
collection.find({}, {'limit': 1, 'sort': [['$natural', -1]]}, function(err, cursor) {
cursor.toArray(function(err, items) {
if (items.length) {
sys.puts('> MongoSlave started');
self.running = true;
self._runSlave(collection, items[0]['ts']);
} else if (err) {
sys.puts(err);
self.stop();
}
});
});
});
});
}

//stop watching
Slave.prototype.stop = function() {
if (!this.running) return;
sys.puts('> MongoSlave stopped');
this.running = false;
this.db.close();
}

Slave.prototype._runSlave = function(collection, time) {

var self = this;

//watch oplog INFINITE (until Slave.stop())
collection.find({'ts': {'$gt': time}}, {'tailable': 1, 'sort': [['$natural', 1]]}, function(err, cursor) {
cursor.each(function(err, item) {
if (cursor.state == Cursor.CLOSED) { //broken cursor
self.running && self._runSlave(collection, time);
return;
}
time = item['ts'];

switch(item['op']) {
case 'i': //inserted
self._emitObj(item['o']);
break;
case 'u': //updated
self.db.collection(item['ns'], function(err, collection) {
collection.findOne(item['o2']['_id'], {}, function(err, item) {
item && self._emitObj(item);
});
});
break;
case 'd': //deleted
//nothing to do
break;
}
});
});
}

Slave.prototype._emitObj = function (obj) {
for(var i in this.callbacks) this.callbacks[i].call(this, obj);
}

Slave.prototype.onObject = function(callback) {
this.callbacks.push(callback);
}


//just for example
var watcher = new Slave();

watcher.onObject(function(obj) {
sys.puts(sys.inspect(obj));
});

watcher.start();
1 change: 1 addition & 0 deletions index.js
@@ -0,0 +1 @@
module.exports = require('./lib/mongodb');
11 changes: 5 additions & 6 deletions integration/integration_tests.js
Expand Up @@ -1128,13 +1128,12 @@ var all_tests = {
test.ok(err instanceof Error);
test.equal("Cursor is closed", err.message);

// Each should allow us to iterate over the entries due to cache
// Should fail if called again (cursor should be closed)
cursor.each(function(err, item) {
if(item != null) {
test.equal(1, item.a);
// Let's close the db
finished_test({test_to_a:'ok'});
}
test.ok(err instanceof Error);
test.equal("Cursor is closed", err.message);
// Let's close the db
finished_test({test_to_a:'ok'});
});
});
});
Expand Down
8 changes: 5 additions & 3 deletions lib/mongodb/collection.js
Expand Up @@ -367,7 +367,7 @@ Collection.prototype.find = function() {
}

if(len == 3){ // backwards compat for options object
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout'],
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout','tailable'],
idx = 0, l = test.length, is_option = false;
while(!is_option && idx < l) if(test[idx] in fields ) is_option = true; else idx++;
options = is_option ? fields : {};
Expand All @@ -391,7 +391,7 @@ Collection.prototype.find = function() {
options.timeout = len == 6 ? arguments[4] : options.timeout ? options.timeout : undefined;

var o = options;
callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout));
callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable));
};

Collection.prototype.normalizeHintField = function(hint) {
Expand Down Expand Up @@ -435,8 +435,10 @@ Collection.prototype.findOne = function(queryObject, options, callback) {
finalQueryObject = (finalQueryObject instanceof this.db.bson_serializer.ObjectID || Object.prototype.toString.call(finalQueryObject) === '[object ObjectID]') ? {'_id':finalQueryObject} : finalQueryObject;
// Build special selector
var specialSelector = {'query':finalQueryObject};
// Build full collection name
var collectionName = (this.db.databaseName ? this.db.databaseName + "." : '') + this.collectionName;
// Execute the command
var queryCommand = new QueryCommand(this.db, this.db.databaseName + "." + this.collectionName, queryOptions, 0, 1, specialSelector, fields);
var queryCommand = new QueryCommand(this.db, collectionName, queryOptions, 0, 1, specialSelector, fields);
this.db.executeCommand(queryCommand, function(err, result) {
if(!err && result.documents[0] && result.documents[0]['$err']) return callback(result.documents[0]['$err'], null);
callback(err, result.documents[0]);
Expand Down

0 comments on commit c786a9a

Please sign in to comment.