Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: report ipfs.add progress over http (#3310)
Browse files Browse the repository at this point in the history
The browser fetch api doesn't allow reading of any data until the whole request has been sent which means progress events only fire after the upload is complete which rather defeats the purpose of reporting upload progress.

Here we switch to XHR for uploads with progress that does allow reading response data before the request is complete.

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
Gozala and achingbrain committed Nov 16, 2020
1 parent bba650d commit 39cad4b
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 95 deletions.
2 changes: 1 addition & 1 deletion examples/browser-ipns-publish/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"human-crypto-keys": "^0.1.4",
"ipfs": "^0.52.0",
"ipfs-http-client": "^48.1.0",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"ipns": "^0.8.0",
"it-last": "^1.0.4",
"p-retry": "^4.2.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"err-code": "^2.0.3",
"ipfs-unixfs": "^2.0.3",
"ipfs-unixfs-importer": "^4.0.0",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^0.17.0",
"ipld-dag-pb": "^0.20.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"ipfs-http-gateway": "^0.1.1",
"ipfs-http-server": "^0.1.1",
"ipfs-repo": "^7.0.0",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"ipld-dag-cbor": "^0.17.0",
"ipld-dag-pb": "^0.20.0",
"it-all": "^1.0.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-core-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"browser-readablestream-to-it": "^1.0.1",
"cids": "^1.0.0",
"err-code": "^2.0.3",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"it-all": "^1.0.4",
"it-map": "^1.0.4",
"it-peekable": "^1.0.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-core/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ let preloadNode = MockPreloadNode.createNode()
let ipfsdServer

module.exports = {
bundlesize: { maxSize: '523kB' },
bundlesize: { maxSize: '524kB' },
karma: {
files: [{
pattern: 'node_modules/interface-ipfs-core/test/fixtures/**/*',
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"ipfs-unixfs": "^2.0.3",
"ipfs-unixfs-exporter": "^3.0.4",
"ipfs-unixfs-importer": "^4.0.0",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"ipld": "^0.28.0",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^0.17.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"debug": "^4.1.1",
"form-data": "^3.0.0",
"ipfs-core-utils": "^0.5.1",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^0.17.0",
"ipld-dag-pb": "^0.20.0",
Expand Down
62 changes: 57 additions & 5 deletions packages/ipfs-http-client/src/add-all.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ module.exports = configure((api) => {
* @type {import('.').Implements<typeof import('ipfs-core/src/components/add-all/index')>}
*/
async function * addAll (source, options = {}) {
const progressFn = options.progress

// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])
const { headers, body, total, parts } =
await multipartRequest(source, controller, options.headers)

// In browser response body only starts streaming once upload is
// complete, at which point all the progress updates are invalid. If
// length of the content is computable we can interpret progress from
// `{ total, loaded}` passed to `onUploadProgress` and `multipart.total`
// in which case we disable progress updates to be written out.
const [progressFn, onUploadProgress] = typeof options.progress === 'function'
? createProgressHandler(total, parts, options.progress)
: [null, null]

const res = await api.post('add', {
searchParams: toUrlSearchParams({
Expand All @@ -26,10 +35,10 @@ module.exports = configure((api) => {
progress: Boolean(progressFn)
}),
timeout: options.timeout,
onUploadProgress,
signal,
...(
await multipartRequest(source, controller, options.headers)
)
headers,
body
})

for await (let file of res.ndjson()) {
Expand All @@ -45,6 +54,48 @@ module.exports = configure((api) => {
return addAll
})

/**
* Returns simple progress callback when content length isn't computable or a
* progress event handler that calculates progress from upload progress events.
*
* @param {number} total
* @param {{name:string, start:number, end:number}[]|null} parts
* @param {(n:number, name:string) => void} progress
*/
const createProgressHandler = (total, parts, progress) =>
parts ? [null, createOnUploadPrgress(total, parts, progress)] : [progress, null]

/**
* Creates a progress handler that interpolates progress from upload progress
* events and total size of the content that is added.
*
* @param {number} size - actual content size
* @param {{name:string, start:number, end:number}[]} parts
* @param {(n:number, name:string) => void} progress
* @returns {(event:{total:number, loaded: number}) => void}
*/
const createOnUploadPrgress = (size, parts, progress) => {
let index = 0
const count = parts.length
return ({ loaded, total }) => {
// Derive position from the current progress.
const position = Math.floor(loaded / total * size)
while (index < count) {
const { start, end, name } = parts[index]
// If within current part range report progress and break the loop
if (position < end) {
progress(position - start, name)
break
// If passed current part range report final byte for the chunk and
// move to next one.
} else {
progress(end - start, name)
index += 1
}
}
}
}

/**
* @param {any} input
* @returns {UnixFSEntry}
Expand All @@ -67,6 +118,7 @@ function toCoreInterface ({ name, hash, size, mode, mtime, mtimeNsecs }) {
}
}

// @ts-ignore
return output
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
'use strict'

// Import browser version otherwise electron-renderer will end up with node
// version and fail.
const normaliseInput = require('ipfs-core-utils/src/files/normalise-input/index.browser')
const modeToString = require('./mode-to-string')
const mtimeToObject = require('./mtime-to-object')
const { File, FormData } = require('ipfs-utils/src/globalthis')

async function multipartRequest (source = '', abortController, headers = {}) {
const parts = []
const formData = new FormData()
let index = 0
let total = 0

for await (const { content, path, mode, mtime } of normaliseInput(source)) {
let fileSuffix = ''
Expand Down Expand Up @@ -41,6 +45,9 @@ async function multipartRequest (source = '', abortController, headers = {}) {

if (content) {
formData.set(fieldName, content, encodeURIComponent(path))
const end = total + content.size
parts.push({ name: path, start: total, end })
total = end
} else {
formData.set(fieldName, new File([''], encodeURIComponent(path), { type: 'application/x-directory' }))
}
Expand All @@ -49,6 +56,8 @@ async function multipartRequest (source = '', abortController, headers = {}) {
}

return {
total,
parts,
headers,
body: formData
}
Expand Down
85 changes: 4 additions & 81 deletions packages/ipfs-http-client/src/lib/multipart-request.js
Original file line number Diff line number Diff line change
@@ -1,87 +1,10 @@
'use strict'

const normaliseInput = require('ipfs-core-utils/src/files/normalise-input/index')
const { nanoid } = require('nanoid')
const modeToString = require('../lib/mode-to-string')
const mtimeToObject = require('../lib/mtime-to-object')
const merge = require('merge-options').bind({ ignoreUndefined: true })
const toStream = require('it-to-stream')
const { isElectronRenderer } = require('ipfs-utils/src/env')

/**
*
* @param {Object} source
* @param {AbortController} abortController
* @param {Headers|Record<string, string>} [headers]
* @param {string} [boundary]
*/
async function multipartRequest (source = '', abortController, headers = {}, boundary = `-----------------------------${nanoid()}`) {
async function * streamFiles (source) {
try {
let index = 0

for await (const { content, path, mode, mtime } of normaliseInput(source)) {
let fileSuffix = ''
const type = content ? 'file' : 'dir'

if (index > 0) {
yield '\r\n'

fileSuffix = `-${index}`
}

let fieldName = type + fileSuffix
const qs = []

if (mode !== null && mode !== undefined) {
qs.push(`mode=${modeToString(mode)}`)
}

const time = mtimeToObject(mtime)
if (time != null) {
const { secs, nsecs } = time

qs.push(`mtime=${secs}`)

if (nsecs != null) {
qs.push(`mtime-nsecs=${nsecs}`)
}
}

if (qs.length) {
fieldName = `${fieldName}?${qs.join('&')}`
}

yield `--${boundary}\r\n`
yield `Content-Disposition: form-data; name="${fieldName}"; filename="${encodeURIComponent(path)}"\r\n`
yield `Content-Type: ${content ? 'application/octet-stream' : 'application/x-directory'}\r\n`
yield '\r\n'

if (content) {
yield * content
}

index++
}
} catch (err) {
// workaround for https://github.com/node-fetch/node-fetch/issues/753
// @ts-ignore - abort does not expect an arguments
abortController.abort(err)
} finally {
yield `\r\n--${boundary}--\r\n`
}
}

return {
headers: merge(headers, {
'Content-Type': `multipart/form-data; boundary=${boundary}`
}),
body: await toStream(streamFiles(source))
}
}

module.exports = multipartRequest

// In electron-renderer we use native fetch and should encode body using native
// form data.
if (isElectronRenderer) {
module.exports = require('./multipart-request.browser')
} else {
module.exports = require('./multipart-request.node')
}
84 changes: 84 additions & 0 deletions packages/ipfs-http-client/src/lib/multipart-request.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict'

const normaliseInput = require('ipfs-core-utils/src/files/normalise-input')
const { nanoid } = require('nanoid')
const modeToString = require('./mode-to-string')
const mtimeToObject = require('./mtime-to-object')
const merge = require('merge-options').bind({ ignoreUndefined: true })
const toStream = require('it-to-stream')

/**
*
* @param {Object} source
* @param {AbortController} abortController
* @param {Headers|Record<string, string>} [headers]
* @param {string} [boundary]
*/
async function multipartRequest (source = '', abortController, headers = {}, boundary = `-----------------------------${nanoid()}`) {
async function * streamFiles (source) {
try {
let index = 0

for await (const { content, path, mode, mtime } of normaliseInput(source)) {
let fileSuffix = ''
const type = content ? 'file' : 'dir'

if (index > 0) {
yield '\r\n'

fileSuffix = `-${index}`
}

let fieldName = type + fileSuffix
const qs = []

if (mode !== null && mode !== undefined) {
qs.push(`mode=${modeToString(mode)}`)
}

const time = mtimeToObject(mtime)
if (time != null) {
const { secs, nsecs } = time

qs.push(`mtime=${secs}`)

if (nsecs != null) {
qs.push(`mtime-nsecs=${nsecs}`)
}
}

if (qs.length) {
fieldName = `${fieldName}?${qs.join('&')}`
}

yield `--${boundary}\r\n`
yield `Content-Disposition: form-data; name="${fieldName}"; filename="${encodeURIComponent(path)}"\r\n`
yield `Content-Type: ${content ? 'application/octet-stream' : 'application/x-directory'}\r\n`
yield '\r\n'

if (content) {
yield * content
}

index++
}
} catch (err) {
// workaround for https://github.com/node-fetch/node-fetch/issues/753
// @ts-ignore - abort does not expect an arguments
abortController.abort(err)
} finally {
yield `\r\n--${boundary}--\r\n`
}
}

return {
parts: null,
total: -1,
headers: merge(headers, {
'Content-Type': `multipart/form-data; boundary=${boundary}`
}),
body: await toStream(streamFiles(source))
}
}

module.exports = multipartRequest
2 changes: 1 addition & 1 deletion packages/ipfs/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const path = require('path')
let preloadNode
let echoServer = new EchoServer()

// the second signalling server is needed for the inferface test 'should list peers only once even if they have multiple addresses'
// the second signalling server is needed for the interface test 'should list peers only once even if they have multiple addresses'
let sigServerA
let sigServerB
let ipfsdServer
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"interface-ipfs-core": "^0.142.0",
"ipfs-http-client": "^48.1.0",
"ipfs-interop": "^3.0.0",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"ipfsd-ctl": "^7.0.2",
"iso-url": "^1.0.0",
"libp2p-webrtc-star": "^0.20.1",
Expand Down

0 comments on commit 39cad4b

Please sign in to comment.