|
|
@@ -570,6 +570,15 @@ Dat.prototype.createDiffStream = function (headA, headB, opts) { |
|
|
return 0 // nothing shared
|
|
|
}
|
|
|
|
|
|
var decode = function (obj) {
|
|
|
var i = obj.key.indexOf('!')
|
|
|
if (i > -1) {
|
|
|
obj.key = obj.key.slice(i + 1)
|
|
|
obj.dataset = obj.key.slice(0, i)
|
|
|
}
|
|
|
obj.value = self._encoding.decode(obj.value)
|
|
|
}
|
|
|
|
|
|
var fork = -1
|
|
|
var filter = function (data, enc, cb) {
|
|
|
if (fork === -1) fork = findFork()
|
|
|
@@ -580,8 +589,8 @@ Dat.prototype.createDiffStream = function (headA, headB, opts) { |
|
|
if (!a && !b) return cb()
|
|
|
if (!a && b.type === 'del') return cb()
|
|
|
if (!b && a.type === 'del') return cb()
|
|
|
if (a) a.value = self._encoding.decode(a.value)
|
|
|
if (b) b.value = self._encoding.decode(b.value)
|
|
|
if (a) decode(a)
|
|
|
if (b) decode(b)
|
|
|
cb(null, data)
|
|
|
}
|
|
|
|
|
|
@@ -648,29 +657,31 @@ Dat.prototype.createValueStream = function (opts) { |
|
|
return this.createReadStream(opts)
|
|
|
}
|
|
|
|
|
|
var toKey = function (data) {
|
|
|
return data.key.slice(data.key.lastIndexOf('!') + 1)
|
|
|
}
|
|
|
|
|
|
var emptyStream = function () {
|
|
|
var stream = through.obj()
|
|
|
stream.end()
|
|
|
return stream
|
|
|
}
|
|
|
|
|
|
var toKey = function (data) {
|
|
|
return data.key.slice(data.key.lastIndexOf(data.key.lastIndexOf('!') + 1) + 1)
|
|
|
}
|
|
|
|
|
|
Dat.prototype.createReadStream = function (opts) {
|
|
|
if (!this.opened) return this._createProxyStream(this.createReadStream, [opts])
|
|
|
if (!opts) opts = {}
|
|
|
|
|
|
var self = this
|
|
|
var justKeys = opts.keys && !opts.values
|
|
|
var justValues = opts.values && !opts.keys
|
|
|
var all = !!opts.all
|
|
|
var getOpts = all ? xtend(opts) : opts
|
|
|
|
|
|
if (!this._layers.length) return emptyStream()
|
|
|
|
|
|
var stream = this._layers
|
|
|
.map(function (layer) {
|
|
|
var prefix = '!latest!' + layer[1] + '!' + (opts.dataset || '') + '!'
|
|
|
var prefix = '!latest!' + layer[1] + '!' + (all ? '' : (opts.dataset || '') + '!')
|
|
|
var rs = self._index.data.createReadStream({
|
|
|
gt: prefix + (opts.gt || ''),
|
|
|
lt: prefix + (opts.lt || '\xff'),
|
|
|
@@ -685,13 +696,17 @@ Dat.prototype.createReadStream = function (opts) { |
|
|
})
|
|
|
|
|
|
var write = function (data, enc, cb) {
|
|
|
var key = data.key.slice(data.key.lastIndexOf('!') + 1)
|
|
|
var i = data.key.lastIndexOf('!')
|
|
|
var key = data.key.slice(i + 1)
|
|
|
var dataset = data.key.slice(data.key.lastIndexOf('!', i - 1) + 1, i)
|
|
|
|
|
|
self.get(key, opts, function (err, row) {
|
|
|
if (dataset) getOpts.dataset = dataset
|
|
|
self.get(key, getOpts, function (err, row) {
|
|
|
if (err && err.notFound) return cb()
|
|
|
if (err) return cb(err)
|
|
|
if (justKeys) return cb(null, key)
|
|
|
if (justValues) return cb(null, row.value)
|
|
|
if (all && dataset) row.key = dataset + '!' + row.key
|
|
|
cb(null, row)
|
|
|
})
|
|
|
}
|
|
|
|
0 comments on commit
9a52767