Skip to content

Commit 51b536e

Browse files
committed
feat(perf): bulk content write api
Fixes: #55
1 parent fa8f3c3 commit 51b536e

File tree

4 files changed

+96
-77
lines changed

4 files changed

+96
-77
lines changed

lib/content/put-stream.js renamed to lib/content/write.js

Lines changed: 67 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ const Promise = require('bluebird')
44

55
const checksumStream = require('checksum-stream')
66
const contentPath = require('./path')
7-
const finished = require('mississippi').finished
7+
const crypto = require('crypto')
88
const fixOwner = require('../util/fix-owner')
99
const fs = require('graceful-fs')
10-
const hasContent = require('./read').hasContent
1110
const moveFile = require('../util/move-file')
1211
const path = require('path')
1312
const pipe = require('mississippi').pipe
@@ -16,8 +15,31 @@ const through = require('mississippi').through
1615
const to = require('mississippi').to
1716
const uniqueFilename = require('unique-filename')
1817

19-
module.exports = putStream
20-
function putStream (cache, opts) {
18+
const writeFileAsync = Promise.promisify(fs.writeFile)
19+
20+
module.exports = write
21+
/**
 * Bulk-write `data` into the content store in one shot, without going
 * through the streaming pipeline.
 *
 * @param {string} cache - Cache root directory.
 * @param {string|Buffer} data - Full content to store.
 * @param {Object} [opts]
 * @param {string} [opts.hashAlgorithm='sha512'] - Algorithm passed to
 *   `crypto.createHash`.
 * @param {number} [opts.size] - Expected size of `data` in bytes.
 * @param {string} [opts.digest] - Expected hex digest of `data`.
 * @returns {Promise<string>} Resolves to the hex digest of `data` once it has
 *   been written to a temp file and moved into place. Rejects with an
 *   `EBADSIZE` error when `opts.size` does not match, or `EBADCHECKSUM` when
 *   `opts.digest` does not match.
 */
function write (cache, data, opts) {
  opts = opts || {}
  // Cheap check first: don't hash content that is about to be rejected.
  if (typeof opts.size === 'number') {
    // Strings are hashed and written as UTF-8, so compare the byte count
    // rather than `.length` (UTF-16 code units), which differs for non-ASCII.
    const found = typeof data === 'string'
      ? Buffer.byteLength(data)
      : data.length
    if (found !== opts.size) {
      return Promise.reject(sizeError(opts.size, found))
    }
  }
  const digest = crypto.createHash(
    opts.hashAlgorithm || 'sha512'
  ).update(data).digest('hex')
  if (opts.digest && digest !== opts.digest) {
    return Promise.reject(checksumError(opts.digest, digest))
  }
  // NOTE(review): the tests for the stream variant expect an error when no
  // data is written; empty `data` is accepted here -- confirm that is intended.
  return Promise.using(makeTmp(cache, opts), tmp => (
    // 'wx' makes the write fail if the temp path already exists.
    writeFileAsync(
      tmp.target, data, {flag: 'wx'}
    ).then(() => (
      moveToDestination(tmp, cache, digest, opts)
    ))
  )).then(() => digest)
}
40+
41+
module.exports.stream = writeStream
42+
function writeStream (cache, opts) {
2143
opts = opts || {}
2244
const inputStream = through()
2345
let inputErr = false
@@ -53,37 +75,15 @@ function putStream (cache, opts) {
5375
}
5476

5577
function handleContent (inputStream, cache, opts, errCheck) {
56-
const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
57-
return (
58-
opts.digest
59-
? hasContent(cache, opts.digest)
60-
: Promise.resolve(false)
61-
).then(exists => {
78+
return Promise.using(makeTmp(cache, opts), tmp => {
6279
errCheck()
63-
// Fast-path-shortcut this if it's already been written.
64-
if (exists) {
65-
return new Promise((resolve, reject) => {
66-
// Slurp it up if they don't close the stream earlier.
67-
inputStream.on('data', () => {})
68-
finished(inputStream, err => {
69-
err ? reject(err) : resolve(opts.digest)
70-
})
71-
})
72-
} else {
73-
return fixOwner.mkdirfix(
74-
path.dirname(tmpTarget), opts.uid, opts.gid
75-
).then(() => {
76-
errCheck()
77-
const tmpWritten = pipeToTmp(
78-
inputStream, cache, tmpTarget, opts, errCheck)
79-
return Promise.using(tmpWritten, digest => {
80-
errCheck()
81-
return moveToDestination(
82-
tmpTarget, cache, digest, opts, errCheck
83-
).then(() => digest)
84-
})
85-
})
86-
}
80+
return pipeToTmp(
81+
inputStream, cache, tmp.target, opts, errCheck
82+
).then(digest => {
83+
return moveToDestination(
84+
tmp, cache, digest, opts, errCheck
85+
).then(() => digest)
86+
})
8787
})
8888
}
8989

@@ -102,11 +102,7 @@ function pipeToTmp (inputStream, cache, tmpTarget, opts, errCheck) {
102102
resolve(fs.createWriteStream(tmpTarget, {
103103
flags: 'wx'
104104
}))
105-
}).disposer(outStream => new Promise((resolve, reject) => {
106-
if (!outStream.fd) { resolve() }
107-
outStream.on('error', reject)
108-
outStream.on('close', resolve)
109-
}))
105+
})
110106
return Promise.using(outStream, outStream => {
111107
errCheck()
112108
return new Promise((resolve, reject) => {
@@ -121,23 +117,48 @@ function pipeToTmp (inputStream, cache, tmpTarget, opts, errCheck) {
121117
}
122118
})
123119
})
124-
}).disposer(() => {
125-
return rimraf(tmpTarget)
126120
})
127121
}
128122

129-
function moveToDestination (tmpTarget, cache, digest, opts, errCheck) {
130-
errCheck()
123+
/**
 * Reserve a unique temp file path under `<cache>/tmp` and make sure its parent
 * directory exists (created via `fixOwner.mkdirfix` with `opts.uid`/`opts.gid`).
 *
 * @param {string} cache - Cache root directory.
 * @param {Object} opts - Reads `tmpPrefix`, `uid` and `gid`.
 * @returns A disposer (for use with `Promise.using`) wrapping
 *   `{target, moved}`. On disposal the temp path is removed with `rimraf`
 *   unless `moved` has been set to true by whoever relocated the file.
 */
function makeTmp (cache, opts) {
  const tmpDir = path.join(cache, 'tmp')
  const target = uniqueFilename(tmpDir, opts.tmpPrefix)
  const dirReady = fixOwner.mkdirfix(
    path.dirname(target), opts.uid, opts.gid
  )
  return dirReady
    .then(() => ({ target: target, moved: false }))
    .disposer(tmp => {
      // Nothing to clean up once the file has been moved into place.
      if (tmp.moved) { return false }
      return rimraf(tmp.target)
    })
}
132+
133+
function moveToDestination (tmp, cache, digest, opts, errCheck) {
134+
errCheck && errCheck()
131135
const destination = contentPath(cache, digest, opts.hashAlgorithm)
132136
const destDir = path.dirname(destination)
133137

134138
return fixOwner.mkdirfix(
135139
destDir, opts.uid, opts.gid
136140
).then(() => {
137-
errCheck()
138-
return moveFile(tmpTarget, destination)
141+
errCheck && errCheck()
142+
return moveFile(tmp.target, destination)
139143
}).then(() => {
140-
errCheck()
144+
errCheck && errCheck()
145+
tmp.moved = true
141146
return fixOwner.chownr(destination, opts.uid, opts.gid)
142147
})
143148
}
149+
150+
/**
 * Build the error reported when content size differs from the expected size.
 *
 * @param {number} expected - Size the caller asked for.
 * @param {number} found - Size actually observed.
 * @returns {Error} Error with `code` 'EBADSIZE' plus `expected` and `found`.
 */
function sizeError (expected, found) {
  // The message text is kept verbatim: callers may match on it.
  const err = new Error('stream data size mismatch')
  err.expected = expected
  err.found = found
  err.code = 'EBADSIZE'
  return err
}
157+
158+
/**
 * Build the error reported when a computed digest differs from the expected one.
 *
 * @param {string} expected - Digest the caller asked for.
 * @param {string} found - Digest actually computed.
 * @returns {Error} Error with `code` 'EBADCHECKSUM' plus `expected` and `found`.
 */
function checksumError (expected, found) {
  const err = new Error('checksum failed')
  err.code = 'EBADCHECKSUM'
  err.expected = expected
  err.found = found
  return err
}

put.js

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,28 @@
11
'use strict'
22

3-
const Promise = require('bluebird')
4-
53
const index = require('./lib/entry-index')
64
const memo = require('./lib/memoization')
7-
const pipe = Promise.promisify(require('mississippi').pipe)
8-
const putContent = require('./lib/content/put-stream')
9-
const through = require('mississippi').through
5+
const write = require('./lib/content/write')
106
const to = require('mississippi').to
117

128
module.exports = putData
139
function putData (cache, key, data, opts) {
1410
opts = opts || {}
15-
const src = through()
16-
let digest
17-
const dest = putStream(cache, key, opts)
18-
dest.on('digest', d => { digest = d })
19-
const ret = pipe(src, dest).then(() => digest)
20-
src.write(data, () => src.end())
21-
return ret
11+
return write(cache, data, opts).then(digest => {
12+
return index.insert(cache, key, digest, opts).then(entry => {
13+
if (opts.memoize) {
14+
memo.put(cache, entry, data)
15+
}
16+
return digest
17+
})
18+
})
2219
}
2320

2421
module.exports.stream = putStream
2522
function putStream (cache, key, opts) {
2623
opts = opts || {}
2724
let digest
28-
const contentStream = putContent(cache, opts).on('digest', function (d) {
25+
const contentStream = write.stream(cache, opts).on('digest', d => {
2926
digest = d
3027
})
3128
let memoData

test/content.put-stream.chownr.js renamed to test/content.write.chownr.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ test('allows setting a custom uid for cache contents on write', {
2020
var NEWUID = process.getuid() + 1
2121
var NEWGID = process.getgid() + 1
2222
var updatedPaths = []
23-
var putStream = requireInject('../lib/content/put-stream', {
23+
var write = requireInject('../lib/content/write', {
2424
chownr: function (p, uid, gid, cb) {
2525
process.nextTick(function () {
2626
t.equal(uid, NEWUID, 'new uid set')
@@ -31,7 +31,7 @@ test('allows setting a custom uid for cache contents on write', {
3131
}
3232
})
3333
t.plan(7)
34-
pipe(fromString(CONTENT), putStream(CACHE, {
34+
pipe(fromString(CONTENT), write.stream(CACHE, {
3535
uid: NEWUID,
3636
gid: NEWGID,
3737
hashAlgorithm: 'sha1'

test/content.put-stream.js renamed to test/content.write.js

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,16 @@ const CACHE = path.join(testDir, 'cache')
1414
const contentPath = require('../lib/content/path')
1515
const Dir = Tacks.Dir
1616
const File = Tacks.File
17-
const putStream = require('../lib/content/put-stream')
17+
18+
const write = require('../lib/content/write')
1819

1920
test('basic put', function (t) {
2021
const CONTENT = 'foobarbaz'
2122
// Default is sha512
2223
const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex')
2324
let foundDigest
2425
const src = fromString(CONTENT)
25-
const stream = putStream(CACHE).on('digest', function (d) {
26+
const stream = write.stream(CACHE).on('digest', function (d) {
2627
foundDigest = d
2728
})
2829
pipe(src, stream, function (err) {
@@ -47,7 +48,7 @@ test('checks input digest doesn\'t match data', function (t) {
4748
t.plan(5)
4849
let foundDigest1
4950
let foundDigest2
50-
pipe(fromString('bazbarfoo'), putStream(CACHE, {
51+
pipe(fromString('bazbarfoo'), write.stream(CACHE, {
5152
digest: DIGEST
5253
}).on('digest', function (d) {
5354
foundDigest1 = d
@@ -56,7 +57,7 @@ test('checks input digest doesn\'t match data', function (t) {
5657
t.ok(!!err, 'got an error')
5758
t.equal(err.code, 'EBADCHECKSUM', 'returns a useful error code')
5859
})
59-
pipe(fromString(CONTENT), putStream(CACHE, {
60+
pipe(fromString(CONTENT), write.stream(CACHE, {
6061
digest: DIGEST
6162
}).on('digest', function (d) {
6263
foundDigest2 = d
@@ -68,7 +69,7 @@ test('checks input digest doesn\'t match data', function (t) {
6869

6970
test('errors if stream ends with no data', function (t) {
7071
let foundDigest = null
71-
pipe(fromString(''), putStream(CACHE).on('digest', function (d) {
72+
pipe(fromString(''), write.stream(CACHE).on('digest', function (d) {
7273
foundDigest = d
7374
}), function (err) {
7475
t.ok(err, 'got an error')
@@ -81,7 +82,7 @@ test('errors if stream ends with no data', function (t) {
8182
test('errors if input size does not match expected', function (t) {
8283
t.plan(10)
8384
let dig1 = null
84-
pipe(fromString('abc'), putStream(CACHE, {
85+
pipe(fromString('abc'), write.stream(CACHE, {
8586
size: 5
8687
}).on('digest', function (d) {
8788
dig1 = d
@@ -93,7 +94,7 @@ test('errors if input size does not match expected', function (t) {
9394
t.equal(err.found, 3, 'error includes found size')
9495
})
9596
let dig2 = null
96-
pipe(fromString('abcdefghi'), putStream(CACHE, {
97+
pipe(fromString('abcdefghi'), write.stream(CACHE, {
9798
size: 5
9899
}).on('digest', function (d) {
99100
dig2 = d
@@ -119,7 +120,7 @@ test('does not overwrite content if already on disk', function (t) {
119120
let dig1
120121
let dig2
121122
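// With a digest. NOTE(review): the hasContent fast path was removed from handleContent in this commit, so this no longer short-circuits early -- confirm the existing file is still left untouched.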
122-
pipe(fromString(CONTENT), putStream(CACHE, {
123+
pipe(fromString(CONTENT), write.stream(CACHE, {
123124
digest: DIGEST
124125
}).on('digest', function (d) {
125126
dig1 = d
@@ -131,7 +132,7 @@ test('does not overwrite content if already on disk', function (t) {
131132
t.equal(d, 'nope', 'process short-circuited. Data not written.')
132133
})
133134
})
134-
pipe(fromString(CONTENT), putStream(CACHE).on('digest', function (d) {
135+
pipe(fromString(CONTENT), write.stream(CACHE).on('digest', function (d) {
135136
dig2 = d
136137
}), function (err) {
137138
if (err) { throw err }
@@ -147,7 +148,7 @@ test('errors if input stream errors', function (t) {
147148
const stream = fromString('foobarbaz')
148149
.on('end', () => stream.emit('error', new Error('bleh')))
149150
let foundDigest
150-
const putter = putStream(CACHE).on('digest', function (d) {
151+
const putter = write.stream(CACHE).on('digest', function (d) {
151152
foundDigest = d
152153
})
153154
pipe(stream, putter, function (err) {
@@ -177,7 +178,7 @@ test('exits normally if file already open', function (t) {
177178
// Generally, you'd get an EBUSY back.
178179
fs.open(PATH, 'r+', function (err, fd) {
179180
if (err) { throw err }
180-
pipe(fromString(CONTENT), putStream(CACHE).on('digest', function (d) {
181+
pipe(fromString(CONTENT), write.stream(CACHE).on('digest', function (d) {
181182
foundDigest = d
182183
}), function (err) {
183184
if (err) { throw err }
@@ -195,7 +196,7 @@ test('exits normally if file already open', function (t) {
195196

196197
test('cleans up tmp on successful completion', function (t) {
197198
const CONTENT = 'foobarbaz'
198-
pipe(fromString(CONTENT), putStream(CACHE), function (err) {
199+
pipe(fromString(CONTENT), write.stream(CACHE), function (err) {
199200
if (err) { throw err }
200201
const tmp = path.join(CACHE, 'tmp')
201202
fs.readdir(tmp, function (err, files) {
@@ -212,7 +213,7 @@ test('cleans up tmp on successful completion', function (t) {
212213

213214
test('cleans up tmp on error', function (t) {
214215
const CONTENT = 'foobarbaz'
215-
pipe(fromString(CONTENT), putStream(CACHE, { size: 1 }), function (err) {
216+
pipe(fromString(CONTENT), write.stream(CACHE, { size: 1 }), function (err) {
216217
t.ok(err, 'got an error')
217218
t.equal(err.code, 'EBADSIZE', 'got expected code')
218219
const tmp = path.join(CACHE, 'tmp')
@@ -234,7 +235,7 @@ test('checks the size of stream data if opts.size provided', function (t) {
234235
t.plan(8)
235236
pipe(
236237
fromString(CONTENT.slice(3)),
237-
putStream(CACHE, {
238+
write.stream(CACHE, {
238239
size: CONTENT.length
239240
}).on('digest', function (d) { dig1 = d }),
240241
function (err) {
@@ -245,7 +246,7 @@ test('checks the size of stream data if opts.size provided', function (t) {
245246
)
246247
pipe(
247248
fromString(CONTENT + 'quux'),
248-
putStream(CACHE, {
249+
write.stream(CACHE, {
249250
size: CONTENT.length
250251
}).on('digest', function (d) { dig2 = d }),
251252
function (err) {
@@ -256,7 +257,7 @@ test('checks the size of stream data if opts.size provided', function (t) {
256257
)
257258
pipe(
258259
fromString(CONTENT),
259-
putStream(CACHE, {
260+
write.stream(CACHE, {
260261
size: CONTENT.length
261262
}).on('digest', function (d) { dig3 = d }),
262263
function (err) {

0 commit comments

Comments
 (0)