Skip to content
This repository has been archived by the owner on Jul 3, 2019. It is now read-only.

Commit

Permalink
fix(put-stream): robustified and predictibilized
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Feb 27, 2017
1 parent fb913fb commit daf9e08
Showing 1 changed file with 92 additions and 59 deletions.
151 changes: 92 additions & 59 deletions lib/content/put-stream.js
Expand Up @@ -4,106 +4,138 @@ const Promise = require('bluebird')

const checksumStream = require('checksum-stream')
const contentPath = require('./path')
const duplex = require('mississippi').duplex
const finished = require('mississippi').finished
const fixOwner = require('../util/fix-owner')
const fs = require('graceful-fs')
const hasContent = require('./read').hasContent
const moveFile = require('../util/move-file')
const path = require('path')
const pipeline = require('mississippi').pipeline
const pipe = require('mississippi').pipe
const rimraf = Promise.promisify(require('rimraf'))
const through = require('mississippi').through
const to = require('mississippi').to
const uniqueFilename = require('unique-filename')

const closeAsync = Promise.promisify(fs.close)

module.exports = putStream
function putStream (cache, opts) {
opts = opts || {}
const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
const inputStream = through()
let inputErr = false
function errCheck () {
if (inputErr) { throw inputErr }
}

const inputStream = duplex()
let allDone
const ret = to((c, n, cb) => {
if (!allDone) {
allDone = handleContent(inputStream, cache, opts, errCheck)
}
inputStream.write(c, n, cb)
}, cb => {
inputStream.end(() => {
if (!allDone) {
const e = new Error('Input stream was empty')
e.code = 'ENODATA'
return ret.emit('error', e)
}
allDone.then(digest => {
digest && ret.emit('digest', digest)
cb()
}, e => {
ret.emit('error', e)
})
})
})
ret.once('error', e => {
inputErr = e
})
return ret
}

hasContent(cache, opts.digest).then(exists => {
function handleContent (inputStream, cache, opts, errCheck) {
const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
return (
opts.digest
? hasContent(cache, opts.digest)
: Promise.resolve(false)
).then(exists => {
errCheck()
// Fast-path-shortcut this if it's already been written.
if (exists) {
inputStream.emit('digest', opts.digest)
// Slurp it up if they don't close the stream earlier.
inputStream.setWritable(to((c, en, cb) => cb()))
return
return new Promise((resolve, reject) => {
// Slurp it up if they don't close the stream earlier.
inputStream.on('data', () => {})
finished(inputStream, err => {
err ? reject(err) : resolve(opts.digest)
})
})
} else {
return fixOwner.mkdirfix(
path.dirname(tmpTarget), opts.uid, opts.gid
).then(() => {
errCheck()
const tmpWritten = pipeToTmp(
inputStream, cache, tmpTarget, opts, errCheck)
return Promise.using(tmpWritten, digest => {
errCheck()
return moveToDestination(
tmpTarget, cache, digest, opts, errCheck
).then(() => digest)
})
})
}
return fixOwner.mkdirfix(
path.dirname(tmpTarget), opts.uid, opts.gid
).then(() => (
Promise.using(pipeToTmp(inputStream, cache, tmpTarget, opts), digest => (
moveToDestination(tmpTarget, cache, digest, opts).then(() => (
inputStream.emit('digest', digest)
))
))
))
}).catch(err => (
inputStream.emit('error', err)
))

return inputStream
})
}

function pipeToTmp (inputStream, cache, tmpTarget, opts) {
function pipeToTmp (inputStream, cache, tmpTarget, opts, errCheck) {
let digest
let size
const hashStream = checksumStream({
digest: opts.digest,
algorithm: opts.hashAlgorithm,
size: opts.size
}).on('digest', d => {
digest = d
}).on('size', s => {
size = s
})

const outStream = fs.createWriteStream(tmpTarget, {
flags: 'w',
autoClose: false
})

let finishStream
return new Promise((resolve, reject) => {
const combined = pipeline(hashStream, to((c, en, cb) => {
outStream.write(c, en, cb)
}, cb => {
finishStream = cb
outStream.end()
let outStream = new Promise((resolve, reject) => {
errCheck()
resolve(fs.createWriteStream(tmpTarget, {
flags: 'wx'
}))
inputStream.setWritable(combined)
finished(outStream, err => {
// Make damn sure the fd is closed before we continue
(outStream.fd ? closeAsync(outStream.fd) : Promise.resolve())
.then(() => {
if (!size) {
const e = new Error('Input stream was empty')
e.code = 'ENODATA'
reject(e)
} else if (err) {
}).disposer(outStream => new Promise((resolve, reject) => {
if (!outStream.fd) { resolve() }
outStream.on('error', reject)
outStream.on('close', resolve)
}))
return Promise.using(outStream, outStream => {
errCheck()
return new Promise((resolve, reject) => {
errCheck()
inputStream.on('error', reject)
pipe(inputStream, hashStream, outStream, err => {
errCheck()
if (err) {
reject(err)
} else {
resolve()
resolve(digest)
}
})
})
}).then(() => digest).disposer(() => (
rimraf(tmpTarget).then(() => finishStream && finishStream())
))
}).disposer(() => {
return rimraf(tmpTarget)
})
}

function moveToDestination (tmpTarget, cache, digest, opts) {
function moveToDestination (tmpTarget, cache, digest, opts, errCheck) {
errCheck()
const destination = contentPath(cache, digest)
const destDir = path.dirname(destination)

return fixOwner.mkdirfix(
destDir, opts.uid, opts.gid
).then(() => (
new Promise((resolve, reject) => {
errCheck()
moveFile(tmpTarget, destination, err => {
if (err) {
reject(err)
Expand All @@ -112,7 +144,8 @@ function moveToDestination (tmpTarget, cache, digest, opts) {
}
})
})
)).then(() => (
fixOwner.chownr(destination, opts.uid, opts.gid)
))
)).then(() => {
errCheck()
return fixOwner.chownr(destination, opts.uid, opts.gid)
})
}

0 comments on commit daf9e08

Please sign in to comment.