dedup object refs, write statements as ndjson batches
yusefnapora committed Feb 22, 2017
1 parent a2f87c1 commit 1c5c990
Showing 1 changed file with 44 additions and 19 deletions.
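
With this change, statements are no longer written to the tarball one file per statement; they are concatenated into NDJSON batch files named with a zero-padded batch counter, and object references are deduplicated into a Set before being fetched. A small sketch of how the batch filenames fall out of the `leftpad` helper and `STMT_BATCH_SIZE` constant added below (nothing beyond what appears in the diff is assumed):

// Sketch: batch filenames produced by the leftpad helper added in this commit.
function leftpad (str, length, char = '0') {
  return char.repeat(Math.max(0, length - str.length)) + str
}

for (let batchNumber = 0; batchNumber < 3; batchNumber++) {
  console.log(`stmt/${leftpad(batchNumber.toString(), 8)}.ndjson`)
}
// => stmt/00000000.ndjson
// => stmt/00000001.ndjson
// => stmt/00000002.ndjson
// Each batch file holds up to STMT_BATCH_SIZE (1024) statements, one JSON object per line.
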
63 changes: 44 additions & 19 deletions src/client/cli/commands/archive.js
@@ -7,6 +7,7 @@ const {subcommand} = require('../util')
const {Statement} = require('../../../model/statement')
import type {RestClient} from '../../api'

const STMT_BATCH_SIZE = 1024
const OBJECT_BATCH_SIZE = 1024
const TAR_ENTRY_OPTS = {
uid: 500,
@@ -15,6 +16,10 @@ const TAR_ENTRY_OPTS = {
gname: 'staff'
}

function leftpad (str, length, char = '0') {
return char.repeat(Math.max(0, length - str.length)) + str
}

module.exports = {
command: 'archive <queryString>',
description: 'Create a gzipped tar archive of the statements and data objects returned for the given `queryString`',
@@ -29,27 +34,28 @@
},
handler: subcommand((opts: {client: RestClient, queryString: string, output?: ?string}) => {
const {client, queryString} = opts
let dataFetchPromises = []
let outputStream
let tarball
const tarball = tar.pack()
const gzip = zlib.createGzip()
const objectIds: Set<string> = new Set()

return client.queryStream(queryString)
.then(response => new Promise((resolve, reject) => {
const {output} = opts
const queryStream = response.stream()
const outStreamName = output || 'standard output'
outputStream = (output == null) ? process.stdout : fs.createWriteStream(output)

tarball = tar.pack()
const gzip = zlib.createGzip()
tarball.pipe(gzip).pipe(outputStream)

let objectIds = []
function fetchBatch (force: boolean = false) {
if (force || objectIds.length >= OBJECT_BATCH_SIZE) {
const ids = objectIds
objectIds = []
dataFetchPromises.push(writeDataObjectsToTarball(client, tarball, ids))
let stmtBatch: Array<string> = []
let stmtBatchNumber = 0
function writeStatementBatch (force: boolean = false) {
if (force || stmtBatch.length >= STMT_BATCH_SIZE) {
const content = Buffer.from(stmtBatch.join('\n'), 'utf-8')
const filename = `stmt/${leftpad(stmtBatchNumber.toString(), 8)}.ndjson`
writeToTarball(tarball, filename, content)
stmtBatchNumber += 1
stmtBatch = []
}
}

@@ -70,22 +76,23 @@
return
}

const name = `stmt/${stmt.id}`
const content = Buffer.from(JSON.stringify(obj), 'utf-8')
writeToTarball(tarball, name, content)
stmtBatch.push(JSON.stringify(obj))
writeStatementBatch()

for (const id of stmt.objectIds) {
objectIds.push(id)
objectIds.add(id)
}
for (const id of stmt.depsSet) {
objectIds.add(id)
}
fetchBatch()
})

queryStream.on('end', () => {
fetchBatch(true)
writeStatementBatch(true)
resolve()
})
}))
.then(() => Promise.all(dataFetchPromises))
.then(() => writeDataObjectsToTarball(client, tarball, objectIds))
.then(() => new Promise(resolve => {
outputStream.on('end', () => resolve())
tarball.finalize()
@@ -98,7 +105,25 @@ function writeToTarball (tarball: Object, filename: string, content: Buffer) {
tarball.entry(header, content)
}

function writeDataObjectsToTarball (client: RestClient, tarball: Object, objectIds: Array<string>): Promise<*> {
function writeDataObjectsToTarball (client: RestClient, tarball: Object, objectIds: Set<string>): Promise<*> {
if (objectIds.size < 1) return Promise.resolve()

const batchPromises = []
let batch = []
for (const id of objectIds) {
batch.push(id)
if (batch.length >= OBJECT_BATCH_SIZE) {
const ids = batch
batch = []
batchPromises.push(fetchObjectBatch(client, tarball, ids))
}
}

batchPromises.push(fetchObjectBatch(client, tarball, batch))
return Promise.all(batchPromises)
}

function fetchObjectBatch (client: RestClient, tarball: Object, objectIds: Array<string>): Promise<*> {
if (objectIds.length < 1) return Promise.resolve()

return client.batchGetDataStream(objectIds, false)
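
For readers of the resulting archive, a minimal sketch of pulling the statement batches back out. It assumes the tarball is built with the `tar-stream` package (the `pack()`/`entry()`/`finalize()` calls above match its API, though the require isn't visible in this diff) and uses a hypothetical output filename:

const fs = require('fs')
const zlib = require('zlib')
const tar = require('tar-stream') // assumption: the pack() API above is tar-stream's

const extract = tar.extract()

extract.on('entry', (header, stream, next) => {
  const chunks = []
  stream.on('data', chunk => chunks.push(chunk))
  stream.on('end', () => {
    if (header.name.startsWith('stmt/') && header.name.endsWith('.ndjson')) {
      // One JSON-encoded statement per line in each batch file.
      const statements = Buffer.concat(chunks).toString('utf-8')
        .split('\n')
        .filter(line => line.length > 0)
        .map(line => JSON.parse(line))
      console.log(`${header.name}: ${statements.length} statements`)
    }
    next()
  })
})

fs.createReadStream('archive.tgz') // hypothetical archive path
  .pipe(zlib.createGunzip())
  .pipe(extract)
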
