Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

feat: make ipfs.libp2p the libp2p node rather than component #1832

Merged
merged 5 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/core/components/dht.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module.exports = (self) => {

options = options || {}

self._libp2pNode.dht.get(key, options.timeout, callback)
self.libp2p.dht.get(key, options.timeout, callback)
}),

/**
Expand All @@ -50,7 +50,7 @@ module.exports = (self) => {
return callback(new Error('Not valid key'))
}

self._libp2pNode.dht.put(key, value, callback)
self.libp2p.dht.put(key, value, callback)
}),

/**
Expand Down Expand Up @@ -83,7 +83,7 @@ module.exports = (self) => {

opts = opts || {}

self._libp2pNode.contentRouting.findProviders(key, opts.timeout || null, callback)
self.libp2p.contentRouting.findProviders(key, opts.timeout || null, callback)
}),

/**
Expand All @@ -98,7 +98,7 @@ module.exports = (self) => {
peer = PeerId.createFromB58String(peer)
}

self._libp2pNode.peerRouting.findPeer(peer, (err, info) => {
self.libp2p.peerRouting.findPeer(peer, (err, info) => {
if (err) {
return callback(err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ module.exports = (self) => {
// TODO: Implement recursive providing
} else {
each(keys, (cid, cb) => {
self._libp2pNode.contentRouting.provide(cid, cb)
self.libp2p.contentRouting.provide(cid, cb)
}, callback)
}
})
Expand All @@ -173,7 +173,7 @@ module.exports = (self) => {
}

// TODO expose this method in peerRouting
self._libp2pNode._dht.getClosestPeers(peerId.toBytes(), (err, peerIds) => {
self.libp2p._dht.getClosestPeers(peerId.toBytes(), (err, peerIds) => {
if (err) {
return callback(err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/components/is-online.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module.exports = function isOnline (self) {
return () => {
return Boolean(self._bitswap && self._libp2pNode && self._libp2pNode.isStarted())
return Boolean(self._bitswap && self.libp2p && self.libp2p.isStarted())
}
}
194 changes: 83 additions & 111 deletions src/core/components/libp2p.js
Original file line number Diff line number Diff line change
@@ -1,129 +1,101 @@
'use strict'

const promisify = require('promisify-es6')
const get = require('lodash/get')
const defaultsDeep = require('@nodeutils/defaults-deep')
const ipnsUtils = require('../ipns/routing/utils')

module.exports = function libp2p (self) {
return {
start: promisify((callback) => {
self.config.get(gotConfig)
module.exports = function libp2p (self, config) {
const options = self._options || {}
config = config || {}

function gotConfig (err, config) {
if (err) {
return callback(err)
}
// Always create libp2p via a bundle function
const createBundle = typeof options.libp2p === 'function'
? options.libp2p
: defaultBundle

const defaultBundle = (opts) => {
const libp2pDefaults = {
datastore: opts.datastore,
peerInfo: opts.peerInfo,
peerBook: opts.peerBook,
config: {
peerDiscovery: {
mdns: {
enabled: get(opts.options, 'config.Discovery.MDNS.Enabled',
get(opts.config, 'Discovery.MDNS.Enabled', true))
},
webRTCStar: {
enabled: get(opts.options, 'config.Discovery.webRTCStar.Enabled',
get(opts.config, 'Discovery.webRTCStar.Enabled', true))
},
bootstrap: {
list: get(opts.options, 'config.Bootstrap',
get(opts.config, 'Bootstrap', []))
}
},
relay: {
enabled: get(opts.options, 'relay.enabled',
get(opts.config, 'relay.enabled', false)),
hop: {
enabled: get(opts.options, 'relay.hop.enabled',
get(opts.config, 'relay.hop.enabled', false)),
active: get(opts.options, 'relay.hop.active',
get(opts.config, 'relay.hop.active', false))
}
},
dht: {
validators: {
ipns: ipnsUtils.validator
},
selectors: {
ipns: ipnsUtils.selector
}
},
EXPERIMENTAL: {
dht: get(opts.options, 'EXPERIMENTAL.dht', false),
pubsub: get(opts.options, 'EXPERIMENTAL.pubsub', false)
}
},
connectionManager: get(opts.options, 'connectionManager',
get(opts.config, 'connectionManager', {}))
}
const { datastore } = self._repo
const peerInfo = self._peerInfo
const peerBook = self._peerInfoBook
const libp2p = createBundle({ options, config, datastore, peerInfo, peerBook })
let discoveredPeers = []

const libp2pOptions = defaultsDeep(
get(self._options, 'libp2p', {}),
libp2pDefaults
)
const putAndDial = peerInfo => {
peerBook.put(peerInfo)
libp2p.dial(peerInfo, () => {})
}

// Required inline to reduce startup time
// Note: libp2p-nodejs gets replaced by libp2p-browser when webpacked/browserified
const Node = require('../runtime/libp2p-nodejs')
return new Node(libp2pOptions)
}
libp2p.on('start', () => {
peerInfo.multiaddrs.forEach((ma) => {
self._print('Swarm listening on', ma.toString())
})
discoveredPeers.forEach(putAndDial)
discoveredPeers = []
})

// Always create libp2p via a bundle function
let libp2pBundle = get(self._options, 'libp2p', null)
if (typeof libp2pBundle !== 'function') {
libp2pBundle = defaultBundle
}
libp2p.on('peer:discovery', (peerInfo) => {
if (self.isOnline()) {
putAndDial(peerInfo)
} else {
discoveredPeers.push(peerInfo)
}
})

self._libp2pNode = libp2pBundle({
options: self._options,
config: config,
datastore: self._repo.datastore,
peerInfo: self._peerInfo,
peerBook: self._peerInfoBook
})
libp2p.on('peer:connect', peerInfo => peerBook.put(peerInfo))

let discoveredPeers = []
return libp2p
}

const putAndDial = peerInfo => {
self._peerInfoBook.put(peerInfo)
self._libp2pNode.dial(peerInfo, () => {})
function defaultBundle ({ datastore, peerInfo, peerBook, options, config }) {
const libp2pDefaults = {
datastore,
peerInfo,
peerBook,
config: {
peerDiscovery: {
mdns: {
enabled: get(options, 'config.Discovery.MDNS.Enabled',
get(config, 'Discovery.MDNS.Enabled', true))
},
webRTCStar: {
enabled: get(options, 'config.Discovery.webRTCStar.Enabled',
get(config, 'Discovery.webRTCStar.Enabled', true))
},
bootstrap: {
list: get(options, 'config.Bootstrap',
get(config, 'Bootstrap', []))
}

self._libp2pNode.on('start', () => {
discoveredPeers.forEach(putAndDial)
discoveredPeers = []
})

self._libp2pNode.on('peer:discovery', (peerInfo) => {
if (self.isOnline()) {
putAndDial(peerInfo)
} else {
discoveredPeers.push(peerInfo)
}
})

self._libp2pNode.on('peer:connect', (peerInfo) => {
self._peerInfoBook.put(peerInfo)
})

self._libp2pNode.start((err) => {
if (err) { return callback(err) }

self._libp2pNode.peerInfo.multiaddrs.forEach((ma) => {
self._print('Swarm listening on', ma.toString())
})

callback()
})
},
relay: {
enabled: get(options, 'relay.enabled',
get(config, 'relay.enabled', false)),
hop: {
enabled: get(options, 'relay.hop.enabled',
get(config, 'relay.hop.enabled', false)),
active: get(options, 'relay.hop.active',
get(config, 'relay.hop.active', false))
}
},
dht: {
validators: {
ipns: ipnsUtils.validator
},
selectors: {
ipns: ipnsUtils.selector
}
},
EXPERIMENTAL: {
dht: get(options, 'EXPERIMENTAL.dht', false),
pubsub: get(options, 'EXPERIMENTAL.pubsub', false)
}
}),
stop: promisify((callback) => {
self._libp2pNode.stop(callback)
})
},
connectionManager: get(options, 'connectionManager',
get(config, 'connectionManager', {}))
}

const libp2pOptions = defaultsDeep(get(options, 'libp2p', {}), libp2pDefaults)

// Required inline to reduce startup time
// Note: libp2p-nodejs gets replaced by libp2p-browser when webpacked/browserified
const Node = require('../runtime/libp2p-nodejs')
return new Node(libp2pOptions)
}
4 changes: 2 additions & 2 deletions src/core/components/ping-pull-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ module.exports = function pingPullStream (self) {

const source = Pushable()

getPeer(self._libp2pNode, source, peerId, (err, peer) => {
getPeer(self.libp2p, source, peerId, (err, peer) => {
if (err) {
log.error(err)
source.end(err)
return
}

runPing(self._libp2pNode, source, opts.count, peer, (err) => {
runPing(self.libp2p, source, opts.count, peer, (err) => {
if (err) {
log.error(err)
source.push(getPacket({ success: false, text: err.toString() }))
Expand Down
14 changes: 7 additions & 7 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module.exports = function pubsub (self) {

if (!callback) {
return new Promise((resolve, reject) => {
self._libp2pNode.pubsub.subscribe(topic, options, handler, (err) => {
self.libp2p.pubsub.subscribe(topic, options, handler, (err) => {
if (err) {
return reject(err)
}
Expand All @@ -33,7 +33,7 @@ module.exports = function pubsub (self) {
})
}

self._libp2pNode.pubsub.subscribe(topic, options, handler, callback)
self.libp2p.pubsub.subscribe(topic, options, handler, callback)
},

unsubscribe: (topic, handler, callback) => {
Expand All @@ -43,7 +43,7 @@ module.exports = function pubsub (self) {
: Promise.reject(errPubsubDisabled())
}

self._libp2pNode.pubsub.unsubscribe(topic, handler)
self.libp2p.pubsub.unsubscribe(topic, handler)

if (!callback) {
return Promise.resolve()
Expand All @@ -56,28 +56,28 @@ module.exports = function pubsub (self) {
if (!self._options.EXPERIMENTAL.pubsub) {
return setImmediate(() => callback(errPubsubDisabled()))
}
self._libp2pNode.pubsub.publish(topic, data, callback)
self.libp2p.pubsub.publish(topic, data, callback)
}),

ls: promisify((callback) => {
if (!self._options.EXPERIMENTAL.pubsub) {
return setImmediate(() => callback(errPubsubDisabled()))
}
self._libp2pNode.pubsub.ls(callback)
self.libp2p.pubsub.ls(callback)
}),

peers: promisify((topic, callback) => {
if (!self._options.EXPERIMENTAL.pubsub) {
return setImmediate(() => callback(errPubsubDisabled()))
}
self._libp2pNode.pubsub.peers(topic, callback)
self.libp2p.pubsub.peers(topic, callback)
}),

setMaxListeners (n) {
if (!self._options.EXPERIMENTAL.pubsub) {
throw errPubsubDisabled()
}
self._libp2pNode.pubsub.setMaxListeners(n)
self.libp2p.pubsub.setMaxListeners(n)
}
}
}
Loading