Skip to content

Commit daf9e08

Browse files
committed
fix(put-stream): robustified and predictibilized
1 parent fb913fb commit daf9e08

File tree

1 file changed

+92
-59
lines changed

1 file changed

+92
-59
lines changed

lib/content/put-stream.js

Lines changed: 92 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -4,106 +4,138 @@ const Promise = require('bluebird')
44

55
const checksumStream = require('checksum-stream')
66
const contentPath = require('./path')
7-
const duplex = require('mississippi').duplex
87
const finished = require('mississippi').finished
98
const fixOwner = require('../util/fix-owner')
109
const fs = require('graceful-fs')
1110
const hasContent = require('./read').hasContent
1211
const moveFile = require('../util/move-file')
1312
const path = require('path')
14-
const pipeline = require('mississippi').pipeline
13+
const pipe = require('mississippi').pipe
1514
const rimraf = Promise.promisify(require('rimraf'))
15+
const through = require('mississippi').through
1616
const to = require('mississippi').to
1717
const uniqueFilename = require('unique-filename')
1818

19-
const closeAsync = Promise.promisify(fs.close)
20-
2119
module.exports = putStream
2220
function putStream (cache, opts) {
2321
opts = opts || {}
24-
const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
22+
const inputStream = through()
23+
let inputErr = false
24+
function errCheck () {
25+
if (inputErr) { throw inputErr }
26+
}
2527

26-
const inputStream = duplex()
28+
let allDone
29+
const ret = to((c, n, cb) => {
30+
if (!allDone) {
31+
allDone = handleContent(inputStream, cache, opts, errCheck)
32+
}
33+
inputStream.write(c, n, cb)
34+
}, cb => {
35+
inputStream.end(() => {
36+
if (!allDone) {
37+
const e = new Error('Input stream was empty')
38+
e.code = 'ENODATA'
39+
return ret.emit('error', e)
40+
}
41+
allDone.then(digest => {
42+
digest && ret.emit('digest', digest)
43+
cb()
44+
}, e => {
45+
ret.emit('error', e)
46+
})
47+
})
48+
})
49+
ret.once('error', e => {
50+
inputErr = e
51+
})
52+
return ret
53+
}
2754

28-
hasContent(cache, opts.digest).then(exists => {
55+
function handleContent (inputStream, cache, opts, errCheck) {
56+
const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
57+
return (
58+
opts.digest
59+
? hasContent(cache, opts.digest)
60+
: Promise.resolve(false)
61+
).then(exists => {
62+
errCheck()
2963
// Fast-path-shortcut this if it's already been written.
3064
if (exists) {
31-
inputStream.emit('digest', opts.digest)
32-
// Slurp it up if they don't close the stream earlier.
33-
inputStream.setWritable(to((c, en, cb) => cb()))
34-
return
65+
return new Promise((resolve, reject) => {
66+
// Slurp it up if they don't close the stream earlier.
67+
inputStream.on('data', () => {})
68+
finished(inputStream, err => {
69+
err ? reject(err) : resolve(opts.digest)
70+
})
71+
})
72+
} else {
73+
return fixOwner.mkdirfix(
74+
path.dirname(tmpTarget), opts.uid, opts.gid
75+
).then(() => {
76+
errCheck()
77+
const tmpWritten = pipeToTmp(
78+
inputStream, cache, tmpTarget, opts, errCheck)
79+
return Promise.using(tmpWritten, digest => {
80+
errCheck()
81+
return moveToDestination(
82+
tmpTarget, cache, digest, opts, errCheck
83+
).then(() => digest)
84+
})
85+
})
3586
}
36-
return fixOwner.mkdirfix(
37-
path.dirname(tmpTarget), opts.uid, opts.gid
38-
).then(() => (
39-
Promise.using(pipeToTmp(inputStream, cache, tmpTarget, opts), digest => (
40-
moveToDestination(tmpTarget, cache, digest, opts).then(() => (
41-
inputStream.emit('digest', digest)
42-
))
43-
))
44-
))
45-
}).catch(err => (
46-
inputStream.emit('error', err)
47-
))
48-
49-
return inputStream
87+
})
5088
}
5189

52-
function pipeToTmp (inputStream, cache, tmpTarget, opts) {
90+
function pipeToTmp (inputStream, cache, tmpTarget, opts, errCheck) {
5391
let digest
54-
let size
5592
const hashStream = checksumStream({
5693
digest: opts.digest,
5794
algorithm: opts.hashAlgorithm,
5895
size: opts.size
5996
}).on('digest', d => {
6097
digest = d
61-
}).on('size', s => {
62-
size = s
63-
})
64-
65-
const outStream = fs.createWriteStream(tmpTarget, {
66-
flags: 'w',
67-
autoClose: false
6898
})
6999

70-
let finishStream
71-
return new Promise((resolve, reject) => {
72-
const combined = pipeline(hashStream, to((c, en, cb) => {
73-
outStream.write(c, en, cb)
74-
}, cb => {
75-
finishStream = cb
76-
outStream.end()
100+
let outStream = new Promise((resolve, reject) => {
101+
errCheck()
102+
resolve(fs.createWriteStream(tmpTarget, {
103+
flags: 'wx'
77104
}))
78-
inputStream.setWritable(combined)
79-
finished(outStream, err => {
80-
// Make damn sure the fd is closed before we continue
81-
(outStream.fd ? closeAsync(outStream.fd) : Promise.resolve())
82-
.then(() => {
83-
if (!size) {
84-
const e = new Error('Input stream was empty')
85-
e.code = 'ENODATA'
86-
reject(e)
87-
} else if (err) {
105+
}).disposer(outStream => new Promise((resolve, reject) => {
106+
if (!outStream.fd) { resolve() }
107+
outStream.on('error', reject)
108+
outStream.on('close', resolve)
109+
}))
110+
return Promise.using(outStream, outStream => {
111+
errCheck()
112+
return new Promise((resolve, reject) => {
113+
errCheck()
114+
inputStream.on('error', reject)
115+
pipe(inputStream, hashStream, outStream, err => {
116+
errCheck()
117+
if (err) {
88118
reject(err)
89119
} else {
90-
resolve()
120+
resolve(digest)
91121
}
92122
})
93123
})
94-
}).then(() => digest).disposer(() => (
95-
rimraf(tmpTarget).then(() => finishStream && finishStream())
96-
))
124+
}).disposer(() => {
125+
return rimraf(tmpTarget)
126+
})
97127
}
98128

99-
function moveToDestination (tmpTarget, cache, digest, opts) {
129+
function moveToDestination (tmpTarget, cache, digest, opts, errCheck) {
130+
errCheck()
100131
const destination = contentPath(cache, digest)
101132
const destDir = path.dirname(destination)
102133

103134
return fixOwner.mkdirfix(
104135
destDir, opts.uid, opts.gid
105136
).then(() => (
106137
new Promise((resolve, reject) => {
138+
errCheck()
107139
moveFile(tmpTarget, destination, err => {
108140
if (err) {
109141
reject(err)
@@ -112,7 +144,8 @@ function moveToDestination (tmpTarget, cache, digest, opts) {
112144
}
113145
})
114146
})
115-
)).then(() => (
116-
fixOwner.chownr(destination, opts.uid, opts.gid)
117-
))
147+
)).then(() => {
148+
errCheck()
149+
return fixOwner.chownr(destination, opts.uid, opts.gid)
150+
})
118151
}

0 commit comments

Comments
 (0)