Skip to content

Commit

Permalink
Merged in pixelspark:gridfs_with_buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Apr 29, 2011
1 parent be79ed6 commit 24f6ee6
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 9 deletions.
115 changes: 114 additions & 1 deletion integration/integration_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var Db = require('../lib/mongodb').Db,
BinaryParser = require('../lib/mongodb/bson/binary_parser').BinaryParser,
Buffer = require('buffer').Buffer,
fs = require('fs'),
Script = process.binding('evals').Script;
Script = require('vm');

/*******************************************************************************************************
Integration Tests
Expand Down Expand Up @@ -2783,6 +2783,36 @@ var all_tests = {
});
},

test_gs_small_write_with_buffer : function() {
var gridStore = new GridStore(client, "test_gs_small_write_with_buffer", "w");
gridStore.open(function(err, gridStore) {
var data = new Buffer("hello world", "utf8");

gridStore.writeBuffer(data, function(err, gridStore) {
gridStore.close(function(err, result) {
client.collection('fs.files', function(err, collection) {
collection.find({'filename':'test_gs_small_write_with_buffer'}, function(err, cursor) {
cursor.toArray(function(err, items) {
test.equal(1, items.length);
var item = items[0];
test.ok(item._id instanceof ObjectID || Object.prototype.toString.call(item._id) === '[object ObjectID]');

client.collection('fs.chunks', function(err, collection) {
collection.find({'files_id':item._id}, function(err, cursor) {
cursor.toArray(function(err, items) {
test.equal(1, items.length);
finished_test({test_gs_small_write_with_buffer:'ok'});
})
});
});
});
});
});
});
});
});
},

test_gs_small_file : function() {
var gridStore = new GridStore(client, "test_gs_small_file", "w");
gridStore.open(function(err, gridStore) {
Expand Down Expand Up @@ -2863,6 +2893,89 @@ var all_tests = {
});
},

test_gs_seek_with_buffer : function() {
var gridStore = new GridStore(client, "test_gs_seek_with_buffer", "w");
gridStore.open(function(err, gridStore) {
var data = new Buffer("hello, world!", "utf8");
gridStore.writeBuffer(data, function(err, gridStore) {
gridStore.close(function(result) {
var gridStore2 = new GridStore(client, "test_gs_seek_with_buffer", "r");
gridStore2.open(function(err, gridStore) {
gridStore.seek(0, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('h', chr);
});
});
});

var gridStore3 = new GridStore(client, "test_gs_seek_with_buffer", "r");
gridStore3.open(function(err, gridStore) {
gridStore.seek(7, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('w', chr);
});
});
});

var gridStore4 = new GridStore(client, "test_gs_seek_with_buffer", "r");
gridStore4.open(function(err, gridStore) {
gridStore.seek(4, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('o', chr);
});
});
});

var gridStore5 = new GridStore(client, "test_gs_seek_with_buffer", "r");
gridStore5.open(function(err, gridStore) {
gridStore.seek(-1, GridStore.IO_SEEK_END, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('!', chr);
});
});
});

var gridStore6 = new GridStore(client, "test_gs_seek_with_buffer", "r");
gridStore6.open(function(err, gridStore) {
gridStore.seek(-6, GridStore.IO_SEEK_END, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('w', chr);
});
});
});

var gridStore7 = new GridStore(client, "test_gs_seek_with_buffer", "r");
gridStore7.open(function(err, gridStore) {
gridStore.seek(7, GridStore.IO_SEEK_CUR, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('w', chr);

gridStore.seek(-1, GridStore.IO_SEEK_CUR, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('w', chr);

gridStore.seek(-4, GridStore.IO_SEEK_CUR, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('o', chr);

gridStore.seek(3, GridStore.IO_SEEK_CUR, function(err, gridStore) {
gridStore.getc(function(err, chr) {
test.equal('o', chr);
finished_test({test_gs_seek_with_buffer:'ok'});
});
});
});
});
});
});
});
});
});
});
});
});
},

test_gs_seek : function() {
var gridStore = new GridStore(client, "test_gs_seek", "w");
gridStore.open(function(err, gridStore) {
Expand Down
7 changes: 6 additions & 1 deletion lib/mongodb/bson/bson.js
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,12 @@ Binary.prototype.write = function(string, offset) {
this.buffer = buffer;
}
// Write the content to the buffer
this.buffer.write(string, 'binary', offset);
if(string instanceof Buffer) {
string.copy(this.buffer, offset, 0, string.length);
}
else {
this.buffer.write(string, 'binary', offset);
}
this.position = offset + string.length;
};

Expand Down
21 changes: 19 additions & 2 deletions lib/mongodb/gridfs/chunk.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var Chunk = exports.Chunk = function(file, mongoObject) {
* will contain a reference to this object.
*/
Chunk.prototype.write = function(data, callback) {
this.data.write(data, this.internalPosition);
this.data.write(data.toString('binary'), this.internalPosition);
this.internalPosition = this.data.length();
callback(null, this);
};
Expand All @@ -81,6 +81,23 @@ Chunk.prototype.read = function(length) {
}
};

