From a9c48e48c0d4b2767e7d973f36464a464dccb429 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 22 Apr 2016 22:52:44 +0200 Subject: [PATCH] fix(datastore): keep locks on writes --- package.json | 3 ++- src/stores/datastore.js | 15 ++++++++++++++- test/repo-test.js | 24 ++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 5da4eef2..d40d6db1 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "bl": "^1.1.2", "concat-stream": "^1.5.1", "level-js": "^2.2.3", + "lock": "^0.1.2", "lockfile": "^1.0.1", "multihashes": "^0.2.1", "xtend": "^4.0.1" @@ -63,4 +64,4 @@ "dignifiedquire ", "greenkeeperio-bot " ] -} \ No newline at end of file +} diff --git a/src/stores/datastore.js b/src/stores/datastore.js index 83426c42..e09876fb 100644 --- a/src/stores/datastore.js +++ b/src/stores/datastore.js @@ -1,5 +1,8 @@ 'use strict' +const Lock = require('lock') +const stream = require('stream') + const PREFIX_LENGTH = 8 exports = module.exports @@ -15,6 +18,7 @@ function multihashToPath (multihash, extension) { exports.setUp = (basePath, blobStore, locks) => { const store = blobStore(basePath + '/blocks') + const lock = new Lock() return { createReadStream: (multihash, extension) => { @@ -29,8 +33,16 @@ exports.setUp = (basePath, blobStore, locks) => { } const path = multihashToPath(multihash, extension) - return store.createWriteStream(path, cb) + const through = stream.PassThrough() + + lock(path, (release) => { + const ws = store.createWriteStream(path, release(cb)) + through.pipe(ws) + }) + + return through }, + exists: (multihash, extension, cb) => { if (typeof extension === 'function') { cb = extension @@ -40,6 +52,7 @@ exports.setUp = (basePath, blobStore, locks) => { const path = multihashToPath(multihash, extension) return store.exists(path, cb) }, + remove: (multihash, extension, cb) => { if (typeof extension === 'function') { cb = extension diff --git a/test/repo-test.js b/test/repo-test.js index 97dd9db0..6adaa075 100644 --- a/test/repo-test.js +++ b/test/repo-test.js @@ -185,6 +185,30 @@ module.exports = function (repo) { }).end(data) }) + it('write locks', (done) => { + const rnd = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVtesthash' + const mh = new Buffer(base58.decode(rnd)) + const data = new Buffer('Oh the data') + + let i = 0 + const finish = () => { + i++ + if (i === 2) done() + } + + repo.datastore.createWriteStream(mh, (err, metadata) => { + expect(err).to.not.exist + expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data') + finish() + }).end(data) + + repo.datastore.createWriteStream(mh, (err, metadata) => { + expect(err).to.not.exist + expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data') + finish() + }).end(data) + }) + it('block exists', function (done) { const buf = new Buffer(base58.decode(baseFileHash))