Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
  • 3 commits
  • 2 files changed
  • 0 commit comments
  • 1 contributor
Showing with 66 additions and 9 deletions.
  1. +65 −8 index.js
  2. +1 −1 package.json
View
@@ -1,3 +1,4 @@
var after = require('after-all')
var events = require('events')
var framedHash = require('framed-hash')
var batcher = require('byte-stream')
@@ -357,6 +358,7 @@ Dat.prototype.get = function (key, opts, cb) { // TODO: refactor me
if (!self._layerKey) return cb(notFound(key))
var valueEncoding = self._getValueEncoding(opts.valueEncoding)
var onoperation = function (err, op, node) {
if (err) return cb(err)
cb(null, {
@@ -372,7 +374,7 @@ Dat.prototype.get = function (key, opts, cb) { // TODO: refactor me
var onpointer = function (err, ptr) {
if (err) return cb(err)
if (opts.change) return self._getOperation(ptr, onoperation) // TODO: does this trigger deopt since it changes return type?
self._getPointerEntry(ptr, cb)
self._getPointerEntry(ptr, valueEncoding, cb)
}
// TODO: fix rc where *latest* is updated in the middle of below block
@@ -448,16 +450,15 @@ Dat.prototype._getOperation = function (ptr, cb) {
})
}
Dat.prototype._getPointerEntry = function (ptr, cb) {
var self = this
Dat.prototype._getPointerEntry = function (ptr, encoding, cb) {
this._getOperation(ptr, function (err, entry, node) {
if (err) return cb(err)
if (entry.type === DELETE) return cb(notFound(entry.key))
cb(null, {
content: entry.content === FILE ? 'file' : 'row',
key: entry.key,
version: node.key,
value: entry.value && (entry.content === FILE ? messages.File.decode(entry.value) : self._valueEncoding.decode(entry.value))
value: entry.value && (entry.content === FILE ? messages.File.decode(entry.value) : encoding.decode(entry.value))
})
})
}
@@ -672,9 +673,56 @@ Dat.prototype.createWriteStream = function (opts) {
var prev
var first = true
var checkout = this.checkout(this.head)
var binaryEncoding = encoding('binary')
var cmp = function (a, b) {
if (a.length !== b.length) return false
for (var i = 0; i < a.length; i++) {
if (a[i] !== b[i]) return false
}
return true
}
var deduplicate = function (batch, enc, cb) {
batch = batch.map(toOperation)
var next = after(function (err) {
if (err) return cb(err)
batch = batch.filter(function (item) {
if (item) {
if (item.type === DELETE) stream.progress.deletes++
else stream.progress.puts++
stream.emit('progress')
}
return item
})
if (!batch.length) return cb()
cb(null, batch)
})
batch.forEach(function (item, i) {
var n = next()
checkout.get(item.key, {valueEncoding: binaryEncoding, dataset: item.dataset}, function (err, data) {
if (err && err.notFound && item.type === DELETE) {
batch[i] = null
return n()
}
if (!err && item.type === PUT && cmp(data.value, item.value)) {
batch[i] = null
return n()
}
n()
})
})
}
var write = function (batch, enc, cb) {
self._commit(null, DATA, batch.map(toOperation), opts.message, function (err) {
self._commit(null, DATA, batch, opts.message, function (err) {
cb(err)
})
}
@@ -686,7 +734,7 @@ Dat.prototype.createWriteStream = function (opts) {
} else {
var type = first ? TRANSACTION_START : TRANSACTION_DATA
first = false
self._commit(null, type, prev.map(toOperation), null, function (err) {
self._commit(null, type, prev, null, function (err) {
prev = batch
cb(err)
})
@@ -696,7 +744,7 @@ Dat.prototype.createWriteStream = function (opts) {
var endTransaction = function (cb) {
if (!prev) return cb()
stream.cork()
self._commit(null, first ? DATA : TRANSACTION_END, prev.map(toOperation), opts.message, function (err) {
self._commit(null, first ? DATA : TRANSACTION_END, prev, opts.message, function (err) {
stream.uncork()
cb(err)
})
@@ -706,7 +754,10 @@ Dat.prototype.createWriteStream = function (opts) {
through.obj({highWaterMark: 0}, writeTransaction, endTransaction) :
through.obj({highWaterMark: 1}, write)
var stream = pumpify.obj(batcher({limit: opts.batchSize || 128}), writer)
var stream = pumpify.obj(batcher({limit: opts.batchSize || 128}), through.obj(deduplicate), writer)
stream.progress = {puts: 0, deletes: 0}
return stream
}
@@ -944,12 +995,18 @@ Dat.prototype.push = function (opts) {
Dat.prototype._createProxyStream = function (method, args) {
var proxy = duplexify.obj()
proxy.progress = {}
this.open(function (err, self) {
if (err) return proxy.destroy(err)
if (proxy.destroyed) return proxy.destroy()
var stream = method.apply(self, args)
stream.on('progress', function () {
proxy.progress = stream.progress
})
stream.on('pull', function () {
proxy.emit('pull')
})
View
@@ -1,6 +1,6 @@
{
"name": "dat-core",
"version": "3.5.0",
"version": "3.6.0",
"description": "dat core",
"main": "index.js",
"dependencies": {

No commit comments for this range