Chunk.prototype.readSlice = function(length) {
if ((this.length() - this.internalPosition + 1) >= length) {
var data = null;
if (this.data.buffer != null) { //Pure BSON
data = this.data.buffer.slice(this.internalPosition, this.internalPosition + length);
} else { //Native BSON
data = new Buffer(length);
//todo there is performance degradation! we need direct Binary::write() into buffer with offset support!
length = data.write(this.data.read(this.internalPosition, length), 'binary', 0);
}
this.internalPosition = this.internalPosition + length;
return data;
} else {
return null;
}
};

/**
* Checks if the read/write head is at the end.
*
Expand Down Expand Up @@ -174,4 +191,4 @@ Chunk.prototype.length = function() {
* The default chunk size
* @constant
*/
Chunk.DEFAULT_CHUNK_SIZE = 1024 * 256;
Chunk.DEFAULT_CHUNK_SIZE = 1024 * 256;
137 changes: 132 additions & 5 deletions lib/mongodb/gridfs/gridstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ GridStore.prototype.open = function(callback) {
var self = this;

self.collection(function(err, collection) {
if(err!==null) {
callback(new Error("at collection: "+err), null);
return;
}

self.chunkCollection(function(err, chunkCollection) {
collection.find({'filename':self.filename}, function(err, cursor) {
// Fetch the file
Expand Down Expand Up @@ -127,7 +132,7 @@ GridStore.prototype.open = function(callback) {
} else if(self.mode == "w") {
self.chunkCollection(function(err, collection2) {
// Create index for the chunks
chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) {
//chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) {
// Delete any existing chunks
self.deleteChunks(function(err, result) {
self.currentChunk = new Chunk(self, {'n':0});
Expand All @@ -137,12 +142,12 @@ GridStore.prototype.open = function(callback) {
self.position = 0;
callback(null, self);
});
});
//});
});
} else if(self.mode == "w+") {
self.chunkCollection(function(err, collection) {
// Create index for the chunks
chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) {
//chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) {
self.nthChunk(self.lastChunkNumber(), function(err, chunk) {
// Set the current chunk
self.currentChunk = chunk == null ? new Chunk(self, {'n':0}) : chunk;
Expand All @@ -151,7 +156,7 @@ GridStore.prototype.open = function(callback) {
self.position = self.length;
callback(null, self);
});
});
//});
});
} else {
callback(new Error("Illegal mode " + self.mode), null);
Expand Down Expand Up @@ -222,7 +227,7 @@ GridStore.prototype.writeFile = function (file, callback) {
*
* @see GridStore#writeFile
*/
GridStore.prototype.write = function(string, close, callback) {
GridStore.prototype.write = function(string, close, callback) {
if(typeof close === "function") { callback = close; close = null; }
var self = this;
var finalClose = close == null ? false : close;
Expand Down Expand Up @@ -270,6 +275,61 @@ GridStore.prototype.write = function(string, close, callback) {
}
};

GridStore.prototype.writeBuffer = function(buffer, close, callback) {
if(typeof close === "function") { callback = close; close = null; }
var self = this;
var finalClose = (close == null) ? false : close;

if(self.mode[0] != "w") {
callback(new Error(self.filename + " not opened for writing"), null);
}
else {
if((self.currentChunk.position + buffer.length) > self.chunkSize) {
// Data exceeds current chunk remaining free size; fill up current chunk and write the rest
// to a new chunk (recursively)
var previousChunkNumber = self.currentChunk.chunkNumber;
var leftOverDataSize = self.chunkSize - self.currentChunk.position;
var firstChunkData = buffer.slice(0, leftOverDataSize);

var leftOverData = buffer.slice(leftOverDataSize);

// Let's finish the current chunk and then call write again for the remaining data
self.currentChunk.write(firstChunkData, function(err, chunk) {
chunk.save(function(err, result) {
self.currentChunk = new Chunk(self, {'n': (previousChunkNumber + 1)});
self.position = self.position + leftOverDataSize;

// Write the remaining data
self.writeBuffer(leftOverData, function(err, gridStore) {
if(finalClose) {
self.close(function(err, result) {
callback(null, gridStore);
});
}
else {
callback(null, gridStore);
}
});
});
});
}
else {
// Write buffer to chunk all at once
self.currentChunk.write(buffer, function(err, chunk) {
self.position = self.position + buffer.length;
if(finalClose) {
self.close(function(err, result) {
callback(null, self);
});
}
else {
callback(null, self);
}
});
}
}
};

/**
* Creates a mongoDB object representation of this object.
*
Expand Down Expand Up @@ -425,6 +485,9 @@ GridStore.prototype.deleteChunks = function(callback) {

if(self.fileId != null) {
self.chunkCollection(function(err, collection) {
if(err!==null) {
callback(err, false);
}
collection.remove({'files_id':self.fileId}, function(err, result) {
callback(null, true);
});
Expand All @@ -434,6 +497,27 @@ GridStore.prototype.deleteChunks = function(callback) {
}
};

GridStore.prototype.unlink = function(callback) {
var self = this;
this.deleteChunks(function(err) {
if(err!==null) {
callback("at deleteChunks: "+err);
return;
}

self.collection(function(err, collection) {
if(err!==null) {
callback("at collection: "+err);
return;
}

collection.remove({'_id':self.fileId}, function(err, collection) {
callback(err, self);
});
});
});
};

/**
* Retrieves the file collection associated with this object.
*
Expand Down Expand Up @@ -566,6 +650,49 @@ GridStore.prototype.read = function(length, buffer, callback) {
}
};

GridStore.prototype.readBuffer = function(length, buffer, callback) {
var self = this;
var args = Array.prototype.slice.call(arguments, 0);
callback = args.pop();
length = args.length ? args.shift() : null;
buffer = args.length ? args.shift() : null;

var left = Math.min(self.length - self.position, length);
if(buffer===null) {
buffer = new Buffer(left);
}

var leftInCurrentChunk = self.currentChunk.length()-self.currentChunk.position;

// Everything can be read from this chunk
if((leftInCurrentChunk >= left) && leftInCurrentChunk!==0) {
var slice = self.currentChunk.readSlice(left);
self.position += left;
callback(null, slice);
}
else {
if(leftInCurrentChunk > 0) {
var slice = self.currentChunk.readSlice(leftInCurrentChunk);
self.position += leftInCurrentChunk;
slice.copy(buffer, 0, 0, leftInCurrentChunk);
}

var leftForNextChunk = left - leftInCurrentChunk;
var subBuffer = buffer.slice(leftInCurrentChunk, leftInCurrentChunk + leftForNextChunk);

self.nthChunk(self.currentChunk.chunkNumber+1, function(err, chunk) {
self.currentChunk = chunk;
self.readBuffer(leftForNextChunk, subBuffer, function(err, subb) {
if(subb!==subBuffer) {
// readBuffer returned its own buffer slice
subb.copy(buffer, leftInCurrentChunk, 0, subb.length);
}
callback(err, buffer);
});
});
}
}

/**
* Retrieves the position of the read/write head of this file.
*
Expand Down

0 comments on commit 24f6ee6

Please sign in to comment.