Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

feat: preload mfs and dag.put #1468

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

const createServer = require('ipfsd-ctl').createServer
const IPFSFactory = require('ipfsd-ctl')
const parallel = require('async/parallel')
const MockPreloadNode = require('./test/utils/mock-preload-node')

const server = createServer()
const ipfsdServer = IPFSFactory.createServer()
const preloadNode = MockPreloadNode.createNode()

module.exports = {
webpack: {
Expand All @@ -21,9 +24,29 @@ module.exports = {
singleRun: true
},
hooks: {
node: {
pre: (cb) => preloadNode.start(cb),
post: (cb) => preloadNode.stop(cb)
},
browser: {
pre: server.start.bind(server),
post: server.stop.bind(server)
pre: (cb) => {
parallel([
(cb) => {
ipfsdServer.start()
cb()
},
(cb) => preloadNode.start(cb)
], cb)
},
post: (cb) => {
parallel([
(cb) => {
ipfsdServer.stop()
cb()
},
(cb) => preloadNode.stop(cb)
], cb)
}
}
}
}
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ Creates and returns an instance of an IPFS node. Use the `options` argument to s
- `enabled` (boolean): Make this node a relay (other nodes can connect *through* it). (Default: `false`)
- `active` (boolean): Make this an *active* relay node. Active relay nodes will attempt to dial a destination peer even if that peer is not yet connected to the relay. (Default: `false`)

- `preload` (object): Configure external nodes that will preload content added to this node
- `enabled` (boolean): Enable content preloading (Default: `true`)
- `addresses` (array): Multiaddr API addresses of nodes that should preload content. NOTE: nodes specified here should also be added to your node's bootstrap address list at `config.Boostrap`
- `EXPERIMENTAL` (object): Enable and configure experimental features.
- `pubsub` (boolean): Enable libp2p pub-sub. (Default: `false`)
- `sharding` (boolean): Enable directory sharding. Directories that have many child objects will be represented by multiple DAG nodes instead of just one. It can improve lookup performance when a directory has several thousand files or more. (Default: `false`)
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"./src/core/components/init-assets.js": false,
"./src/core/runtime/config-nodejs.js": "./src/core/runtime/config-browser.js",
"./src/core/runtime/libp2p-nodejs.js": "./src/core/runtime/libp2p-browser.js",
"./src/core/runtime/preload-nodejs.js": "./src/core/runtime/preload-browser.js",
"./src/core/runtime/repo-nodejs.js": "./src/core/runtime/repo-browser.js",
"./src/core/runtime/dns-nodejs.js": "./src/core/runtime/dns-browser.js",
"./test/utils/create-repo-nodejs.js": "./test/utils/create-repo-browser.js",
Expand Down Expand Up @@ -140,6 +141,7 @@
"mime-types": "^2.1.18",
"mkdirp": "~0.5.1",
"multiaddr": "^5.0.0",
"multiaddr-to-uri": "^4.0.0",
"multibase": "~0.4.0",
"multihashes": "~0.4.13",
"once": "^1.4.0",
Expand Down
6 changes: 5 additions & 1 deletion src/core/components/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ module.exports = function dag (self) {

options = options.cid ? options : Object.assign({}, optionDefaults, options)

self._ipld.put(dagNode, options, callback)
self._ipld.put(dagNode, options, (err, cid) => {
if (err) return callback(err)
if (options.preload !== false) self._preload(cid)
callback(null, cid)
})
}),

get: promisify((cid, path, options, callback) => {
Expand Down
15 changes: 15 additions & 0 deletions src/core/components/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ function normalizeContent (opts, content) {
})
}

function preloadFile (self, opts, file) {
const isRootFile = opts.wrapWithDirectory
? file.path === ''
: !file.path.includes('/')

const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false

if (shouldPreload) {
self._preload(file.hash)
}

return file
}

