Permalink
Browse files

Release v0.5.2b1

  • Loading branch information...
pgte committed Feb 18, 2011
1 parent 77c1c82 commit a931b42b12a72a71f3050410bbd8894001ffb148
View
@@ -4,8 +4,10 @@ var carrier = require('carrier'),
EventEmitter = require('events').EventEmitter,
options_merger = require('./util/options_merger');
-var File = require('./files/file'),
- BufferedFile = require('./files/buffered_file');
+var File = require('./files/file'),
+ BufferedFile = require('./files/buffered_file'),
+ FullReader = require('./files/full_reader'),
+ AlfredCheckError = require('./files/alfred_check_error');
var default_options = {
buffered: true,
@@ -124,64 +126,39 @@ Collection.prototype.fetch = function(pos, length, callback) {
Collection.prototype.read = function(callback, null_on_end) {
var self = this;
-
- this.file.writtenSize(function(err, size) {
- if (err) { callback(err); return; }
- if (size <= 0) {
- if (null_on_end) {
- callback(null, null);
- }
+
+ var reader = FullReader.open(this.file.file_path);
+ reader.on('error', function(err) {
+ if (err instanceof AlfredCheckError) {
+ self.emit('warn', err);
} else {
- var file_pos = 0;
- var stop_at = size - 1;
- var record;
- var line_count = 0;
- process.nextTick(function keep_reading() {
- self.file.readOne(file_pos, function(err, str, bytesRead) {
- if (err) {
- if (!err._ALFRED_CHECK_ERROR) {
- callback(err); return;
- } else {
- if (file_pos < stop_at) {
- file_pos += 1;
- } else {
- }
- }
- }
-
- if (str === null) {
- if (null_on_end) {
- callback(null, null);
- }
- return;
- }
-
-
- if (str) {
- try {
- record = JSON.parse(str);
- } catch (excp) {
- process.nextTick(function() {
- callback(new Error("Error parsing line " + line_count + " of file " + self.file.file_path + " \"" + str + "\": " + excp.message));
- });
- return;
- }
- callback(null, record, file_pos, Buffer.byteLength(str) + 54);
- file_pos += bytesRead;
- line_count += 2;
- }
-
- if (file_pos < stop_at) {
- process.nextTick(keep_reading);
- } else {
- if (null_on_end) {
- callback(null, null);
- }
- }
- });
- });
+ callback(err);
}
});
+
+ reader.on('warn', function(warn) {
+ self.emit('warn', warn);
+ });
+
+ reader.on('data', function(record, pos, length) {
+ callback(null, JSON.parse(record), pos, length);
+ });
+
+ reader.on('end', function(pos) {
+ (function _do_one(){
+ self.file.readOne(pos, function(err, record, length) {
+ if (err) { callback(err); return; }
+ if (!record) {
+ callback(null, null);
+ } else {
+ callback(null, JSON.parse(record), pos, length);
+ pos += length;
+ process.nextTick(_do_one);
+ }
+ })
+ })();
+ });
+
};
Collection.prototype.filter = function(filter_function, callback) {
@@ -1,7 +1,10 @@
var util = require('util');
var AlfredCheckError = module.exports = function(message, error) {
+ Error.apply(this, arguments);
+ this.message = message;
this._ALFRED_CHECK_ERROR = error;
- Error.call(this, message);
+ Error.captureStackTrace(this);
};
-util.inherits(AlfredCheckError, Error);
+
+util.inherits(AlfredCheckError, Error);
View
@@ -91,6 +91,7 @@ var calculateTrailer = function(string, pos) {
//return a 40 byte trailer
return ret.substring(ret.length - 40);
};
+module.exports.calculateTrailer = calculateTrailer;
var validateTrailer = function(trailer, string, pos) {
return calculateTrailer(string, pos) == trailer;
@@ -238,6 +239,7 @@ File.prototype.readOne = function(position, callback) {
try {
buf.copy(read_string, bytesRead, 0, bytesReadNow);
} catch (excp) {
+ console.log(excp.message);
console.log("buf.copy(" + read_string.length + ", " + bytesRead + ", 0, " + bytesReadNow + ");");
console.log("processed_header: " + processed_header);
console.log('buf.length: ' + buf.length);
@@ -0,0 +1,141 @@
+var AlfredCheckError = require('./alfred_check_error'),
+ File = require('./file'),
+ StringDecoder = require('string_decoder').StringDecoder;
+
+var MAGIC_CHAR_1 = 0xDC;
+var MAGIC_CHAR_2 = 0x80;
+
+var FileProtocol = function(pos, recordCallback, warnCallback) {
+ this.pos = pos;
+ this.startRecordPos = 0;
+ this.recordCallback = recordCallback;
+ this.warnCallback = warnCallback;
+ this.reset();
+};
+
+module.exports.create = function(pos, recordCallback, warnCallback) {
+ return new FileProtocol(pos, recordCallback, warnCallback);
+};
+
+FileProtocol.prototype.reset = function() {
+ this.state = 'idle';
+ this.lengthString = '';
+ this.buffers = [];
+ this.trailerString = '';
+};
+
+FileProtocol.prototype.validateTrailerAndCallback = function() {
+ var decoder = new StringDecoder('utf8'),
+ record = '',
+ length = 54 + this.recordLength,
+ trailer;
+
+ this.buffers.forEach(function(buffer) {
+ record += decoder.write(buffer);
+ });
+ trailer = File.calculateTrailer(record, this.startRecordPos, length);
+ if (trailer != this.trailerString) {
+ throw new AlfredCheckError('at pos ' + this.pos + ': trailer does not match. Should be ' + trailer + ' and is ' + this.trailerString + '. record length was ' + this.recordLength);
+ }
+ this.recordCallback(record, this.startRecordPos, length);
+}
+
+FileProtocol.prototype.magicChar1 = function() {
+ if (this.state != 'idle') {
+ throw new AlfredCheckError('at pos ' + this.pos + ': magic char 1 should not come in the middle of a record', 'MAGIC_CHAR_1_MIDDLE_OF_RECORD');
+ }
+ this.state = 'magic char 1';
+};
+
+FileProtocol.prototype.magicChar2 = function() {
+ if (this.state != 'magic char 1') {
+ throw new AlfredCheckError('at pos ' + this.pos + ': magic char 2 should come only after magic char 1', 'INVALID_MAGIC_CHAR_2');
+ }
+ this.state = 'magic char 2';
+};
+
+FileProtocol.prototype.length = function(length) {
+ if (this.state != 'magic char 2' && this.state != 'in length') {
+ throw new AlfredCheckError('at pos ' + this.pos + ': length should come after magic char 2. in state ' + this.state, 'INVALID_LENGTH_POS');
+ }
+ this.lengthString += length;
+ if (this.lengthString.length < 12) {
+ this.state = 'in length'
+ } else {
+ this.remainingLength = this.recordLength = parseInt(this.lengthString, 10);
+ this.state = 'finished length';
+ }
+};
+
+FileProtocol.prototype.record = function(buffer) {
+ this.buffers.push(buffer);
+};
+
+FileProtocol.prototype.trailer = function(trailer) {
+ this.trailerString += trailer;
+ if (this.trailerString.length > 40) {
+ throw new Error('at pos ' + this.pos + ': parser error: trailer is ' + this.trailerString.length + ' bytes');
+ }
+ if (this.trailerString.length === 40) {
+ this.state = 'finished';
+ this.validateTrailerAndCallback();
+ this.startRecordPos += (54 + this.recordLength);
+ this.reset();
+ }
+}
+
+FileProtocol.prototype.write = function(buffer) {
+ var byt, len, piece, advanceBy, i;
+
+ i = 0;
+ while (i < buffer.length) {
+ advanceBy = 1;
+ try {
+ byt = buffer[i]
+ if (this.state == 'idle' && byt === MAGIC_CHAR_1) {
+ this.magicChar1();
+ } else if (this.state == 'magic char 1' && byt === MAGIC_CHAR_2) {
+ this.magicChar2();
+ } else {
+ if (this.state == 'magic char 2' || this.state == 'in length') {
+ len = Math.min(12 - this.lengthString.length, buffer.length - i);
+ advanceBy = len;
+ piece = buffer.slice(i, i + len);
+ this.length(piece.toString('utf8'));
+ } else if (this.state == 'finished length' || this.state == 'in record') {
+ this.state = 'in record';
+ len = Math.min(this.remainingLength, buffer.length - i);
+ advanceBy = len;
+ piece = buffer.slice(i, i + len);
+ this.record(piece);
+ this.remainingLength -= len;
+ if (this.remainingLength === 0) {
+ this.state = 'finished record';
+ }
+ } else if (this.state == 'finished record' || this.state == 'in trailer') {
+ this.state = 'in trailer';
+ len = Math.min(40 - this.trailerString.length, buffer.length - i);
+ advanceBy = len;
+ this.trailer(buffer.slice(i, i + len).toString('utf8'));
+ } else {
+ throw new AlfredCheckError('invalid sequence in state ' + this.state + ' on pos ' + this.pos);
+ }
+ }
+ } catch (err) {
+ if (!(err instanceof AlfredCheckError)) {
+ throw err;
+ } else {
+ advanceBy = 1;
+ this.startRecordPos += 1;
+ // log and skip to the next byte
+ //console.log("check error: " + err.message);
+ //console.log('advancing by ' + advanceBy);
+ this.warnCallback(err);
+ this.reset();
+ }
+ }
+ //console.log(advancing to)
+ this.pos += advanceBy;
+ i += (advanceBy || 1);
+ }
+}
@@ -0,0 +1,42 @@
+var EventEmitter = require('events').EventEmitter,
+ util = require('util'),
+ fs = require('fs'),
+ FileProtocol = require('./file_protocol');
+
+var MAGIC_CHAR_1 = 0xDC;
+var MAGIC_CHAR_2 = 0x80;
+
+var FullReader = function(file_path) {
+ var self = this;
+ this.file_path = file_path;
+ self.read();
+};
+
+util.inherits(FullReader, EventEmitter);
+
+module.exports.open = function(filePath) {
+ return new FullReader(filePath);
+};
+
+FullReader.prototype.read = function() {
+ var self = this;
+ var rs = fs.createReadStream(this.file_path),
+ protocol = FileProtocol.create(0, function(record, pos, length) {
+ self.emit('data', record, pos, length);
+ }, function(warn) {
+ self.emit('warn', warn);
+ });
+
+ rs.on('data', function(data) {
+ protocol.write(data);
+ });
+
+ rs.on('error', function(err) {
+ self.emit('error', err);
+ });
+
+ rs.on('end', function() {
+ self.emit('end', protocol.pos);
+ });
+
+};
@@ -1,3 +1,5 @@
+var util = require('util');
+
var Index = function() {
this.map = {};
this.waitQueue = [];
@@ -11,6 +13,10 @@ Index.prototype.put = function(key, offset, length, callback, secret) {
var self = this;
+ if (offset === undefined || offset === null) { throw new Error('invalid offset: ' + util.inspect(offset))}
+ if (!length) { throw new Error('invalid length: ' + util.inspect(length))}
+ if (!callback) { throw new Error('invalid callback: ' + util.inspect(callback))}
+
this.waitForAtomic(key, secret, function(err, rec) {
if (err) {
callback(err);
View
@@ -137,7 +137,7 @@ KeyMap.prototype.destroy = function(key, callback) {
KeyMap.prototype.getAtPos = function(pos, length, callback) {
if (!length) {
- callback(new Error("invalid length"));
+ callback(new Error("invalid length: " + length));
return;
}
View
@@ -1,6 +1,6 @@
{ "name" : "alfred"
, "description" : "In-process key-value store"
-, "version" : "0.5.1b2"
+, "version" : "0.5.2b1"
, "homepage" : "http://pgte.github.com/alfred"
, "author" : "Pedro Teixeira <pedro.teixeira@gmail.com> (http://metaduck.com)"
, "contributors" :
@@ -32,7 +32,7 @@ module.exports.run = function(next) {
// close collection
collection.end(function(err) {
if (err) { next(err); return; }
-
+
// Now, let's plant some rotten eggs
fs.stat(COLL_PATH, function(err, stat) {
Oops, something went wrong.

0 comments on commit a931b42

Please sign in to comment.