Permalink
Browse files

file rolling on replication

  • Loading branch information...
1 parent a706e56 commit 74af4a40eeaad54361a04e131c1e1792b969ba96 @pgte committed Jan 7, 2011
View
@@ -0,0 +1,4 @@
+temp
+docs
+test
+benchmarks
View
@@ -21,7 +21,7 @@ test: mkdirtmp
operators/test_nin operators/test_neq operators/test_or operators/test_global_or operators/test_order operators/test_desc_order operators/test_chainable \
operators/test_find_stream operators/test_find_stream_chained \
recovery/collection_recovery_test \
- replication/test_master replication/test_slave replication/test_slave_reconnect
+ replication/test_master replication/test_slave replication/test_slave_reconnect replication/test_master_temp_roll
benchmark: mkdirtmp mkdirresults
node tools/benchmarks.js benchmark_collection benchmark_collection_filter benchmark_key_map benchmark_key_map_each_with_pos benchmark_indexed_key_map \
@@ -32,4 +32,7 @@ aggregate_benchmarks: mkdirresults
node tools/aggregate_benchmarks.js > benchmarks/results/summaries/`date "+%Y%m%d%H%M%S"`
cd benchmarks/results/summaries && rm -f latest && ln -s `ls -t1 | head -n1` latest && cd ../../..
+publish: clean
+ npm publish .
+
.PHONY: test
View
@@ -1,4 +1,4 @@
- Compact meta key_map on database startup.
-- Replication should roll over files on the server and client
+- BUG: For some strange reason the last replication master temp file is not deleted.
- Allow master-master replication
- Allow master-master replication with versioning and conflicts (a la CouchDB) ?
View
No changes.
@@ -9,9 +9,10 @@ p.note Everything here is experimental, and the API is no exception, so expect t
li <b>callback</b> (err, db): invoked when there is an error or the database is open. Use db to access / create / remove key maps.
li <b>options:</b>
ul
- li <b>meta_compact_interval</b>: default average compact interval for key maps, in miliseconds. Defaults to 1 hour (1000 * 60 * 60).
+ li <b>meta_compact_interval</b>: default average compact interval for the metadata key map, in miliseconds. Defaults to 1 hour (1000 * 60 * 60).
li <b>replication_master</b>: true if this database is going to act as a replication master. Defaults to false.
li <b>replication_port</b>: replication server port for this database. Defaults to 5293.
+ li <b>replication_max_temp_file_size_kb</b>: maximum file size for replication temporary files on master. Defaults to 10000 KBytes.
h3 Example:
pre
code
@@ -29,6 +30,8 @@ p.note Everything here is experimental, and the API is no exception, so expect t
li <b>callback</b> (err, key_map): invoked when there is an error or the key map is attached.
li <b>options</b>:
ul
+ li <b>buffered</b>: if <i>true</i>, writes are buffered (flushes are scheduled every second after the last one by default). If <i>false</i>, key_map.put only callsback when data is written to disk. Defaults to <i>true</i>.
+ li <b>flush_interval</b>: This determines the frequency of flushes. A flush is scheduled <i>flush_interval</i> miliseconds after the last one finished. In miliseconds. Defaults to 1000 (1 second).
li <b>type</b>: can be 'cached_key_map' or 'indexed_key_map'. Defaults to 'cached_key_map'. ('cached_key_map' is also indexed)
li <b>compact_interval</b>: average compact interval, in miliseconds. Defaults to 1 hour (1000 * 60 * 60).
li <b>cache_slots</b>: Maximum number of objects to cache (only valid if type is 'cached_key_map'). Defaults to 1000.
@@ -80,9 +83,37 @@ p.note Everything here is experimental, and the API is no exception, so expect t
| })
| });
- h2 database.end (callback)
+ h2 database.close (callback)
p Closes all key maps and meta data.
p You should call this before leaving.
p
ul
- li <b>callback</b> (err): when an error occurs or the database ends.
+ li <b>callback</b> (err): when an error occurs or the database ends.
+
+ h1 Events
+
+ p Bind to events like this:
+
+ p
+ pre
+ code
+ | db.on('error', function(err) {
+ | // handle error
+ | });
+
+ h2 'error'
+ p Emitted when there is an unhandled error. You should catch these.
+ p <b>callback</b> (err)
+
+ h2 'key_map_attached'
+ p Emitted when a key map is attached into this database.
+ p <b>callback</b> (key_map_name)
+
+ h2 'index_added'
+ p Emitted when an index is added into a key map attached to this db.
+ p <b>callback</b> (key_map_name, index_name)
+
+ h2 'index_dropped'
+ p Emitted when an index is removed from a key map attached to this db.
+ p <b>callback</b> (key_map_name, index_name)
+
@@ -155,3 +155,26 @@ p.note Everything here is experimental, and the API is no exception, so expect t
h2 key_map.compact (callback)
p Compacts a key map, loosing all past history.
+ h1 Events
+
+ p Regsiter ro receive events like this:
+ p
+ pre
+ code
+ | db.users.on('put', function(key, value) {
+ | console.log('key ' + key + ' was put on users key map');
+ | });
+
+ p Supported events are:
+
+ h2 'before_flush'
+ p Emitted right before flushing starts.
+ p <b>callback</b> ()
+
+ h2 'after_flush'
+ p Emitted right after flushing ends.
+ p <b>callback</b> ()
+
+ h2 'put'
+ p Emitted when a put is made.
+ p <b>callback</b> (key, value)
@@ -221,6 +221,8 @@ File.prototype.readOne = function(position, callback) {
} else {
if (bytesReadNow == 0) {
+ self.queue_size --;
+ self._needsEnd();
callback(null, null);
return;
}
@@ -255,6 +257,7 @@ File.prototype.readOne = function(position, callback) {
record = getAndValidateRecordFromBuffer(read_string.slice(0, length), position, self);
} catch (except) {
callback(except);
+ self._needsEnd();
return;
}
callback(null, record, length);
@@ -317,10 +320,7 @@ File.prototype.end = function(callback) {
}
};
-File.prototype.endSync = function(callback) {
- if (this.ending) {
- callback(new Error("File is already ending"));
- }
+File.prototype.endSync = function() {
return fs.closeSync(this.fd);
};
@@ -364,6 +364,12 @@ File.prototype.destroy = function(callback) {
});
};
+File.prototype.destroySync = function() {
+ if (path.existsSync(this.file_path)) {
+ fs.unlinkSync(self.file_path);
+ }
+};
+
/* Locking */
File.prototype._lockFilePath = function(callback) {
@@ -31,13 +31,31 @@ var makePath = function(callback) {
};
var TempFile = function(callback) {
+ var self = this;
+ makePath(function(path) {
+ File.call(self, path, null, function(err, file) {
+ if (err) { callback(err); return; }
+ process.on('exit', function() {
+ try {
+ require('fs').writeSync(self.fd, 'ENDED', 0);
+ return;
+ self.endSync();
+ } catch (excep) {
+ // do nothing
+ }
+ try {
+ self.destroySync();
+ } catch (excep) {
+ // do nothing
+ }
+ });
+ callback(null, self);
+ });
+ });
};
util.inherits(TempFile, File);
module.exports.open = function(callback) {
- var tf = new TempFile(callback);
- makePath(function(path) {
- File.call(tf, path, null, callback);
- });
+ return new TempFile(callback);
};
@@ -29,7 +29,7 @@ DropIndexCommand.prototype.do = function(meta, callback) {
return;
}
if (!meta.indexes[self.key_map_name]) {
- console.log('Key map with name ' + self.key_map_name + ' not found');
+ callback(new Error('Key map with name ' + self.key_map_name + ' not found'));
return;
}
delete key_map[self.name];
@@ -10,7 +10,8 @@ var path = require('path'),
var default_meta_options = {
meta_compact_interval: 1000 * 60 * 60, // 1 hour, +- 50%
replication_master: false,
- replication_port: 5293
+ replication_port: 5293,
+ replication_max_temp_file_size_kb: 10000
};
var Database = function(db_path, options, callback) {
@@ -87,9 +88,7 @@ Database.prototype._initialize = function(callback) {
// call compact on key_map.end
var old_end = meta.end;
meta.end = function(callback) {
- // console.log('meta end');
if (meta._compact_timeout) {
- // console.log('cleared timeout');
clearTimeout(meta._compact_timeout);
delete meta._compact_timeout;
}
@@ -151,7 +150,8 @@ Database.prototype._initialize = function(callback) {
if (self.options.replication_master) {
var options = {
master: true,
- port: self.options.replication_port
+ port: self.options.replication_port,
+ max_temp_file_size_kb: self.options.replication_max_temp_file_size_kb
};
self.master_replicator = replication.start(self, options, this);
self.master_replicator.on('error', function(err) {
@@ -285,8 +285,6 @@ Database.prototype.close = function(callback) {
var self = this;
var key_map_names = [];
- // console.log('closing...');
-
if (self.state == 'closing') {
callback('Database is already closing');
return;
@@ -66,8 +66,6 @@ var _optimize = function(query) {
};
Finder.prototype.executeCondition = function(keys, field, condition) {
- // console.log("field: " + field);
- // console.log(condition);
var self = this;
for (var operator in condition) {
if (condition.hasOwnProperty(operator)) {
@@ -43,8 +43,6 @@ module.exports = function(master, stream) {
return;
}
- // console.log('opened temp file ' + backlog_file.file_path);
-
backlog_file._written_bytes = 0;
var send = function(data) {
backlog_file.write(JSON.stringify(data) + "\n", function(err, pos, length) {
@@ -140,35 +138,45 @@ module.exports = function(master, stream) {
return;
}
- // console.log('opened temp file ' + running_file.file_path);
-
var send = function(data) {
- if (running_file.ended) {
+ if (running_file._finished) {
// we're ending the file, so let's queue this send
process.nextTick(function() {
send(data);
});
+ return;
}
// if we have equaled or exceeded the max_temp_file_size_kb, let's create and use another file
- if ((running_file.size / 1000) >= master.options.max_temp_file_size_kb) {
- running_file._finished = true;
- temp_file.open(function(err, new_file) {
- if (err) {
- master.error(stream, err);
- return;
- }
- waiting_files.push(new_file);
- running_file = new_file;
- });
- }
- running_file.write(JSON.stringify(data) + "\n", function(err, pos, length) {
- if (err) {
- master.error(stream, err);
- return;
+ running_file.size(function(err, size) {
+ if (err) { master.error(stream, err); return; }
+
+ var really_write = function() {
+ running_file.write(JSON.stringify(data) + "\n", function(err, pos, length) {
+ if (err) {
+ master.error(stream, err);
+ return;
+ }
+ process.nextTick(function() {
+ notify(); // notify sending stream so data gets to the client
+ });
+ });
+ };
+
+ if ((size / 1000) >= master.options.max_temp_file_size_kb) {
+ running_file._finished = true;
+ temp_file.open(function(err, new_file) {
+ if (err) {
+ master.error(stream, err);
+ return;
+ }
+ waiting_files.push(new_file);
+ running_file = new_file;
+ really_write();
+ });
+ } else {
+ really_write();
}
- process.nextTick(function() {
- notify(); // notify sending stream so data gets to the client
- });
+
});
};
@@ -211,8 +219,6 @@ module.exports = function(master, stream) {
notifying = true;
-
- // console.log('reading from ' + sending_file.file_path);
sending_file.readOne(pos, function(err, record, bytesRead) {
notifying = false;
@@ -226,22 +232,22 @@ module.exports = function(master, stream) {
stream.write(record, 'utf8');
process.nextTick(notify);
} else {
- // console.log('0 bytes read');
if (sending_file._finished) {
- // console.log('finished');
if (waiting_files.length > 1) {
waiting_files.splice(0, 1);
+ waiting_files[0]._sent_bytes = 0;
}
- process.nextTick(notify);
- process.nextTick(function() {
- sending_file.end(function(err) {
- if (err) { master.error(stream, err); }
- sending_file.destroy(function(err) {
+ (function(remove_file) {
+ process.nextTick(function() {
+ remove_file.end(function(err) {
if (err) { master.error(stream, err); }
- // console.log('removed file');
+ remove_file.destroy(function(err) {
+ if (err) { master.error(stream, err); }
+ });
});
});
- });
+ process.nextTick(notify);
+ }) (sending_file);
} else {
if (pending_notifies > 0) {
process.nextTick(notify);
@@ -31,7 +31,7 @@ var Master = function(database, options, callback) {
util.inherits(Master, EventEmitter);
module.exports.start = function(database, options, callback) {
- return new Master(database, callback);
+ return new Master(database, options, callback);
};
Master.prototype._initialize = function(callback) {
View
@@ -1,6 +1,6 @@
{ "name" : "alfred"
, "description" : "In-process key-value store"
-, "version" : "0.1.0beta3"
+, "version" : "0.1.0beta4"
, "homepage" : "http://pgte.github.com/alfred"
, "author" : "Pedro Teixeira <pedro.teixeira@gmail.com> (http://metaduck.com)"
, "contributors" :
Oops, something went wrong.

0 comments on commit 74af4a4

Please sign in to comment.