function pinFile (self, opts, file, cb) {
// Pin a file if it is the root dir of a recursive add or the single file
// of a direct add.
Expand Down Expand Up @@ -158,6 +172,7 @@ module.exports = function files (self) {
pull.flatten(),
importer(self._ipld, opts),
pull.asyncMap(prepareFile.bind(null, self, opts)),
pull.map(preloadFile.bind(null, self, opts)),
pull.asyncMap(pinFile.bind(null, self, opts))
)
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/components/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ exports.dht = require('./dht')
exports.dns = require('./dns')
exports.key = require('./key')
exports.stats = require('./stats')
exports.mfs = require('ipfs-mfs/core')
exports.mfs = require('./mfs')
24 changes: 24 additions & 0 deletions src/core/components/mfs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const promisify = require('promisify-es6')
const mfs = require('ipfs-mfs/core')

module.exports = self => {
const mfsSelf = Object.assign({}, self)

// A patched dag API to ensure preload doesn't happen for MFS operations
mfsSelf.dag = Object.assign({}, self.dag, {
put: promisify((node, opts, cb) => {
if (typeof opts === 'function') {
cb = opts
opts = {}
}

opts = Object.assign({}, opts, { preload: false })

return self.dag.put(node, opts, cb)
})
})

return mfs(mfsSelf, mfsSelf._options)
}
5 changes: 3 additions & 2 deletions src/core/components/pin-set.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ exports = module.exports = function (dag) {

pinSet.storeItems(pins, (err, rootNode) => {
if (err) { return callback(err) }
const opts = { cid: new CID(rootNode.multihash) }
const opts = { cid: new CID(rootNode.multihash), preload: false }
dag.put(rootNode, opts, (err, cid) => {
if (err) { return callback(err) }
callback(null, rootNode)
Expand Down Expand Up @@ -168,7 +168,8 @@ exports = module.exports = function (dag) {
function storeChild (err, child, binIdx, cb) {
if (err) { return cb(err) }

dag.put(child, { cid: new CID(child._multihash) }, err => {
const opts = { cid: new CID(child._multihash), preload: false }
dag.put(child, opts, err => {
if (err) { return cb(err) }
fanoutLinks[binIdx] = new DAGLink('', child.size, child.multihash)
cb(null)
Expand Down
4 changes: 2 additions & 2 deletions src/core/components/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ module.exports = (self) => {
// the pin-set nodes link to a special 'empty' node, so make sure it exists
cb => DAGNode.create(Buffer.alloc(0), (err, empty) => {
if (err) { return cb(err) }
dag.put(empty, { cid: new CID(empty.multihash) }, cb)
dag.put(empty, { cid: new CID(empty.multihash), preload: false }, cb)
}),

// create a root node with DAGLinks to the direct and recursive DAGs
cb => DAGNode.create(Buffer.alloc(0), [dLink, rLink], (err, node) => {
if (err) { return cb(err) }
root = node
dag.put(root, { cid: new CID(root.multihash) }, cb)
dag.put(root, { cid: new CID(root.multihash), preload: false }, cb)
}),

// hack for CLI tests
Expand Down
4 changes: 3 additions & 1 deletion src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ module.exports = (self) => {

self._bitswap.start()
self._blockService.setExchange(self._bitswap)
cb()

self._preload.start()
self._mfsPreload.start(cb)
}
], done)
})
Expand Down
2 changes: 2 additions & 0 deletions src/core/components/stop.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ module.exports = (self) => {
self.state.stop()
self._blockService.unsetExchange()
self._bitswap.stop()
self._preload.stop()

series([
(cb) => self._mfsPreload.stop(cb),
(cb) => self.libp2p.stop(cb),
(cb) => self._repo.close(cb)
], done)
Expand Down
4 changes: 4 additions & 0 deletions src/core/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ const schema = Joi.object().keys({
Joi.string()
).allow(null),
repoOwner: Joi.boolean().default(true),
preload: Joi.object().keys({
enabled: Joi.boolean().default(true),
addresses: Joi.array().items(Joi.multiaddr().options({ convert: false }))
}).allow(null),
init: Joi.alternatives().try(
Joi.boolean(),
Joi.object().keys({ bits: Joi.number().integer() })
Expand Down
15 changes: 13 additions & 2 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const boot = require('./boot')
const components = require('./components')
// replaced by repo-browser when running in the browser
const defaultRepo = require('./runtime/repo-nodejs')
const preload = require('./preload')
const mfsPreload = require('./mfs-preload')

class IPFS extends EventEmitter {
constructor (options) {
Expand All @@ -30,7 +32,14 @@ class IPFS extends EventEmitter {
this._options = {
init: true,
start: true,
EXPERIMENTAL: {}
EXPERIMENTAL: {},
preload: {
enabled: true,
addresses: [
'/dnsaddr/node0.preload.ipfs.io/https',
'/dnsaddr/node1.preload.ipfs.io/https'
]
}
}

options = config.validate(options || {})
Expand Down Expand Up @@ -78,6 +87,8 @@ class IPFS extends EventEmitter {
this._blockService = new BlockService(this._repo)
this._ipld = new Ipld(this._blockService)
this._pubsub = undefined
this._preload = preload(this)
this._mfsPreload = mfsPreload(this)

// IPFS Core exposed components
// - for booting up a node
Expand Down Expand Up @@ -134,7 +145,7 @@ class IPFS extends EventEmitter {
}

// ipfs.files
const mfs = components.mfs(this, this._options)
const mfs = components.mfs(this)

Object.keys(mfs).forEach(key => {
this.files[key] = mfs[key]
Expand Down
49 changes: 49 additions & 0 deletions src/core/mfs-preload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
'use strict'

const debug = require('debug')

const log = debug('jsipfs:mfs-preload')
log.error = debug('jsipfs:mfs-preload:error')

module.exports = (self, options) => {
options = options || {}
options.interval = options.interval || 30 * 1000

let rootCid
let timeoutId

const preloadMfs = () => {
self.files.stat('/', (err, stats) => {
if (err) {
timeoutId = setTimeout(preloadMfs, options.interval)
return log.error('failed to stat MFS root for preload', err)
}

if (rootCid !== stats.hash) {
log(`preloading updated MFS root ${rootCid} -> ${stats.hash}`)

self._preload(stats.hash, (err) => {
timeoutId = setTimeout(preloadMfs, options.interval)
if (err) return log.error(`failed to preload MFS root ${stats.hash}`, err)
rootCid = stats.hash
})
}
})
}

return {
start (cb) {
self.files.stat('/', (err, stats) => {
if (err) return cb(err)
rootCid = stats.hash
log(`monitoring MFS root ${rootCid}`)
timeoutId = setTimeout(preloadMfs, options.interval)
cb()
})
},
stop (cb) {
clearTimeout(timeoutId)
cb()
}
}
}
88 changes: 88 additions & 0 deletions src/core/preload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
'use strict'

const setImmediate = require('async/setImmediate')
const retry = require('async/retry')
const toUri = require('multiaddr-to-uri')
const debug = require('debug')
const CID = require('cids')
const preload = require('./runtime/preload-nodejs')

const log = debug('jsipfs:preload')
log.error = debug('jsipfs:preload:error')

const noop = (err) => { if (err) log.error(err) }

module.exports = self => {
const options = self._options.preload || {}
options.enabled = Boolean(options.enabled)
options.addresses = options.addresses || []

if (!options.enabled || !options.addresses.length) {
return (_, callback) => {
if (callback) {
setImmediate(() => callback())
}
}
}

let stopped = true
let requests = []
const apiUris = options.addresses.map(apiAddrToUri)

const api = (cid, callback) => {
callback = callback || noop

if (typeof cid !== 'string') {
try {
cid = new CID(cid).toBaseEncodedString()
} catch (err) {
return setImmediate(() => callback(err))
}
}

const fallbackApiUris = Array.from(apiUris)
let request
const now = Date.now()

retry({ times: fallbackApiUris.length }, (cb) => {
if (stopped) return cb(new Error(`preload aborted for ${cid}`))

// Remove failed request from a previous attempt
requests = requests.filter(r => r !== request)

const apiUri = fallbackApiUris.shift()

request = preload(`${apiUri}/api/v0/refs?r=true&arg=${cid}`, cb)
requests = requests.concat(request)
}, (err) => {
requests = requests.filter(r => r !== request)

if (err) {
return callback(err)
}

log(`preloaded ${cid} in ${Date.now() - now}ms`)
callback()
})
}

api.start = () => {
stopped = false
}

api.stop = () => {
stopped = true
log(`canceling ${requests.length} pending preload request(s)`)
requests.forEach(r => r.cancel())
requests = []
}

return api
}

function apiAddrToUri (addr) {
if (!(addr.endsWith('http') || addr.endsWith('https'))) {
addr = addr + '/http'
}
return toUri(addr)
}
Loading