diff --git a/src/client/cli/commands/archive.js b/src/client/cli/commands/archive.js
index 5f2f392..09ae268 100644
--- a/src/client/cli/commands/archive.js
+++ b/src/client/cli/commands/archive.js
@@ -7,6 +7,7 @@ const {subcommand} = require('../util')
 const {Statement} = require('../../../model/statement')
 import type {RestClient} from '../../api'
 
+const STMT_BATCH_SIZE = 1024
 const OBJECT_BATCH_SIZE = 1024
 const TAR_ENTRY_OPTS = {
   uid: 500,
@@ -15,6 +16,10 @@ const TAR_ENTRY_OPTS = {
   gname: 'staff'
 }
 
+function leftpad (str, length, char = '0') {
+  return char.repeat(Math.max(0, length - str.length)) + str
+}
+
 module.exports = {
   command: 'archive <queryString>',
   description: 'Create a gzipped tar archive of the statements and data objects returned for the given `queryString`',
@@ -29,9 +34,10 @@ module.exports = {
   },
   handler: subcommand((opts: {client: RestClient, queryString: string, output?: ?string}) => {
     const {client, queryString} = opts
-    let dataFetchPromises = []
     let outputStream
-    let tarball
+    const tarball = tar.pack()
+    const gzip = zlib.createGzip()
+    const objectIds: Set<string> = new Set()
 
     return client.queryStream(queryString)
       .then(response => new Promise((resolve, reject) => {
@@ -39,17 +45,17 @@ module.exports = {
         const queryStream = response.stream()
         const outStreamName = output || 'standard output'
         outputStream = (output == null) ? process.stdout : fs.createWriteStream(output)
-
-        tarball = tar.pack()
-        const gzip = zlib.createGzip()
         tarball.pipe(gzip).pipe(outputStream)
 
-        let objectIds = []
-        function fetchBatch (force: boolean = false) {
-          if (force || objectIds.length >= OBJECT_BATCH_SIZE) {
-            const ids = objectIds
-            objectIds = []
-            dataFetchPromises.push(writeDataObjectsToTarball(client, tarball, ids))
+        let stmtBatch: Array<string> = []
+        let stmtBatchNumber = 0
+        function writeStatementBatch (force: boolean = false) {
+          if (force || stmtBatch.length >= STMT_BATCH_SIZE) {
+            const content = Buffer.from(stmtBatch.join('\n'), 'utf-8')
+            const filename = `stmt/${leftpad(stmtBatchNumber.toString(), 8)}.ndjson`
+            writeToTarball(tarball, filename, content)
+            stmtBatchNumber += 1
+            stmtBatch = []
           }
         }
 
@@ -70,22 +76,23 @@ module.exports = {
            return
          }
 
-          const name = `stmt/${stmt.id}`
-          const content = Buffer.from(JSON.stringify(obj), 'utf-8')
-          writeToTarball(tarball, name, content)
+          stmtBatch.push(JSON.stringify(obj))
+          writeStatementBatch()
 
          for (const id of stmt.objectIds) {
-            objectIds.push(id)
+            objectIds.add(id)
+          }
+          for (const id of stmt.depsSet) {
+            objectIds.add(id)
           }
-          fetchBatch()
         })
 
         queryStream.on('end', () => {
-          fetchBatch(true)
+          writeStatementBatch(true)
           resolve()
         })
       }))
-      .then(() => Promise.all(dataFetchPromises))
+      .then(() => writeDataObjectsToTarball(client, tarball, objectIds))
       .then(() => new Promise(resolve => {
         outputStream.on('end', () => resolve())
         tarball.finalize()
@@ -98,7 +105,25 @@ function writeToTarball (tarball: Object, filename: string, content: Buffer) {
   tarball.entry(header, content)
 }
 
-function writeDataObjectsToTarball (client: RestClient, tarball: Object, objectIds: Array<string>): Promise<*> {
+function writeDataObjectsToTarball (client: RestClient, tarball: Object, objectIds: Set<string>): Promise<*> {
+  if (objectIds.size < 1) return Promise.resolve()
+
+  const batchPromises = []
+  let batch = []
+  for (const id of objectIds) {
+    batch.push(id)
+    if (batch.length >= OBJECT_BATCH_SIZE) {
+      const ids = batch
+      batch = []
+      batchPromises.push(fetchObjectBatch(client, tarball, ids))
+    }
+  }
+
+  batchPromises.push(fetchObjectBatch(client, tarball, batch))
+  return Promise.all(batchPromises)
+}
+
+function fetchObjectBatch (client: RestClient, tarball: Object, objectIds: Array<string>): Promise<*> {
   if (objectIds.length < 1) return Promise.resolve()
 
   return client.batchGetDataStream(objectIds, false)