Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

replication sync command now supports the from argument

  • Loading branch information...
commit e2666e27956e13d963711bde9c60ab74e04914c1 1 parent 1c6235d
Pedro Teixeira authored
1  docs/web/views/api/database.jade
View
@@ -13,6 +13,7 @@ p.note Everything here is experimental, and the API is no exception, so expect t
li <b>replication_master</b>: true if this database is going to act as a replication master. Defaults to false.
li <b>replication_port</b>: replication server port for this database. Defaults to 5293.
li <b>replication_max_file_size_kb</b>: maximum file size for replication temporary files on master. Defaults to 10000 KBytes.
+ li <b>replication_max_keep_ms</b>: maximum time a replication temporary file should be kept around. Defaults to 15 days.
h3 Example:
pre
code
11 lib/alfred/meta/commands/replicate_from.js
View
@@ -26,6 +26,7 @@ ReplicateFromCommand.prototype.do = function(meta, callback) {
var self = this;
var connection;
var reconnect_tries;
+ var last_run_log_pos = 0;
var command_queue = [];
@@ -46,6 +47,7 @@ ReplicateFromCommand.prototype.do = function(meta, callback) {
callback(new Error('Error doing replication slave command \'' + util.inspect(command) + '\': ' + util.inspect(err)));
return;
}
+ last_run_log_pos = command.__log_pos;
process.nextTick(function() {
in_notify = false;
do_notify();
@@ -80,12 +82,12 @@ ReplicateFromCommand.prototype.do = function(meta, callback) {
var sendCommand = function(command) {
var line_command = JSON.stringify({command: command}) + "\n";
connection.write(line_command, 'utf8');
- }
+ };
- var connect = function () {
+ (function connect() {
connection = net.createConnection(self.options.source_port, self.source);
connection.on('connect', function() {
- sendCommand('sync');
+ sendCommand('sync', {from: last_run_log_pos});
carrier.carry(connection, function(line) {
var command;
try {
@@ -105,8 +107,7 @@ ReplicateFromCommand.prototype.do = function(meta, callback) {
});
- };
- connect();
+ })();
};
6 lib/alfred/meta/database.js
View
@@ -11,7 +11,8 @@ var default_meta_options = {
meta_compact_interval: 1000 * 60 * 60, // 1 hour, +- 50%
replication_master: false,
replication_port: 5293,
- replication_max_file_size_kb: 10000
+ replication_max_file_size_kb: 10000,
+ replication_max_keep_ms: 1000 * 60 * 60 * 24 * 15, // 15 days
};
var Database = function(db_path, options, callback) {
@@ -151,7 +152,8 @@ Database.prototype._initialize = function(callback) {
var options = {
master: true,
port: self.options.replication_port,
- max_file_size_kb: self.options.replication_max_file_size_kb
+ max_file_size_kb: self.options.replication_max_file_size_kb,
+ max_keep_ms: self.options.replication_max_keep_ms
};
self.master_replicator = replication.start(self, options, this);
self.master_replicator.on('error', function(err) {
4 lib/alfred/meta/replication/commands/sync.js
View
@@ -1,8 +1,8 @@
var LogStream = require('../log_stream');
-module.exports = function(master, stream) {
+module.exports = function(master, args, stream) {
try {
- var logStream = LogStream.open(master.logger);
+ var logStream = LogStream.open(master.logger, args && args.from);
logStream.on('error', function(err) {
stream.write(JSON.stringify({"error": err}));
});
59 lib/alfred/meta/replication/log_stream.js
View
@@ -3,29 +3,35 @@ var EventEmitter = require('events').EventEmitter,
util = require('util'),
File = require('../../files/file');
-var LogStream = function(logger) {
+var LogStream = function(logger, from_pos) {
var self = this;
this.logger = logger;
- this.current_file_index = -1;
+
this.paused = false;
this.current_file_pos = 0;
process.nextTick(function() {
- self._read();
+ if (from_pos && from_pos > 0) {
+ self._seek(from_pos, function(err) {
+ if (err) { emit('error', err); }
+ self._read();
+ });
+ } else {
+ self._read();
+ }
});
};
util.inherits(LogStream, EventEmitter);
-module.exports.open = function(logger) {
- return new LogStream(logger);
+module.exports.open = function(logger, from_pos) {
+ return new LogStream(logger, from_pos);
};
LogStream.prototype._grabNextFile = function(callback) {
var self = this;
- this.current_file_index ++;
- var file_path = this.logger.livelog_file_paths[this.current_file_index];
+ var file_path = self.logger.nextFile(self.current_file && self.current_file.file_path);
var openFilePath = function(file_path) {
File.open(file_path, {read_only: true}, function(err, file) {
if (err) { callback(err); return; }
@@ -36,7 +42,7 @@ LogStream.prototype._grabNextFile = function(callback) {
};
if (!file_path) {
self.logger.once('data', function() {
- file_path = this.logger.livelog_file_paths[this.current_file_index];
+ file_path = self.logger.nextFile(self.current_file && self.current_file.file_path);
assert.ok(!!file_path, 'Couldn\'t get a file from the logger');
openFilePath(file_path);
});
@@ -45,15 +51,42 @@ LogStream.prototype._grabNextFile = function(callback) {
}
}
+LogStream.prototype._seek = function(pos, callback) {
+ var self = this;
+ var rec;
+
+ delete self.current_file;
+ self.current_file = self.logger.seek(pos);
+ self.current_file_pos = 0;
+ (function inside_seek() {
+ self.current_file.readOne(self.current_file_pos, function(err, record, length) {
+ if (err) { callback(err); return; }
+ if (!record) {
+ callback(null);
+ return;
+ }
+ self.current_file_pos += length;
+ rec = JSON.parse(record);
+ if (rec.__log_pos) {
+ if (pos > rec.__log_pos) {
+ process.nextTick(inside_seek);
+ } else {
+ callback(null);
+ }
+ }
+ });
+ })();
+};
+
LogStream.prototype._read = function() {
var self = this;
- if (this.paused) { return; }
- if (!this.current_file) {
- this._grabNextFile(function(file) {
+ if (self.paused) { return; }
+ if (!self.current_file) {
+ self._grabNextFile(function(file) {
self._read();
});
} else {
- this.current_file.readOne(this.current_file_pos, function(err, record, length) {
+ self.current_file.readOne(self.current_file_pos, function(err, record, length) {
if (err) { self.emit('error', err); return; }
if (record) {
self.current_file_pos += length;
@@ -63,7 +96,7 @@ LogStream.prototype._read = function() {
// file reached end
// now we have to know if it has rolled into another file or should we keep trying to read this one
// check if a new file exists
- if (self.logger.livelog_file_paths.length > (self.current_file_index + 1)) {
+ if (self.logger.nextFile(self.current_file && self.current_file.file_path)) {
self.current_file.end(function(err) {
if (err) { emit('error', err); return; }
});
66 lib/alfred/meta/replication/logger.js
View
@@ -6,7 +6,8 @@ var File = require('../../files/file'),
options_merger = require('../../util/options_merger');
var default_options = {
- max_file_size_kb: 10000
+ max_file_size_kb: 10000,
+ max_keep_ms: 1000 * 60 * 60 * 24 * 15, // 15 days
}
var Logger = function(database, options, callback) {
@@ -121,6 +122,8 @@ Logger.prototype._backlogKeyMap = function(key_map_name, key_map, callback) {
var self = this;
var send = function(what) {
+ self.current_log_pos ++;
+ what['__log_pos'] = self.current_log_pos;
self.backlog_file.write(JSON.stringify(what), function(err) {
if (err) { callback(err); return; }
self.emit('data', what);
@@ -158,6 +161,9 @@ Logger.prototype._openLiveLog = function(callback) {
self.livelog_file = livelog_file;
self.livelog_file_paths.push(liveLogPath);
callback(null);
+ self._maxKeepCheck(function(err) {
+ if (err) { callback(err); return; }
+ });
});
};
@@ -172,10 +178,10 @@ Logger.prototype._startLiveLog = function(callback) {
var send = function(what) {
var reallyWrite = function(what) {
+ self.current_log_pos ++;
+ what['__log_pos'] = self.current_log_pos;
self.livelog_file.write(JSON.stringify(what), function(err, pos, length) {
if (err) { callback(err); return; }
- written += length;
- self.current_log_pos += length;
self.emit('data', what);
});
};
@@ -222,4 +228,58 @@ Logger.prototype._startLiveLog = function(callback) {
send({m: key_map_name, k: key, v: value});
});
callback(null);
+};
+
+Logger.prototype._maxKeepCheck = function(callback) {
+ var self = this;
+ var old = Date.now() - self.max_keep_ms;
+ var files = self.livelog_file_paths.slice(0); // clone self.livelog_file_paths
+ for (var i = 0; i < files.length; i++) {
+ (function(i) {
+ var file_path = self.livelog_file_paths[i];
+ if (file_path) {
+ path.exists(file_path, function(exists) {
+ if (exists) {
+ fs.stat(file_path, function(err, stat) {
+ if (err) { callback(err); return; }
+ var idx;
+ if ((stat.mtime || stat.ctime).getTime() < old) {
+ idx = self.livelog_file_paths.indexOf(file_path);
+ if (idx >= 0) {
+ self.livelog_file_paths.splice(idx, 1);
+ }
+ fs.unlink(file_path, function(err) {
+ if (err) { callback(err); return; }
+ });
+ }
+ });
+ }
+ });
+ }
+ })(i);
+ }
+};
+
+Logger.prototype.nextFile = function(current_file) {
+ if (!current_file) {
+ return this.livelog_file_paths[0];
+ }
+ var idx = this.livelog_file_paths.indexOf(current_file);
+ if (idx >= 0) {
+ return this.livelog_file_paths[idx + 1];
+ }
+};
+
+Logger.prototype.seek = function(pos) {
+ var last_path, path;
+
+ for(var i = 0; i < this.livelog_file_paths.length; i++) {
+ path = this.livelog_file_paths[i];
+ this_pos = parseInt(path.match(/^live_log_([0-9]+).alf$/)[1], 10);
+ if (this_pos > pos) {
+ return last_path;
+ }
+ last_path = path;
+ }
+ return last_path;
};
2  lib/alfred/meta/replication/master.js
View
@@ -85,7 +85,7 @@ Master.prototype._executeCommand = function(command, stream) {
if (!command_function || typeof(command_function) != 'function' ) {
throw new Error('unrecognized command ' + command.command);
}
- command_function(self, stream);
+ command_function(self, command['arguments'], stream);
} catch(exception) {
self.error(stream, exception);
}
51 test/replication/test_master.js
View
@@ -170,7 +170,8 @@ module.exports.run = function(next) {
var expected_objects = [
{ m: 'meta',
command: 'attach_key_map',
- arguments: [ 'users', { cache_slots: 1000 } ] }
+ arguments: [ 'users', { cache_slots: 1000 }, ],
+ __log_pos: 1}
, { m: 'meta',
command: 'add_index',
@@ -178,7 +179,8 @@ module.exports.run = function(next) {
[ 'users',
'sex',
{ bplustree_order: 100, ordered: true },
- 'function (user) {\n return user.sex;\n }' ] }
+ 'function (user) {\n return user.sex;\n }' ],
+ __log_pos: 2}
, { m: 'meta',
command: 'add_index',
@@ -186,50 +188,65 @@ module.exports.run = function(next) {
[ 'users',
'age',
{ bplustree_order: 100, ordered: true },
- 'function (user) {\n return user.age;\n }' ] }
+ 'function (user) {\n return user.age;\n }' ],
+ __log_pos: 3}
,{ m: 'users',
k: '1',
- v: { name: 'Pedro', age: 35, sex: 'm' } }
+ v: { name: 'Pedro', age: 35, sex: 'm' },
+ __log_pos: 4}
,{ m: 'users',
k: '2',
- v: { name: 'John', age: 32, sex: 'm' } }
+ v: { name: 'John', age: 32, sex: 'm' },
+ __log_pos: 5}
,{ m: 'users',
k: '3',
- v: { name: 'Bruno', age: 28, sex: 'm' } }
+ v: { name: 'Bruno', age: 28, sex: 'm' },
+ __log_pos: 6}
,{ m: 'users',
k: '4',
- v: { name: 'Sandra', age: 35, sex: 'f' } }
+ v: { name: 'Sandra', age: 35, sex: 'f' },
+ __log_pos: 7}
, { m: 'users',
k: '5',
- v: { name: 'Patricia', age: 42, sex: 'f' } }
+ v: { name: 'Patricia', age: 42, sex: 'f' },
+ __log_pos: 8}
, { m: 'users',
k: '6',
- v: { name: 'Joana', age: 29, sex: 'f' } }
+ v: { name: 'Joana', age: 29, sex: 'f' },
+ __log_pos: 9}
, { m: 'users',
k: '7',
- v: { name: 'Susana', age: 30, sex: 'f' } }
+ v: { name: 'Susana', age: 30, sex: 'f' },
+ __log_pos: 10}
, { m: 'users',
k: '1',
- v: { name: 'Pedro', age: 35, sex: 'm', rndm: '1' } }
+ v: { name: 'Pedro', age: 35, sex: 'm', rndm: '1' },
+ __log_pos: 11}
, { m: 'users',
k: '2',
- v: { name: 'John', age: 32, sex: 'm', rndm: '2' } }
+ v: { name: 'John', age: 32, sex: 'm', rndm: '2' },
+ __log_pos: 12}
, { m: 'users',
k: '3',
- v: { name: 'Bruno', age: 28, sex: 'm', rndm: '3' } }
+ v: { name: 'Bruno', age: 28, sex: 'm', rndm: '3' },
+ __log_pos: 13}
, { m: 'users',
k: '4',
- v: { name: 'Sandra', age: 35, sex: 'f', rndm: '4' } }
+ v: { name: 'Sandra', age: 35, sex: 'f', rndm: '4' },
+ __log_pos: 14}
, { m: 'users',
k: '5',
- v: { name: 'Patricia', age: 42, sex: 'f', rndm: '5' } }
+ v: { name: 'Patricia', age: 42, sex: 'f', rndm: '5' },
+ __log_pos: 15}
, { m: 'users',
k: '6',
- v: { name: 'Joana', age: 29, sex: 'f', rndm: '6' } }
+ v: { name: 'Joana', age: 29, sex: 'f', rndm: '6' },
+ __log_pos: 16}
, { m: 'users',
k: '7',
- v: { name: 'Susana', age: 30, sex: 'f', rndm: '7' } }
+ v: { name: 'Susana', age: 30, sex: 'f', rndm: '7' },
+ __log_pos: 17}
];
var records = [];
Please sign in to comment.
Something went wrong with that request. Please try again.