Skip to content

Commit

Permalink
Merge 030c5ed into e261147
Browse files Browse the repository at this point in the history
  • Loading branch information
yusefnapora committed Feb 23, 2017
2 parents e261147 + 030c5ed commit 50a4c24
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 0 deletions.
86 changes: 86 additions & 0 deletions npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"byline": "^5.0.0",
"digest-stream": "^1.0.1",
"duplex-child-process": "0.0.5",
"gunzip-maybe": "^1.3.1",
"knex": "^0.12.6",
"levelup": "^1.3.3",
"libp2p-crypto": "^0.8.0",
Expand Down
14 changes: 14 additions & 0 deletions src/client/api/RestClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const ndjson = require('ndjson')
const byline = require('byline')
const mapStream = require('map-stream')
const serialize = require('../../metadata/serialize')
const {stringifyNestedBuffers} = require('../../common/util')

import type { Transform as TransformStream, Duplex as DuplexStream, Readable as ReadableStream } from 'stream'
import type { StatementMsg, SimpleStatementMsg } from '../../protobuf/types'
Expand Down Expand Up @@ -146,6 +147,19 @@ class RestClient {
.then(parseMergeResponse)
}

importStatements (statements: Array<StatementMsg>): Promise<number> {
const statementNDJSON = statements
.map(s => stringifyNestedBuffers(s))
.map(s => JSON.stringify(s))
.join('\n')
return this.importRaw(statementNDJSON)
}

importRaw (statementNDJSON: string): Promise<number> {
return this.postRequest(`import`, statementNDJSON, false)
.then(parseIntResponse)
}

push (queryString: string, remotePeer: string): Promise<{statementCount: number, objectCount: number}> {
return this.postRequest(`push/${remotePeer}`, queryString, false)
.then(parseMergeResponse)
Expand Down
133 changes: 133 additions & 0 deletions src/client/cli/commands/archiveLoad.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// @flow

const fs = require('fs')
const gunzip = require('gunzip-maybe')
const tar = require('tar-stream')
const {consumeStream} = require('../../../common/util')
const {subcommand, printlnErr, println, pluralizeCount} = require('../util')

import type {RestClient} from '../../api'
import type {Readable as ReadableStream} from 'stream'

// Number of data objects accumulated from the archive before they are sent
// to the node in a single `putData` call.
const OBJECT_BATCH_SIZE = 1024

module.exports = {
command: 'archiveLoad [filename]',
builder: {
filename: {
description: 'A tar archive (optionally gzipped) from which to read statements and objects, ' +
'as generated by `mcclient archive`. If not given, archive will be read from standard input.\n',
required: false,
type: 'string',
default: null
},
allowErrors: {
alias: ['warn', 'w'],
description: 'Warn if an error occurs when loading the archive instead of aborting the load.\n',
type: 'boolean',
default: false
}
},
handler: subcommand((opts: {client: RestClient, filename?: ?string, allowErrors: boolean}) => new Promise((resolve, reject) => {
const {client, filename, allowErrors} = opts
const inputStream = (filename != null) ? fs.createReadStream(filename) : process.stdin
const inputStreamName = filename || 'standard input'
const handlerPromises = []

const tarStream = tar.extract()
let objectBatch = []

let objectCount = 0
let statementCount = 0

function handleError (message: string): (err: Error) => void {
return err => {
const msg = message + ': ' + err.message
if (allowErrors) {
printlnErr(msg)
} else {
throw new Error(msg)
}
}
}

function sendBatch (force: boolean = false) {
if (force || objectBatch.length >= OBJECT_BATCH_SIZE) {
if (objectBatch.length < 1) return

const objects = objectBatch
objectBatch = []
handlerPromises.push(
client.putData(...objects)
.then(keys => {
objectCount += keys.length
})
.catch(handleError('Error sending data objects'))
)
}
}

tarStream.on('entry', (header, contentStream, done) => {
const {name} = header
if (name.startsWith('stmt/')) {
handlerPromises.push(
handleStatementEntry(contentStream, client)
.then(count => {
statementCount += count
})
.catch(handleError(`Error importing statements from ${name}`))
.then(() => done())
)
} else if (name.startsWith('data/')) {
readDataEntry(contentStream)
.then(obj => {
objectBatch.push(obj)
sendBatch()
})
.catch(handleError(`Error reading data object from ${name}`))
.then(() => done())
} else {
printlnErr(`Unexpected entry "${name}", ignoring`)
done()
}
})

inputStream.on('error', err => {
reject(new Error(`Error reading from ${inputStreamName}: ${err.message}`))
})
tarStream.on('error', err => {
reject(new Error(`Error reading from tar archive: ${err.message}`))
})

tarStream.on('finish', () => {
handlerPromises.push(sendBatch(true))
Promise.all(handlerPromises)
.then(() => {
println(`Imported ${pluralizeCount(statementCount, 'new statement')} and sent ${pluralizeCount(objectCount, 'object')}`)
resolve()
})
})

inputStream
.pipe(gunzip())
.pipe(tarStream)
}))
}

function handleStatementEntry (contentStream: ReadableStream, client: RestClient): Promise<number> {
return consumeStream(contentStream)
.then(ndjson => client.importRaw(ndjson))
}

function readDataEntry (contentStream: ReadableStream): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks = []
contentStream.on('data', chunk => {
chunks.push(chunk)
})
contentStream.on('end', () => {
resolve(Buffer.concat(chunks))
})
contentStream.on('error', reject)
})
}

0 comments on commit 50a4c24

Please sign in to comment.