fix(caching): a bunch of cache-related fixes
zkat committed Apr 1, 2017
1 parent 9528442 commit 8ebda1d
Showing 2 changed files with 196 additions and 93 deletions.
cache.js: 181 changes (114 additions & 67 deletions)
@@ -38,9 +38,20 @@ module.exports = class Cache {
   // Returns a Promise that resolves to the response associated with the first
   // matching request in the Cache object.
   match (request, opts) {
-    request = new fetch.Request(request)
-    return cacache.get.info(this._path, cacheKey(request)).then(info => {
-      if (info && matchDetails(request, info.metadata, opts)) {
+    const req = new fetch.Request(request)
+    return cacache.get.info(this._path, cacheKey(req)).then(info => {
+      if (info && matchDetails(req, {
+        url: info.metadata.url,
+        reqHeaders: new fetch.Headers(info.metadata.reqHeaders),
+        resHeaders: new fetch.Headers(info.metadata.resHeaders)
+      })) {
+        if (req.method === 'HEAD') {
+          return new fetch.Response(null, {
+            url: req.url,
+            headers: info.metadata.resHeaders,
+            status: 200
+          })
+        }
         return new this.Promise((resolve, reject) => {
           fs.stat(info.path, (err, stat) => {
             if (err) {
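
In the new match() above, a HEAD request is answered straight from the stored metadata: headers and status come from info.metadata, and no content file is ever statted or read, because a HEAD response has no body. A minimal standalone sketch of that short-circuit, assuming node-fetch's Response in place of the module's own fetch import (the metadata shape mirrors what the diff stores):

const fetch = require('node-fetch')

// `metadata` mirrors the diff's stored shape: {url, reqHeaders, resHeaders}.
function headResponse (metadata) {
  // No body for HEAD, so nothing is read from the content store at all.
  return new fetch.Response(null, {
    url: metadata.url,
    headers: metadata.resHeaders,
    status: 200
  })
}
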
@@ -50,26 +61,37 @@ module.exports = class Cache {
             }
           })
         }).then(stat => {
-          let body
-          if (stat.size > MAX_MEM_SIZE) {
-            body = cacache.get.stream.byDigest(this._path, info.digest, {
-              hashAlgorithm: info.hashAlgorithm
-            })
-          } else {
-            // cacache is much faster at bulk reads
-            body = through()
-            cacache.get.byDigest(this._path, info.digest, {
-              hashAlgorithm: info.hashAlgorithm,
-              memoize: true
-            }).then(data => {
-              body.write(data, () => {
-                body.end()
-              })
-            }, err => body.emit('error', err))
-          }
+          const cachePath = this._path
+          let disturbed = false
+          // avoid opening cache file handles until a user actually tries to
+          // read from it.
+          const body = through((chunk, enc, cb) => {
+            if (disturbed) {
+              cb(null, chunk, enc)
+            } else {
+              disturbed = true
+              if (stat.size > MAX_MEM_SIZE) {
+                pipe(cacache.get.stream.byDigest(cachePath, info.digest, {
+                  hashAlgorithm: info.hashAlgorithm
+                }), body, () => {})
+              } else {
+                // cacache is much faster at bulk reads
+                cacache.get.byDigest(cachePath, info.digest, {
+                  hashAlgorithm: info.hashAlgorithm,
+                  memoize: true
+                }).then(data => {
+                  body.write(data, () => {
+                    body.end()
+                  })
+                }, err => body.emit('error', err))
+              }
+              cb() // throw away dummy data
+            }
+          })
+          body.write('dummy')
           return new fetch.Response(body, {
-            url: request.url,
-            headers: info.metadata.headers,
+            url: req.url,
+            headers: info.metadata.resHeaders,
             status: 200,
             size: stat.size
           })
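
The rewritten body handling above has one goal, per its own comment: building the fetch.Response must not open any cache file handles, because the caller may never read the body. The diff gets there by priming a through2 transform with a dummy chunk and kicking off the real cacache read only once data flows. A looser standalone illustration of the same defer-until-read idea, swapping in a PassThrough that opens a plain file on 'resume' (the path and helper name are hypothetical):

const fs = require('fs')
const {PassThrough} = require('stream')

function lazyFileStream (path) {
  const body = new PassThrough()
  let opened = false
  // 'resume' fires when the stream is piped or given a 'data' listener,
  // so the file handle opens only when someone actually starts reading.
  body.on('resume', () => {
    if (!opened) {
      opened = true
      fs.createReadStream(path).pipe(body)
    }
  })
  return body
}

// Nothing is opened until this pipe attaches a consumer:
// lazyFileStream('/tmp/blob').pipe(process.stdout)
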
@@ -80,56 +102,74 @@ module.exports = class Cache {
     })
   }
 
-  // Returns a Promise that resolves to an array of all matching requests in
-  // the Cache object.
-  matchAll (request, options) {
-    return this.Promise.reject(new Error('Cache.matchAll not implemented'))
-  }
-
-  // Takes a URL, retrieves it and adds the resulting response object to the
-  // given cache. This is fuctionally equivalent to calling fetch(), then using
-  // Cache.put() to add the results to the cache.
-  add (request) {
-    return this.Promise.reject(new Error('Cache.add not implemented'))
-  }
-
-  // Takes an array of URLs, retrieves them, and adds the resulting response
-  // objects to the given cache.
-  addAll (requests) {
-    return this.Promise.reject(new Error('Cache.addAll not implemented'))
-  }
-
   // Takes both a request and its response and adds it to the given cache.
   put (request, response) {
     const req = new fetch.Request(request)
     const size = response.headers.get('content-length')
     const fitInMemory = !!size && size < MAX_MEM_SIZE
+    const warningCode = (response.headers.get('Warning') || '').match(/^\d+/)
+    if (warningCode && +warningCode >= 100 && +warningCode < 200) {
+      // https://tools.ietf.org/html/rfc7234#section-4.3.4
+      response.headers.delete('Warning')
+    }
     const opts = {
       metadata: {
-        url: request.url,
-        headers: response.headers.raw()
+        url: req.url,
+        reqHeaders: req.headers.raw(),
+        resHeaders: response.headers.raw()
       },
       uid: this._uid,
       gid: this._gid,
       size,
       memoize: fitInMemory
     }
+    if (req.method === 'HEAD' || response.status === 304) {
+      // Update metadata without writing
+      return cacache.get.info(this._path, cacheKey(req)).then(info => {
+        // Providing these will bypass content write
+        opts.hashAlgorithm = info.hashAlgorithm
+        opts.digest = info.digest
+        return new this.Promise((resolve, reject) => {
+          pipe(
+            cacache.get.stream.byDigest(this._path, info.digest, opts),
+            cacache.put.stream(this._path, cacheKey(req), opts),
+            err => err ? reject(err) : resolve(response)
+          )
+        })
+      }).then(() => response)
+    }
     let buf = []
     let bufSize = 0
-    let cacheStream = fitInMemory
-    ? to({highWaterMark: MAX_MEM_SIZE}, (chunk, enc, cb) => {
-      buf.push(chunk)
-      bufSize += chunk.length
-      cb()
+    let cacheTargetStream = false
+    const cachePath = this._path
+    let cacheStream = to((chunk, enc, cb) => {
+      if (!cacheTargetStream) {
+        if (fitInMemory) {
+          cacheTargetStream =
+            to({highWaterMark: MAX_MEM_SIZE}, (chunk, enc, cb) => {
+              buf.push(chunk)
+              bufSize += chunk.length
+              cb()
+            }, done => {
+              cacache.put(
+                cachePath,
+                cacheKey(req),
+                Buffer.concat(buf, bufSize),
+                opts
+              ).then(
+                () => done(),
+                done
+              )
+            })
+        } else {
+          cacheTargetStream =
+            cacache.put.stream(cachePath, cacheKey(req), opts)
+        }
+      }
+      cacheTargetStream.write(chunk, enc, cb)
     }, done => {
-      cacache.put(
-        this._path,
-        cacheKey(req),
-        Buffer.concat(buf, bufSize),
-        opts
-      ).then(() => done(), done)
+      cacheTargetStream ? cacheTargetStream.end(done) : done()
     })
-    : cacache.put.stream(this._path, cacheKey(req), opts)
     const oldBody = response.body
     const newBody = through({highWaterMark: fitInMemory && MAX_MEM_SIZE})
     response.body = newBody
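
Two additions in this hunk encode RFC 7234 rules. The Warning logic implements §4.3.4: warn-codes in the 1xx range describe the state of a specific validation exchange and must not be stored with the response, while 2xx warn-codes stay. The same check as a standalone helper over a fetch-style Headers object (the function name is illustrative; the regex and bounds are the diff's own):

function stripFreshnessWarnings (headers) {
  // 1xx warn-codes (e.g. 110 "Response is Stale") apply only to the
  // exchange being cached and must be dropped before storage:
  // https://tools.ietf.org/html/rfc7234#section-4.3.4
  const warningCode = (headers.get('Warning') || '').match(/^\d+/)
  if (warningCode && +warningCode >= 100 && +warningCode < 200) {
    headers.delete('Warning')
  }
}
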
@@ -141,7 +181,11 @@ module.exports = class Cache {
         newBody.write(chunk, enc, cb)
       })
     }, done => {
-      cacheStream.end(() => newBody.end(done))
+      cacheStream.end(() => {
+        newBody.end(() => {
+          done()
+        })
+      })
     }), err => err && newBody.emit('error', err))
     return response
   }
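
This hunk tightens the ordering at the end of put(): the tee that feeds the response body into both the cache stream and the caller's replacement body now waits for cacheStream to finish flushing before ending newBody, so a caller that sees end-of-stream can trust that the cache write completed. A reduced sketch of that tee-and-finish pattern, with pump, flush-write-stream, and through2 standing in for the diff's pipe/to/through helpers (their imports sit outside this hunk):

const pump = require('pump')
const to = require('flush-write-stream')
const through = require('through2')

// Tee `source` into `sink` (the cache write) while returning a fresh
// stream for the caller to consume.
function tee (source, sink) {
  const out = through()
  pump(source, to((chunk, enc, cb) => {
    // Write to the cache first, then hand the same chunk to the reader.
    sink.write(chunk, enc, () => out.write(chunk, enc, cb))
  }, done => {
    // Finish the cache write before the caller-facing stream ends.
    sink.end(() => out.end(done))
  }), err => err && out.emit('error', err))
  return out
}
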
@@ -153,26 +197,29 @@
     const req = new fetch.Request(request)
     return cacache.rm.entry(
       this._path,
-      cacheKey(req.url)
+      cacheKey(req)
       // TODO - true/false
     ).then(() => false)
   }
-
-  keys (request, options) {
-    return cacache.ls(this._path).then(entries => Object.keys(entries))
-  }
 }
 
 function matchDetails (req, cached, opts) {
   const reqUrl = url.parse(req.url)
   const cacheUrl = url.parse(cached.url)
-  if (!(opts && opts.ignoreSearch) && (cacheUrl.search !== reqUrl.search)) {
-    return false
-  }
-  if (!(opts && opts.ignoreMethod) && req.method && req.method !== 'GET') {
-    return false
+  const vary = cached.resHeaders.get('Vary')
+  // https://tools.ietf.org/html/rfc7234#section-4.1
+  if (vary) {
+    if (vary.match(/\*/)) {
+      return false
+    } else {
+      const fieldsMatch = vary.split(/\s*,\s*/).every(field => {
+        return cached.reqHeaders.get(field) === req.headers.get(field)
+      })
+      if (!fieldsMatch) {
+        return false
+      }
+    }
   }
-  // TODO - opts.ignoreVary?
   reqUrl.hash = null
   cacheUrl.hash = null
   return url.format(reqUrl) === url.format(cacheUrl)
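
The Vary handling added to matchDetails() follows RFC 7234 §4.1: a stored response whose Vary is * can never be reused, and otherwise every header field named in Vary must carry the same value on the incoming request as it did on the request that produced the cached entry (which is why put() now stores reqHeaders). The same check, lifted into a standalone predicate over fetch-style Headers objects (a sketch, not this module's exported API):

function varyMatches (cached, req) {
  const vary = cached.resHeaders.get('Vary')
  if (!vary) {
    return true
  }
  // https://tools.ietf.org/html/rfc7234#section-4.1
  if (vary.match(/\*/)) {
    return false
  }
  return vary.split(/\s*,\s*/).every(field => {
    return cached.reqHeaders.get(field) === req.headers.get(field)
  })
}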