Skip to content

Commit

Permalink
[branch] switch to buffalo, remove mongodb-native dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
marcello3d committed Dec 19, 2011
1 parent 8246ff4 commit 53b8ff9
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 89 deletions.
11 changes: 0 additions & 11 deletions lib/bson.js

This file was deleted.

27 changes: 11 additions & 16 deletions lib/collection.js
@@ -1,9 +1,10 @@
/* Mongolian DeadBeef by Marcello Bastea-Forte - zlib license */
var util = require('util')
var buffalo = require('buffalo')
var mongo = buffalo.mongo

var safetyNet = require('./util').safetyNet
var extend = require('./util').extend
var mongo = require('mongodb')
var util = require('util')
var bson = require('./bson')
var MongolianCursor = require('./cursor')

var MongolianCollection = module.exports = function(db, name) {
Expand Down Expand Up @@ -63,11 +64,9 @@ MongolianCollection.prototype.insert = function(object, callback) {

// Assign ids
objects.forEach(function(object) {
if (!object._id) object._id = new bson.ObjectId
if (!object._id) object._id = new buffalo.ObjectId
})
var insertCommand = new mongo.InsertCommand(this.server._fakeDb, this.fullName)
insertCommand.documents = objects
this.db.sendCommand(insertCommand)
this.db.sendCommand(mongo.serializeInsert(this.fullName, objects, false))
if (callback) {
this.db.lastError(safetyNet(callback, function() {
callback(null, object)
Expand All @@ -91,8 +90,8 @@ MongolianCollection.prototype.update = function(criteria, objNew, upsert, multi,
upsert = false
}
if (callback && typeof callback !== 'function') throw new Error("callback is not a function!")
var updateCommand = new mongo.UpdateCommand(this.server._fakeDb, this.fullName, criteria, objNew, { upsert:upsert, multi:multi })
this.db.sendCommand(updateCommand)

this.db.sendCommand(mongo.serializeUpdate(this.fullName, criteria, objNew, upsert, multi))
if (callback) {
this.db.lastError(safetyNet(callback, function() {
callback(null)
Expand Down Expand Up @@ -137,8 +136,7 @@ MongolianCollection.prototype.remove = function(criteria, callback) {
criteria = {}
}
if (callback && typeof callback !== 'function') throw new Error("callback is not a function!")
var deleteCommand = new mongo.DeleteCommand(this.server._fakeDb, this.fullName, criteria)
this.db.sendCommand(deleteCommand)
this.db.sendCommand(mongo.serializeDelete(this.fullName, criteria))
if (callback) {
this.db.lastError(safetyNet(callback, function() {
callback(null)
Expand Down Expand Up @@ -232,16 +230,13 @@ MongolianCollection.prototype.runCommand = function(commandNameOrCommand, option
MongolianCollection.prototype.mapReduce = function(mapFunction, reduceFunction, options, callback) {
var command = {
mapreduce:this.name,
map:new bson.Code(mapFunction, mapFunction.scope),
reduce:new bson.Code(reduceFunction, reduceFunction.scope)
map:mapFunction,
reduce:reduceFunction
}
if (typeof options === 'string') {
options = { out:options }
}
extend(command, options)
if (typeof command.finalize === 'function') {
command.finalize = new bson.Code(command.finalize, command.finalize.scope)
}

var self = this
this.db.runCommand(command, safetyNet(callback, function(result) {
Expand Down
4 changes: 2 additions & 2 deletions lib/connection.js
Expand Up @@ -49,8 +49,8 @@ function Connection(options) {
}

// Setup write command
this.writeCommand = function(command, callback) {
readWriteStream.write(command.toBinary(), callback)
this.write = function(buffer, callback) {
readWriteStream.write(buffer, callback)
}

// Setup data listener
Expand Down
34 changes: 16 additions & 18 deletions lib/cursor.js
@@ -1,7 +1,7 @@
/* Mongolian DeadBeef by Marcello Bastea-Forte - zlib license */
var mongo = require('mongodb')
var safetyNet = require('./util').safetyNet
var bson = mongo.BSONPure || mongo.BSONNative
var buffalo = require('buffalo')
var mongo = buffalo.mongo

var MongolianCursor = module.exports = function(collection, criteria, fields) {
this.server = collection.db.server
Expand All @@ -24,9 +24,9 @@ MongolianCursor.prototype.reset = function() {
* Closes the cursor
*/
MongolianCursor.prototype.close = function(callback) {
if (this._currentBatch && this._currentBatch.cursorId) {
this.db.sendCommand(new mongo.KillCursorCommand(this.server._fakeDb, [this._currentBatch.cursorId]), callback)
delete this._currentBatch.cursorId
if (this._currentBatch && this._currentBatch.cursor) {
this.db.sendCommand(mongo.serializeKillCursors([this._currentBatch.cursor]), callback)
delete this._currentBatch.cursor
}
}

Expand Down Expand Up @@ -103,14 +103,14 @@ MongolianCursor.prototype.explain = function() {
MongolianCursor.prototype.nextBatch = function(callback) {
if (typeof callback !== 'function') throw new Error("callback is not a function!")
var self = this
if (self._currentIndex && self._currentIndex < self._currentBatch.numberReturned) throw new Error("nextBatch cannot be mixed with next")
if (self._currentIndex && self._currentIndex < self._currentBatch.documents.length) throw new Error("nextBatch cannot be mixed with next")
var filterBatch = safetyNet(callback, function(batch) {
// self.server.log.debug("<<<<<<---",batch.numberReturned,batch.cursorId)
// self.server.log.debug("<<<<<<---",batch.numberReturned,batch.cursor)
self._currentIndex = 0
self._currentBatch = batch
self._retrieved += batch.numberReturned
if (batch.cursorId && batch.cursorId.isZero()) {
delete batch.cursorId
self._retrieved += batch.documents.length
if (batch.cursor && batch.cursor.isZero()) {
delete batch.cursor
} else if (self._limit && self._retrieved >= self._limit) {
self.close()
}
Expand All @@ -123,8 +123,7 @@ MongolianCursor.prototype.nextBatch = function(callback) {
self._limit

if (!self._currentBatch) {
var queryCommand = new mongo.QueryCommand(
self.server._fakeDb,
var queryCommand = mongo.serializeQuery(
self.collection.fullName,
0,
self._skip,
Expand All @@ -133,12 +132,11 @@ MongolianCursor.prototype.nextBatch = function(callback) {
self.fields
)
self.db.sendCommand(queryCommand, filterBatch)
} else if (self._currentBatch.cursorId) {
var getMoreCommand = new mongo.GetMoreCommand(
self.server._fakeDb,
} else if (self._currentBatch.cursor) {
var getMoreCommand = mongo.serializeGetMore(
self.collection.fullName,
retrieveCount,
self._currentBatch.cursorId
self._currentBatch.cursor
)
self.db.sendCommand(getMoreCommand, filterBatch)
} else {
Expand All @@ -153,12 +151,12 @@ MongolianCursor.prototype.next = function(callback) {
if (typeof callback !== 'function') throw new Error("callback is not a function!")
var self = this
// We have a retrieved batch that hasn't been exhausted
if (self._currentBatch && self._currentIndex < self._currentBatch.numberReturned) {
if (self._currentBatch && self._currentIndex < self._currentBatch.documents.length) {
var document = self._currentBatch.documents[self._currentIndex++]
// self.server.log.debug("<<<<<<---",document)
callback(null, self._mapper ? self._mapper(document) : document)
// We don't have a batch or the cursor hasn't been closed yet
} else if (!self._currentBatch || self._currentBatch.cursorId) {
} else if (!self._currentBatch || self._currentBatch.cursor) {
self.nextBatch(safetyNet(callback,function() {
self.next(callback)
}))
Expand Down
3 changes: 1 addition & 2 deletions lib/db.js
Expand Up @@ -5,7 +5,6 @@ var util = require('util')
var safetyNet = require('./util').safetyNet
var MongolianCollection = require('./collection')
var MongolianGridFS = require('./gridfs')
var bson = require('./bson')

var MongolianDB = module.exports = function(server, name) {
this.server = server
Expand Down Expand Up @@ -162,7 +161,7 @@ MongolianDB.prototype.dropDatabase = function(callback) {
}

MongolianDB.prototype.eval = function(execFunction, args, callback) {
var command = { $eval:new bson.Code(execFunction, execFunction.scope || {}) }
var command = { $eval:execFunction }
if (arguments.length > 1) {
if (typeof arguments[arguments.length-1] === 'function') {
command.args = Array.prototype.slice.call(arguments, 1, arguments.length-1)
Expand Down
16 changes: 6 additions & 10 deletions lib/gridfile.js
@@ -1,15 +1,13 @@
/* Mongolian DeadBeef by Marcello Bastea-Forte - zlib license */
var mongo = require('mongodb')
var stream = require('stream')
var util = require('util')
var crypto = require('crypto')
var Waiter = require('waiter')
var buffalo = require('buffalo')

var safetyNet = require('./util').safetyNet
var callback = require('./util').callback
var bson = require('./bson')
var Long = bson.Long
var Binary = bson.Binary
var Long = buffalo.Long

var MongolianGridFile = module.exports = function(gridfs, document) {
this.gridfs = gridfs
Expand All @@ -31,7 +29,7 @@ MongolianGridFile.prototype.save = function(callback) {

var waiter = new Waiter
if (!self._id) {
self._id = new bson.ObjectId
self._id = new buffalo.ObjectId
var findCallback = waiter()
// Remove existing file
self.gridfs.findOne({
Expand Down Expand Up @@ -189,7 +187,7 @@ MongolianGridFileWriteStream.prototype.flush = function(callback) {
files_id:this.file._id,
n:this._chunkIndex
},{
data:new Binary(this._partialChunk.slice(0, this._partialIndex)),
data:this._partialChunk.slice(0, this._partialIndex),
files_id:this.file._id,
n:this._chunkIndex
}, waiter())
Expand Down Expand Up @@ -255,12 +253,10 @@ MongolianGridFileReadStream.prototype._nextChunk = function() {
self.emit('error', error || new Error("Chunk not found: "+chunkIndex+"/"+self._chunkIndex))
self.destroy()
} else {
// The mongodb-native BSON Binary buffer may be larger than the actual data size
chunk = chunk.data.buffer.slice(0, chunk.data.position)
if (self.paused) {
self._pauseData = chunk
self._pauseData = chunk.data
} else {
self.emit('data', chunk)
self.emit('data', chunk.data)
self._nextChunk()
}
}
Expand Down
6 changes: 3 additions & 3 deletions lib/gridfs.js
@@ -1,7 +1,7 @@
/* Mongolian DeadBeef by Marcello Bastea-Forte - zlib license */
var mongo = require('mongodb')
var buffalo = require('buffalo')

var safetyNet = require('./util').safetyNet
var bson = require('./bson')
var MongolianGridFile = require('./gridfile')

var MongolianGridFS = module.exports = function(db, name) {
Expand Down Expand Up @@ -40,7 +40,7 @@ MongolianGridFS.prototype.find = function(searchBy) {
var query
if (typeof searchBy === 'string' || searchBy instanceof RegExp) {
query = { filename:searchBy }
} else if (searchBy instanceof bson.ObjectId) {
} else if (searchBy instanceof buffalo.ObjectId) {
query = { _id:searchBy }
} else {
query = searchBy
Expand Down
40 changes: 17 additions & 23 deletions lib/server.js
@@ -1,13 +1,13 @@
/* Mongolian DeadBeef by Marcello Bastea-Forte - zlib license */
var mongo = require('mongodb')
var Waiter = require('waiter')
var taxman = require('taxman')
var EventEmitter = require('events').EventEmitter
var buffalo = require('buffalo')
var mongo = buffalo.mongo

var safetyNet = require('./util').safetyNet
var extend = require('./util').extend
var MongolianDB = require('./db')
var bson = require('./bson')
var Connection = require('./connection')

// [mongo://][username:password@]hostname[:port][/databasename]
Expand Down Expand Up @@ -191,16 +191,10 @@ Mongolian.prototype.log = {
error: function(message){ console.error("[error] " + message) }
}

extend(Mongolian, bson)

//////////////////////////////////////////////////////////////////////////////////
// Internal

var bsonSerializer = require('mongodb').BSONPure
Mongolian.prototype._fakeDb = {
bson_serializer: bsonSerializer,
bson_deserializer: bsonSerializer
}
Mongolian.Long = buffalo.Long
Mongolian.Timestamp = buffalo.Timestamp
Mongolian.ObjectId = buffalo.ObjectId
Mongolian.DBRef = buffalo.DBRef

/**
* Constructs a new MongolianServer object
Expand All @@ -214,6 +208,7 @@ function MongolianServer(mongolian, url) {
this._connection = taxman(function(callback) {
var connection = new Connection
var connected = false
connection.requestId = 0
connection.on('error', function(error) {
mongolian.log.error(self+": "+require('util').inspect(error))
if (!connected) callback(error)
Expand All @@ -229,16 +224,13 @@ function MongolianServer(mongolian, url) {
callback(null, connection)
})
connection.on('message', function(message) {
var reply = new mongo.MongoReply
reply.parseHeader(message, bsonSerializer)
var cb = self._callbacks[reply.responseTo]
var response = new mongo.Response(message)
// mongolian.log.debug("<<<--- "+require('util').inspect(response,undefined,5,true).slice(0,5000))
var cb = self._callbacks[response.responseTo]
if (cb) {
// mongolian.log.debug("<<<--- "+require('util').inspect(reply,undefined,5,true).slice(0,5000))
delete self._callbacks[reply.responseTo]
delete self._callbacks[response.responseTo]
self._callbackCount--
reply.parseBody(message, bsonSerializer, null, function() {
cb(null,reply)
})
cb(null,response)
}
})
connection.connect(url.port, url.host)
Expand All @@ -254,11 +246,13 @@ MongolianServer.prototype.sendCommand = function(command, callback) {
// var stack = new Error().stack
self._connection(safetyNet(callback,function(connection) {
if (callback) {
self._callbacks[command.getRequestId()] = callback
connection.requestId++
mongo.setRequestId(command, connection.requestId)
self._callbacks[connection.requestId] = callback
self._callbackCount++
}
// self.mongolian.log.debug("--->>> "+require('util').inspect(command,undefined,5,true).slice(0,5000))
connection.writeCommand(command)
// self.mongolian.log.debug("--->>> "+require('util').inspect(command,undefined,5,true).slice(0,5000)+'\n'+new Error().stack)
connection.write(command)
}))
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -8,7 +8,7 @@
"main": "mongolian.js",
"keywords": ["mongo", "mongodb", "database", "db", "nosql"],
"dependencies": {
"mongodb": "0.9.7-2-2",
"buffalo": "0.1.1",
"waiter": "0.1.1",
"taxman": "0.1.1"
},
Expand Down
6 changes: 3 additions & 3 deletions test/collection1.js
Expand Up @@ -20,14 +20,14 @@ module.exports = {

collection.findOne(function(error, foundRow) {
test.ifError(error)
test.deepEqual(foundRow._id, insertedRow._id)
test.equal(foundRow._id.toString(), insertedRow._id.toString())
test.equal(foundRow.name, "hello world")

collection.findOne({
_id:new Mongolian.ObjectId(insertedRow._id.toString())
}, function(error, foundRow) {
test.ifError(error)
test.deepEqual(foundRow._id, insertedRow._id)
test.equal(foundRow._id.toString(), insertedRow._id.toString())
test.equal(foundRow.name, "hello world")

test.done()
Expand All @@ -43,7 +43,7 @@ module.exports = {
_id:new Mongolian.ObjectId(insertedRow._id.toString())
}, function(error, foundRow) {
test.ifError(error)
test.deepEqual(foundRow._id, insertedRow._id)
test.equal(foundRow._id.toString(), insertedRow._id.toString())
test.equal(foundRow.name, "hello world")

test.done()
Expand Down

0 comments on commit 53b8ff9

Please sign in to comment.