diff --git a/hash_object.js b/hash_object.js new file mode 100644 index 0000000..04ca816 --- /dev/null +++ b/hash_object.js @@ -0,0 +1,50 @@ +var crypto = require('crypto') + +module.exports = function(object, algo){ + var string = canonicalJSON(object) + if (algo === 'djb2'){ + return djb2(string).toString(36) + } else { + return crypto.createHash(algo || 'sha1').update(string).digest('base64') + } +} + +function djb2(str) { + var hash = 5381 + for (i = 0; i < str.length; i++) { + char = str.charCodeAt(i) + hash = ((hash << 5) + hash) + char + } + return hash +} + +function canonicalJSON(object){ + if (object == null){ + return 'null' + } + + var object = object.valueOf() + + if (object instanceof Array){ + var result = "[" + + object.forEach(function(value, i){ + if (i > 0) result += ',' + result += canonicalJSON(value) + }) + + return result + ']' + } else if (object instanceof Object){ + var result = "{" + + Object.keys(object).sort().forEach(function(key, i){ + var value = object[key] + if (i > 0) result += ',' + result += JSON.stringify(key) + ':' + canonicalJSON(value) + }) + + return result + '}' + } else { + return JSON.stringify(object) + } +} \ No newline at end of file diff --git a/index.js b/index.js index 757f6de..d5f1051 100644 --- a/index.js +++ b/index.js @@ -1,15 +1,14 @@ var JsonContext = require('json-context') -var LevelMap = require('level-map') - -var checkFilter = require('json-filter') - +var SubLevel = require('level-sublevel') +var MatchMap = require('./match_map') var TimeoutMap = require('./timeout-map') +var EventEmitter = require("events").EventEmitter +var checkFilter = require('json-filter') var async = require('async') -var EventEmitter = require("events").EventEmitter -module.exports = function(db, options){ +module.exports = function(levelDB, options){ var rootOptions = options || {} @@ -25,87 +24,108 @@ module.exports = function(db, options){ rootOptions.timestamps = true } - LevelMap(db) - db.queue.delay = 100 + var db = SubLevel(levelDB) + var metaDB = db.sublevel('meta') + var matchDb = MatchMap(db, rootOptions.matchers) - var contextDB = new EventEmitter() - contextDB.db = db + var self = new EventEmitter() + self.db = db - var matcherLookup = {} - var matcherParamLookup = {} - var views = {} + matchDb.on('reindex', function(){ + self.emit('reindex') + }) // incrementing values var currentIncementValue = 0 if (rootOptions.incrementingKey){ - db.get('\xFFincrement~' + rootOptions.incrementingKey, function(err, val){ + metaDB.get('increment!' + rootOptions.incrementingKey, function(err, val){ currentIncementValue = parseInt(val, 10) || 0 }) } function incrementKey(){ currentIncementValue += 1 - db.put('\xFFincrement~' + rootOptions.incrementingKey, currentIncementValue) + metaDB.put('increment!' + rootOptions.incrementingKey, currentIncementValue) return currentIncementValue } function getObjectKey(object){ + if (!checkValid(object[rootOptions.primaryKey])){ + throw new Error("id can't contain '!' or '~") + } if (rootOptions.incrementingKey){ var ref = parseInt(object[rootOptions.incrementingKey]) - return padNumber(ref, 10) + ':' + object[rootOptions.primaryKey] + return padNumber(ref, 10) + '!' + object[rootOptions.primaryKey] } else { - return padNumber(0, 10) + ':' + object[rootOptions.primaryKey] + return padNumber(0, 10) + '!' + object[rootOptions.primaryKey] } } - rootOptions.matchers.forEach(function(matcher){ - var paramifiedMatch = paramify(matcher.match) - var map = getMapFromMatcher(matcher.ref, paramifiedMatch) - db.map.add(map) - matcherParamLookup[matcher.ref] = paramifiedMatch.params - matcherLookup[matcher.ref] = matcher - views[matcher.ref] = map - }) + function checkValid(key){ + return !~key.indexOf('~') && !~key.indexOf('!') && !~key.indexOf('\xFF') && !~key.indexOf('\0') + } + + self.applyChange = function(object, changeInfo, cb){ + self.applyChanges([object], changeInfo, cb) + return object + } + + // batch version + self.applyChanges = function(objects, changeInfo, cb){ + + if (typeof changeInfo === 'function'){ + cb = changeInfo + changeInfo = {} + } - contextDB.applyChange = function(object, changeInfo){ changeInfo = changeInfo || {} - if (changeInfo.source !== contextDB){ + if (changeInfo.source !== self){ - if (rootOptions.incrementingKey){ - if (!object[rootOptions.incrementingKey]){ - object[rootOptions.incrementingKey] = incrementKey() - } - } + try{ - if (rootOptions.timestamps){ - object.updated_at = Date.now() - object.created_at = object.created_at || Date.now() - if (object._deleted){ - object.deleted_at = Date.now() - } - } + var changes = objects.map(function(object){ + if (rootOptions.incrementingKey){ + if (!object[rootOptions.incrementingKey]){ + object[rootOptions.incrementingKey] = incrementKey() + } + } + + if (rootOptions.timestamps){ + object.updated_at = Date.now() + object.created_at = object.created_at || Date.now() + if (object._deleted){ + object.deleted_at = Date.now() + } + } + + var key = getObjectKey(object) + if (object._deleted){ + deleted.set(key, object) + } - var key = getObjectKey(object) + return {type: 'put', key: key, value: object} + }) - if (object._deleted){ - deleted.set(key, object) + db.batch(changes, cb) + + } catch (ex){ + cb&&cb(ex) } - db.put(key, object) - return object } } - contextDB.generate = function(options, callback){ + self.generate = function(options, callback){ var data = options.data || {} var matcherRefs = options.matcherRefs || [] try { - var matchers = matcherRefs.map(function(ref){ - if (!matcherLookup[ref]){ + var matchers = matcherRefs.map(function(ref){ + var matcher = matchDb.lookupMatcher(ref) + if (!matcher){ throw new Error('No matcher called ' + ref) } - return matcherLookup[ref] + return matcher }) } catch (ex){ return callback(ex) @@ -125,124 +145,92 @@ module.exports = function(db, options){ context.removeAllListeners() } + context.getChanges = function(cb){ + var result = [] + async.eachSeries(context.matchers, function(matcher, next){ + matchDb.createMatchStream(matcher.ref, context, {tail: false}).on('data', function(data){ + result.push(data.value) + }).on('end', next).on('error', next) + }, function(err){ + if (err) return cb&&cb(err) + cb&&cb(null, result) + }) + } + context.emitChangesSince = function(timestamp){ if (rootOptions.timestamps){ context.matchers.forEach(function(matcher){ - if (matcher.collection){ - - matcherStream(matcher.ref, context, {tail: false}).on('data', function(data){ - if (!data.updated_at || data.updated_at > timestamp){ - context.emit('change', data.value, { - matcher: matcher, - source: contextDB, - verfiedChange: true, - time: data.updated_at || Date.now() - }) - } - }) - - deletedStream(matcher.ref, context, timestamp, {tail: false}).on('data', function(data){ + matchDb.createMatchStream(matcher.ref, context, {tail: false}).on('data', function(data){ + if (!data.updated_at || data.updated_at > timestamp){ context.emit('change', data.value, { - source: contextDB, - action: 'remove', - verifiedChange: true, - matcher: matcher, - time: data.deleted_at || Date.now() + matcher: matcher, + source: self, + verfiedChange: true, + time: data.updated_at || Date.now() }) + } + }) + + matchDb.createMatchStream(matcher.ref, context, {tail: false, deletedSince: timestamp}).on('data', function(data){ + context.emit('change', data.value, { + source: self, + action: 'remove', + verifiedChange: true, + matcher: matcher, + time: data.deleted_at || Date.now() }) - - } + }) }) } } async.eachSeries(matchers, function(matcher, next){ - streams.push(matcherStream(matcher.ref, context).on('data', function(data){ - var key = data.key[data.key.length-1] - var object = data.value || deleted.get(key) + streams.push(matchDb.createMatchStream(matcher.ref, context).on('data', function(data){ + var object = data.value + if (!object){ + var split = data.key.split('~') + object = deleted.get(split[split.length-1]) + } if (object){ var time = object._deleted ? object.deleted_at : object.updated_at - context.pushChange(object, {matcher: matcher, source: contextDB, verifiedChange: true, time: time}) + context.pushChange(object, {matcher: matcher, source: self, verifiedChange: true, time: time}) } - + }).once('sync', function(){ next() })) }, function(err){ if(err)return callback&&callback(err) process.nextTick(function(){ - context.on('change', contextDB.applyChange) - callback(null, context) + context.on('change', self.applyChange) + context.emit('sync') + callback&&callback(null, context) }) }) - } - function matcherStream(matcherRef, context, options){ - var params = getParamsFrom(matcherParamLookup[matcherRef], context) - return db.map.view(mergeClone(options, {name: matcherRef, start: params.concat('')})) + return context } - function deletedStream(matcherRef, context, since, options){ - var params = getParamsFrom(matcherParamLookup[matcherRef], context).concat('DEL') - var startParams = params.concat(alphaKey(since), '') - var endParams = params.concat(alphaKey(Date.now()+100), '') + self.getByMatcher = function(matcherRef, params, cb){ + // should replace with QueryContext + var context = JsonContext({data: params, dataFilters: dataFilters}) + var result = [] - return db.map.view(mergeClone(options, {name: matcherRef, start: startParams, end: endParams})) + matchDb.createMatchStream(matcherRef, context, {tail: false}).on('data', function(data){ + result.push(data.value) + }).on('end', function(){ + cb(null, result) + }).on('error', cb) } - return contextDB + self.forceIndex = matchDb.forceIndex -} + return self -function isParam(object){ - return object instanceof Object && object.$query -} - -function getParamsFrom(params, context){ - return params.map(function(param){ - return context.get(param.query) - }) -} - -function paramify(match){ - var params = [] - var ensure = {} - - Object.keys(match).sort().forEach(function(key){ - var value = match[key] - if (isParam(value)){ - params.push({key: key, query: value.$query}) - } else { - ensure[key] = value - } - }) - - return { - params: params, - ensure: ensure - } -} - -function getMapFromMatcher(name, paramifiedMatch){ - return { - name: name, - map: function(key, value, emit){ - if (Object.keys(paramifiedMatch.ensure).length === 0 || checkFilter(value, paramifiedMatch.ensure, {match: 'filter'})){ - var newKey = paramifiedMatch.params.map(function(param){ - return value[param.key] || null - }) - if (value._deleted){ - newKey.push('DEL') - newKey.push(alphaKey(value.deleted_at || Date.now(), 8)) - } - emit(newKey, value) - } - } - } } function padNumber(number, pad) { @@ -250,14 +238,6 @@ function padNumber(number, pad) { return number < N ? ("" + (N + number)).slice(1) : "" + number } -function alphaKey(number, pad) { - var N = Math.pow(36, pad); - return number < N ? ((N + number).toString(36)).slice(1) : "" + number.toString(36) -} -function parseAlphaKey(string){ - return parseInt(string, 36) -} - function mergeClone(){ var result = {} for (var i=0;i