perf: expose importer concurrency controls when adding files (#2637)
* perf: expose importer concurrency controls when adding files

Adds two new arguments to the CLI and HTTP interfaces:

`--file-import-concurrency` and `--block-write-concurrency`

See ipfs-inactive/js-ipfs-unixfs-importer#41 for further discussion.

* chore: update deps, remove unused

* fix: pass args from http

* fix: hard code file concurrency for http requests

* fix: fix up chunker parsing tests

* chore: use ipfs-http-client branch temporarily

* chore: increase bundlesize by 1kb

* chore: update dep

* chore: increase bundle size again
achingbrain committed Dec 11, 2019
1 parent ed886f4 commit 1d19c4f
Showing 21 changed files with 50 additions and 33 deletions.
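Before the per-file diffs, a quick illustration. The sketch below is not part of the commit: it shows the new options being passed through the core `add` API of a js-ipfs node of this era (circa v0.40), with the option names and defaults taken from the diffs that follow; the node setup and input files are illustrative only.

    // Hypothetical usage sketch (not from this commit). Option names and
    // defaults come from the diffs below; everything else is illustrative.
    const IPFS = require('ipfs')

    async function main () {
      const node = await IPFS.create()

      const results = await node.add([
        { path: 'a.txt', content: Buffer.from('aaa') },
        { path: 'b.txt', content: Buffer.from('bbb') }
      ], {
        fileImportConcurrency: 50, // files imported in parallel (CLI: --file-import-concurrency)
        blockWriteConcurrency: 10  // chunks hashed/written per file in parallel (CLI: --block-write-concurrency)
      })

      results.forEach(file => console.log(file.path, file.hash))
    }

    main().catch(console.error)

On the command line the equivalent is `ipfs add --file-import-concurrency=50 --block-write-concurrency=10 <files>`.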
2 changes: 1 addition & 1 deletion .aegir.js
@@ -9,7 +9,7 @@ const preloadNode = MockPreloadNode.createNode()
 const echoServer = EchoServer.createServer()
 
 module.exports = {
-  bundlesize: { maxSize: '651kB' },
+  bundlesize: { maxSize: '652kB' },
   webpack: {
     resolve: {
       mainFields: ['browser', 'main'],
7 changes: 3 additions & 4 deletions package.json
@@ -66,8 +66,6 @@
     "@hapi/joi": "^15.0.0",
     "abort-controller": "^3.0.0",
     "array-shuffle": "^1.0.1",
-    "async-iterator-all": "^1.0.0",
-    "async-iterator-first": "^1.0.0",
     "async-iterator-to-pull-stream": "^1.3.0",
     "async-iterator-to-stream": "^1.1.0",
     "base32.js": "~0.1.0",
@@ -100,14 +98,14 @@
     "ipfs-bitswap": "^0.26.0",
     "ipfs-block": "~0.8.1",
     "ipfs-block-service": "~0.16.0",
-    "ipfs-http-client": "^40.0.1",
+    "ipfs-http-client": "^40.1.0",
     "ipfs-http-response": "~0.4.0",
     "ipfs-mfs": "^0.13.2",
     "ipfs-multipart": "^0.2.0",
     "ipfs-repo": "^0.30.0",
     "ipfs-unixfs": "~0.1.16",
     "ipfs-unixfs-exporter": "^0.38.0",
-    "ipfs-unixfs-importer": "^0.40.0",
+    "ipfs-unixfs-importer": "^0.42.0",
     "ipfs-utils": "~0.4.0",
     "ipld": "~0.25.0",
     "ipld-bitcoin": "~0.3.0",
@@ -123,6 +121,7 @@
     "is-pull-stream": "~0.0.0",
     "is-stream": "^2.0.0",
     "iso-url": "~0.4.6",
+    "it-all": "^1.0.1",
     "it-pipe": "^1.0.1",
     "it-to-stream": "^0.1.1",
     "jsondiffpatch": "~0.3.11",
15 changes: 14 additions & 1 deletion src/cli/commands/add.js
@@ -49,10 +49,20 @@ module.exports = {
       default: false,
       describe: 'Only chunk and hash, do not write'
     },
+    'block-write-concurrency': {
+      type: 'integer',
+      default: 10,
+      describe: 'After a file has been chunked, this controls how many chunks to hash and add to the block store concurrently'
+    },
     chunker: {
       default: 'size-262144',
       describe: 'Chunking algorithm to use, formatted like [size-{size}, rabin, rabin-{avg}, rabin-{min}-{avg}-{max}]'
     },
+    'file-import-concurrency': {
+      type: 'integer',
+      default: 50,
+      describe: 'How many files to import at once'
+    },
     'enable-sharding-experiment': {
       type: 'boolean',
       default: false
@@ -130,7 +140,10 @@
       wrapWithDirectory: argv.wrapWithDirectory,
       pin: argv.pin,
       chunker: argv.chunker,
-      preload: argv.preload
+      preload: argv.preload,
+      nonatomic: argv.nonatomic,
+      fileImportConcurrency: argv.fileImportConcurrency,
+      blockWriteConcurrency: argv.blockWriteConcurrency
     }
 
     if (options.enableShardingExperiment && argv.isDaemonOn()) {
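A note on how the dashed flags above reach the handler: yargs mirrors each dashed option in camelCase on argv, which is why the code can read argv.fileImportConcurrency. A standalone sketch of just that behaviour (plain yargs, unrelated to ipfs):

    // Standalone yargs sketch: dashed options are mirrored in camelCase
    const yargs = require('yargs')

    const argv = yargs(['--block-write-concurrency', '20'])
      .option('block-write-concurrency', { type: 'number', default: 10 })
      .argv

    console.log(argv['block-write-concurrency']) // 20
    console.log(argv.blockWriteConcurrency)      // 20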
2 changes: 1 addition & 1 deletion src/core/components/block.js
@@ -5,7 +5,7 @@ const multihashing = require('multihashing-async')
 const CID = require('cids')
 const callbackify = require('callbackify')
 const errCode = require('err-code')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const { PinTypes } = require('./pin/pin-manager')
 
 module.exports = function block (self) {
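This is the first of many mechanical swaps of async-iterator-all for it-all in this commit; both collect an async iterable into an array. A minimal standalone sketch of the pattern at these call sites (the generator is illustrative):

    const all = require('it-all')

    // Stand-in for the async iterables these modules consume
    async function * source () {
      yield 'a'
      yield 'b'
    }

    async function main () {
      const values = await all(source()) // drains the iterable into an array
      console.log(values) // [ 'a', 'b' ]
    }

    main().catch(console.error)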
2 changes: 1 addition & 1 deletion src/core/components/dag.js
@@ -2,7 +2,7 @@
 
 const callbackify = require('callbackify')
 const CID = require('cids')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const errCode = require('err-code')
 const multicodec = require('multicodec')
 
2 changes: 1 addition & 1 deletion src/core/components/files-mfs.js
@@ -5,7 +5,7 @@ const isPullStream = require('is-pull-stream')
 const toPullStream = require('async-iterator-to-pull-stream')
 const toReadableStream = require('async-iterator-to-stream')
 const pullStreamToAsyncIterator = require('pull-stream-to-async-iterator')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const nodeify = require('promise-nodeify')
 const PassThrough = require('stream').PassThrough
 const pull = require('pull-stream/pull')
3 changes: 1 addition & 2 deletions src/core/components/files-regular/add-async-iterator.js
@@ -22,8 +22,7 @@ module.exports = function (self) {
       : Infinity
   }, options, {
     strategy: 'balanced',
-    chunker: chunkerOptions.chunker,
-    chunkerOptions: chunkerOptions.chunkerOptions
+    ...chunkerOptions
   })
 
   // CID v0 is for multihashes encoded with sha2-256
2 changes: 1 addition & 1 deletion src/core/components/files-regular/add.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 module.exports = function (self) {
   // can't use callbackify because if `data` is a pull stream
2 changes: 1 addition & 1 deletion src/core/components/files-regular/cat.js
@@ -1,7 +1,7 @@
 'use strict'
 
 const callbackify = require('callbackify')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 module.exports = function (self) {
   return callbackify.variadic(async function cat (ipfsPath, options) {
2 changes: 1 addition & 1 deletion src/core/components/files-regular/get.js
@@ -1,7 +1,7 @@
 'use strict'
 
 const callbackify = require('callbackify')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 module.exports = function (self) {
   return callbackify.variadic(async function get (ipfsPath, options) { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/core/components/files-regular/ls.js
@@ -1,7 +1,7 @@
 'use strict'
 
 const callbackify = require('callbackify')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 module.exports = function (self) {
   return callbackify.variadic(async function ls (ipfsPath, options) { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/core/components/files-regular/refs-local.js
@@ -1,7 +1,7 @@
 'use strict'
 
 const callbackify = require('callbackify')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 module.exports = function (self) {
   return callbackify.variadic(async function refsLocal (ipfsPath, options) { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/core/components/files-regular/refs.js
@@ -1,7 +1,7 @@
 'use strict'
 
 const callbackify = require('callbackify')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 module.exports = function (self) {
   return callbackify.variadic(async function refs (ipfsPath, options) { // eslint-disable-line require-await
6 changes: 2 additions & 4 deletions src/core/components/files-regular/utils.js
@@ -45,14 +45,12 @@ const parseChunkerString = (chunker) => {
     }
     return {
       chunker: 'fixed',
-      chunkerOptions: {
-        maxChunkSize: size
-      }
+      maxChunkSize: size
     }
   } else if (chunker.startsWith('rabin')) {
     return {
       chunker: 'rabin',
-      chunkerOptions: parseRabinString(chunker)
+      ...parseRabinString(chunker)
     }
   } else {
     throw new Error(`Unrecognized chunker option: ${chunker}`)
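Because parseChunkerString now returns a flat object, its result can be spread straight into the importer options in add-async-iterator.js above. A small sketch of the new return shapes, mirroring the updated tests further below (the require path assumes the repository root):

    const utils = require('./src/core/components/files-regular/utils')

    console.log(utils.parseChunkerString('size-512'))
    // => { chunker: 'fixed', maxChunkSize: 512 }

    console.log(utils.parseChunkerString('rabin-42-92-184'))
    // => { chunker: 'rabin', minChunkSize: 42, avgChunkSize: 92, maxChunkSize: 184 }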
2 changes: 1 addition & 1 deletion src/core/runtime/add-from-fs-nodejs.js
@@ -2,7 +2,7 @@
 
 const callbackify = require('callbackify')
 const globSource = require('ipfs-utils/src/files/glob-source')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 module.exports = self => {
   return callbackify.variadic(async (...args) => { // eslint-disable-line require-await
2 changes: 1 addition & 1 deletion src/http/api/resources/block.js
@@ -7,7 +7,7 @@ const multibase = require('multibase')
 const Boom = require('@hapi/boom')
 const { cidToString } = require('../../../utils/cid')
 const debug = require('debug')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const streamResponse = require('../../utils/stream-response')
 const log = debug('ipfs:http-api:block')
 log.error = debug('ipfs:http-api:block:error')
2 changes: 1 addition & 1 deletion src/http/api/resources/config.js
@@ -9,7 +9,7 @@ const multipart = require('ipfs-multipart')
 const Boom = require('@hapi/boom')
 const Joi = require('@hapi/joi')
 const { profiles } = require('../../../core/components/config')
-const all = require('async-iterator-all')
+const all = require('it-all')
 
 exports.getOrSet = {
   // pre request handler that parses the args and returns `key` & `value` which are assigned to `request.pre.args`
2 changes: 1 addition & 1 deletion src/http/api/resources/dag.js
@@ -11,7 +11,7 @@ const debug = require('debug')
 const {
   cidToString
 } = require('../../../utils/cid')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const log = debug('ipfs:http-api:dag')
 log.error = debug('ipfs:http-api:dag:error')
 
10 changes: 9 additions & 1 deletion src/http/api/resources/files-regular.js
@@ -159,6 +159,8 @@ exports.add = {
         'only-hash': Joi.boolean(),
         pin: Joi.boolean().default(true),
         'wrap-with-directory': Joi.boolean(),
+        'file-import-concurrency': Joi.number().integer().min(0).default(50),
+        'block-write-concurrency': Joi.number().integer().min(0).default(10),
         chunker: Joi.string(),
         trickle: Joi.boolean(),
         preload: Joi.boolean().default(true)
@@ -218,7 +220,13 @@
           pin: request.query.pin,
           chunker: request.query.chunker,
           trickle: request.query.trickle,
-          preload: request.query.preload
+          preload: request.query.preload,
+
+          // this has to be hardcoded to 1 because we can only read one file
+          // at a time from a http request and we have to consume it completely
+          // before we can read the next file
+          fileImportConcurrency: 1,
+          blockWriteConcurrency: request.query['block-write-concurrency']
         })
       },
       async function (source) {
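The new query parameters are validated with @hapi/joi (v15, per package.json), which coerces the string values arriving in a query string into integers and fills in defaults. A standalone sketch of that behaviour, with the two rules copied from the schema above:

    const Joi = require('@hapi/joi')

    const schema = Joi.object({
      'file-import-concurrency': Joi.number().integer().min(0).default(50),
      'block-write-concurrency': Joi.number().integer().min(0).default(10)
    })

    // Query-string values arrive as strings; Joi coerces and applies defaults
    const { error, value } = schema.validate({ 'block-write-concurrency': '20' })
    console.log(error) // null
    console.log(value)
    // => { 'block-write-concurrency': 20, 'file-import-concurrency': 50 }

Note that whatever file-import-concurrency a client sends, the handler pins fileImportConcurrency to 1: a multipart request body yields one file at a time, so files cannot be imported in parallel over HTTP.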
2 changes: 1 addition & 1 deletion src/http/api/resources/object.js
@@ -2,7 +2,7 @@
 
 const CID = require('cids')
 const multipart = require('ipfs-multipart')
-const all = require('async-iterator-all')
+const all = require('it-all')
 const dagPB = require('ipld-dag-pb')
 const { DAGNode, DAGLink } = dagPB
 const Joi = require('@hapi/joi')
12 changes: 6 additions & 6 deletions test/core/files-regular-utils.js
@@ -20,27 +20,27 @@ describe('files-regular/utils', () => {
   it('parses a fixed size string', () => {
     const options = utils.parseChunkerString('size-512')
     expect(options.chunker).to.equal('fixed')
-    expect(options.chunkerOptions.maxChunkSize).to.equal(512)
+    expect(options.maxChunkSize).to.equal(512)
   })
 
   it('parses a rabin string without size', () => {
     const options = utils.parseChunkerString('rabin')
     expect(options.chunker).to.equal('rabin')
-    expect(options.chunkerOptions.avgChunkSize).to.equal(262144)
+    expect(options.avgChunkSize).to.equal(262144)
   })
 
   it('parses a rabin string with only avg size', () => {
     const options = utils.parseChunkerString('rabin-512')
     expect(options.chunker).to.equal('rabin')
-    expect(options.chunkerOptions.avgChunkSize).to.equal(512)
+    expect(options.avgChunkSize).to.equal(512)
   })
 
   it('parses a rabin string with min, avg, and max', () => {
     const options = utils.parseChunkerString('rabin-42-92-184')
     expect(options.chunker).to.equal('rabin')
-    expect(options.chunkerOptions.minChunkSize).to.equal(42)
-    expect(options.chunkerOptions.avgChunkSize).to.equal(92)
-    expect(options.chunkerOptions.maxChunkSize).to.equal(184)
+    expect(options.minChunkSize).to.equal(42)
+    expect(options.avgChunkSize).to.equal(92)
+    expect(options.maxChunkSize).to.equal(184)
   })
 
   it('throws an error for unsupported chunker type', () => {
