diff --git a/package.json b/package.json index 590cb05a..1d550bbc 100644 --- a/package.json +++ b/package.json @@ -52,8 +52,8 @@ "iso-random-stream": "^1.1.1", "it-all": "^1.0.2", "it-drain": "^1.0.1", - "libp2p": "^0.27.0", - "libp2p-kad-dht": "^0.18.3", + "libp2p": "^0.28.0", + "libp2p-kad-dht": "^0.19.1", "libp2p-mplex": "^0.9.2", "libp2p-secio": "^0.12.1", "libp2p-tcp": "^0.14.2", @@ -65,9 +65,7 @@ "p-defer": "^3.0.0", "p-event": "^4.1.0", "p-wait-for": "^3.1.0", - "peer-book": "~0.9.0", "peer-id": "^0.13.5", - "peer-info": "^0.17.0", "promisify-es6": "^1.0.3", "rimraf": "^3.0.0", "sinon": "^9.0.0", @@ -84,6 +82,7 @@ "it-length-prefixed": "^3.0.0", "it-pipe": "^1.1.0", "just-debounce-it": "^1.1.0", + "libp2p-interfaces": "^0.3.0", "moving-average": "^1.0.0", "multicodec": "^1.0.0", "multihashing-async": "^0.8.0", diff --git a/src/index.js b/src/index.js index 0ef91d17..5c39c00a 100644 --- a/src/index.js +++ b/src/index.js @@ -37,7 +37,7 @@ const statsKeys = [ class Bitswap { constructor (libp2p, blockstore, options) { this._libp2p = libp2p - this._log = logger(this.peerInfo.id) + this._log = logger(this.peerId) this._options = Object.assign({}, defaultOptions, options) @@ -54,16 +54,16 @@ class Bitswap { // local database this.blockstore = blockstore - this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats) + this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats) // handle message sending - this.wm = new WantManager(this.peerInfo.id, this.network, this._stats) + this.wm = new WantManager(this.peerId, this.network, this._stats) - this.notifications = new Notifications(this.peerInfo.id) + this.notifications = new Notifications(this.peerId) } - get peerInfo () { - return this._libp2p.peerInfo + get peerId () { + return this._libp2p.peerId } // handle messages received through the network diff --git a/src/network.js b/src/network.js index 8d34b157..207a3088 100644 --- a/src/network.js +++ b/src/network.js @@ -3,6 +3,8 @@ const lp = require('it-length-prefixed') const pipe = require('it-pipe') +const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') + const Message = require('./types/message') const CONSTANTS = require('./constants') const logger = require('./utils').logger @@ -13,7 +15,7 @@ const BITSWAP120 = '/ipfs/bitswap/1.2.0' class Network { constructor (libp2p, bitswap, options, stats) { - this._log = logger(libp2p.peerInfo.id, 'network') + this._log = logger(libp2p.peerId, 'network') options = options || {} this.libp2p = libp2p this.bitswap = bitswap @@ -37,14 +39,21 @@ class Network { this._running = true this.libp2p.handle(this.protocols, this._onConnection) - this.libp2p.on('peer:connect', this._onPeerConnect) - this.libp2p.on('peer:disconnect', this._onPeerDisconnect) + // register protocol with topology + const topology = new MulticodecTopology({ + multicodecs: this.protocols, + handlers: { + onConnect: this._onPeerConnect, + onDisconnect: this._onPeerDisconnect + } + }) + this._registrarId = this.libp2p.registrar.register(topology) // All existing connections are like new ones for us for (const peer of this.libp2p.peerStore.peers.values()) { - if (this.libp2p.registrar.getConnection(peer)) { - this._onPeerConnect(peer) - } + const conn = this.libp2p.connectionManager.get(peer.id) + + conn && this._onPeerConnect(conn) } } @@ -54,8 +63,8 @@ class Network { // Unhandle both, libp2p doesn't care if it's not already handled this.libp2p.unhandle(this.protocols) - this.libp2p.removeListener('peer:connect', this._onPeerConnect) - this.libp2p.removeListener('peer:disconnect', this._onPeerDisconnect) + // unregister protocol and handlers + this.libp2p.registrar.unregister(this._registrarId) } /** @@ -92,12 +101,12 @@ class Network { } } - _onPeerConnect (peerInfo) { - this.bitswap._onPeerConnected(peerInfo.id) + _onPeerConnect (peerId) { + this.bitswap._onPeerConnected(peerId) } - _onPeerDisconnect (peerInfo) { - this.bitswap._onPeerDisconnected(peerInfo.id) + _onPeerDisconnect (peerId) { + this.bitswap._onPeerDisconnected(peerId) } /** @@ -181,7 +190,7 @@ class Network { /** * Connects to another peer * - * @param {PeerInfo|PeerId|Multiaddr} peer + * @param {PeerId|Multiaddr} peer * @param {Object} options * @param {AbortSignal} options.abortSignal * @returns {Promise} diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index 94167f93..82dfe9b7 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -169,7 +169,8 @@ describe('bitswap stats', () => { bs2 = bitswaps[1] bs2.start() - await libp2pNodes[0].dial(libp2pNodes[1].peerInfo) + const ma = `${libp2pNodes[1].multiaddrs[0]}/p2p/${libp2pNodes[1].peerId.toB58String()}` + await libp2pNodes[0].dial(ma) block = await makeBlock() @@ -212,7 +213,7 @@ describe('bitswap stats', () => { }) it('has peer stats', async () => { - const peerStats = bs2.stat().forPeer(libp2pNodes[0].peerInfo.id) + const peerStats = bs2.stat().forPeer(libp2pNodes[0].peerId) expect(peerStats).to.exist() const stats = await pEvent(peerStats, 'update') diff --git a/test/bitswap.js b/test/bitswap.js index 2679e437..727c4097 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -2,9 +2,9 @@ 'use strict' const { expect } = require('aegir/utils/chai') -const delay = require('delay') const PeerId = require('peer-id') const sinon = require('sinon') +const pWaitFor = require('p-wait-for') const Bitswap = require('../src') @@ -38,9 +38,12 @@ describe('bitswap without DHT', function () { ]) // connect 0 -> 1 && 1 -> 2 + const ma1 = `${nodes[1].libp2pNode.multiaddrs[0]}/p2p/${nodes[1].libp2pNode.peerId.toB58String()}` + const ma2 = `${nodes[2].libp2pNode.multiaddrs[0]}/p2p/${nodes[2].libp2pNode.peerId.toB58String()}` + await Promise.all([ - nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo), - nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo) + nodes[0].libp2pNode.dial(ma1), + nodes[1].libp2pNode.dial(ma2) ]) }) @@ -132,9 +135,19 @@ describe('bitswap with DHT', function () { ]) // connect 0 -> 1 && 1 -> 2 + const ma1 = `${nodes[1].libp2pNode.multiaddrs[0]}/p2p/${nodes[1].libp2pNode.peerId.toB58String()}` + const ma2 = `${nodes[2].libp2pNode.multiaddrs[0]}/p2p/${nodes[2].libp2pNode.peerId.toB58String()}` + + await Promise.all([ + nodes[0].libp2pNode.dial(ma1), + nodes[1].libp2pNode.dial(ma2) + ]) + + // await dht routing table are updated await Promise.all([ - nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo), - nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo) + pWaitFor(() => nodes[0].libp2pNode._dht.routingTable.size >= 1), + pWaitFor(() => nodes[1].libp2pNode._dht.routingTable.size >= 2), + pWaitFor(() => nodes[2].libp2pNode._dht.routingTable.size >= 1) ]) }) @@ -148,10 +161,11 @@ describe('bitswap with DHT', function () { it('put a block in 2, get it in 0', async () => { const block = await makeBlock() + const provideSpy = sinon.spy(nodes[2].libp2pNode._dht, 'provide') await nodes[2].bitswap.put(block) - // Give put time to process - await delay(100) + // wait for the DHT to finish providing + await provideSpy.returnValues[0] const blockRetrieved = await nodes[0].bitswap.get(block.cid) expect(block.data).to.eql(blockRetrieved.data) diff --git a/test/network/network.node.js b/test/network/network.node.js index acad3053..abfef09c 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -76,7 +76,7 @@ describe('network', () => { it('connectTo fail', async () => { try { - await networkA.connectTo(p2pB.peerInfo.id) + await networkA.connectTo(p2pB.peerId) assert.fail() } catch (err) { expect(err).to.exist() @@ -87,16 +87,17 @@ describe('network', () => { var counter = 0 bitswapMockA._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(p2pB.peerInfo.id.toB58String()) + expect(peerId.toB58String()).to.equal(p2pB.peerId.toB58String()) counter++ } bitswapMockB._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(p2pA.peerInfo.id.toB58String()) + expect(peerId.toB58String()).to.equal(p2pA.peerId.toB58String()) counter++ } - await p2pA.dial(p2pB.peerInfo) + const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}` + await p2pA.dial(ma) await pWaitFor(() => counter >= 2) bitswapMockA._onPeerConnected = () => {} @@ -104,7 +105,8 @@ describe('network', () => { }) it('connectTo success', async () => { - await networkA.connectTo(p2pB.peerInfo) + const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}` + await networkA.connectTo(ma) }) const versions = [{ @@ -134,7 +136,8 @@ describe('network', () => { bitswapMockB._receiveError = (err) => deferred.reject(err) - const { stream } = await p2pA.dialProtocol(p2pB.peerInfo, '/ipfs/bitswap/' + version.num) + const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}` + const { stream } = await p2pA.dialProtocol(ma, '/ipfs/bitswap/' + version.num) await pipe( [version.serialize(msg)], lp.encode(), @@ -165,11 +168,12 @@ describe('network', () => { bitswapMockB._receiveError = deferred.reject - await networkA.sendMessage(p2pB.peerInfo.id, msg) + await networkA.sendMessage(p2pB.peerId, msg) }) it('dial to peer on Bitswap 1.0.0', async () => { - const { protocol } = await p2pA.dialProtocol(p2pC.peerInfo, ['/ipfs/bitswap/1.1.0', '/ipfs/bitswap/1.0.0']) + const ma = `${p2pC.multiaddrs[0]}/p2p/${p2pC.peerId.toB58String()}` + const { protocol } = await p2pA.dialProtocol(ma, ['/ipfs/bitswap/1.1.0', '/ipfs/bitswap/1.0.0']) expect(protocol).to.equal('/ipfs/bitswap/1.0.0') }) @@ -194,7 +198,7 @@ describe('network', () => { bitswapMockC._receiveError = deferred.reject - await networkA.sendMessage(p2pC.peerInfo.id, msg) + await networkA.sendMessage(p2pC.peerId, msg) await deferred.promise }) @@ -208,8 +212,9 @@ describe('network', () => { networkA.start() networkB.start() - // FIXME: have to already be connected as sendMessage only accepts a peer id, not a PeerInfo - await p2pA.dial(p2pB.peerInfo) + // In a real network scenario, peers will be discovered and their addresses + // will be added to the addressBook before bitswap kicks in + p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs) const deferred = pDefer() @@ -217,7 +222,7 @@ describe('network', () => { deferred.resolve() } - await networkA.sendMessage(p2pB.peerInfo.id, new Message(true)) + await networkA.sendMessage(p2pB.peerId, new Message(true)) return deferred }) diff --git a/test/utils/connect-all.js b/test/utils/connect-all.js index 100bebd2..40a34e0d 100644 --- a/test/utils/connect-all.js +++ b/test/utils/connect-all.js @@ -5,7 +5,7 @@ const without = require('lodash.without') module.exports = async (nodes) => { for (const node of nodes) { for (const otherNode of without(nodes, node)) { - await node.libp2pNode.dial(otherNode.bitswap.peerInfo) + await node.libp2pNode.dial(otherNode.bitswap.peerId) } } } diff --git a/test/utils/create-libp2p-node.js b/test/utils/create-libp2p-node.js index 4e630a35..98649980 100644 --- a/test/utils/create-libp2p-node.js +++ b/test/utils/create-libp2p-node.js @@ -5,8 +5,8 @@ const MPLEX = require('libp2p-mplex') const SECIO = require('libp2p-secio') const libp2p = require('libp2p') const KadDHT = require('libp2p-kad-dht') -const PeerInfo = require('peer-info') const PeerId = require('peer-id') + const defaultsDeep = require('@nodeutils/defaults-deep') class Node extends libp2p { @@ -38,10 +38,13 @@ class Node extends libp2p { async function createLibp2pNode (options = {}) { const id = await PeerId.create({ bits: 512 }) - const peerInfo = new PeerInfo(id) - peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0') - options.peerInfo = peerInfo - const node = new Node(options) + const node = new Node({ + peerId: id, + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + }, + ...options + }) await node.start() return node diff --git a/test/utils/mocks.js b/test/utils/mocks.js index 2179b371..a0635211 100644 --- a/test/utils/mocks.js +++ b/test/utils/mocks.js @@ -1,8 +1,9 @@ 'use strict' const range = require('lodash.range') + const PeerId = require('peer-id') -const PeerInfo = require('peer-info') + const PeerStore = require('libp2p/src/peer-store') const Node = require('./create-libp2p-node').bundle const tmpdir = require('ipfs-utils/src/temp-dir') @@ -15,17 +16,25 @@ const Bitswap = require('../../src') * Create a mock libp2p node */ exports.mockLibp2pNode = () => { - const peerInfo = new PeerInfo(PeerId.createFromHexString('122019318b6e5e0cf93a2314bf01269a2cc23cd3dcd452d742cdb9379d8646f6e4a9')) + const peerId = PeerId.createFromHexString('122019318b6e5e0cf93a2314bf01269a2cc23cd3dcd452d742cdb9379d8646f6e4a9') return Object.assign(new EventEmitter(), { - peerInfo: peerInfo, + peerId, + multiaddrs: [], handle () {}, unhandle () {}, + registrar: { + register () {}, + unregister () {} + }, contentRouting: { provide: async (cid) => {}, // eslint-disable-line require-await findProviders: async (cid, timeout) => { return [] } // eslint-disable-line require-await }, - on () {}, + connectionManager: { + on () {}, + removeListener () {} + }, async dial (peer) { // eslint-disable-line require-await }, async dialProtocol (peer, protocol) { // eslint-disable-line require-await @@ -139,58 +148,44 @@ exports.applyNetwork = (bs, n) => { bs.engine.network = n } -let basePort = 12000 - /** * @private * @param {number} n The number of nodes in the network * @param {boolean} enableDHT Whether or not to run the dht */ exports.genBitswapNetwork = async (n, enableDHT = false) => { - const netArray = [] // bitswap, peerStore, libp2p, peerInfo, repo + const netArray = [] // bitswap, peerStore, libp2p, peerId, repo - // create PeerInfo and libp2p.Node for each + // create PeerId and libp2p.Node for each const peers = await Promise.all( - range(n).map(i => PeerInfo.create()) + range(n).map(i => PeerId.create()) ) peers.forEach((p, i) => { - basePort++ - p.multiaddrs.add('/ip4/127.0.0.1/tcp/' + basePort + '/ipfs/' + p.id.toB58String()) - const l = new Node({ - peerInfo: p, + peerId: p, + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0'] + }, config: { dht: { enabled: enableDHT } } }) - netArray.push({ peerInfo: p, libp2p: l }) - }) - - // create PeerStore and populate peerStore - netArray.forEach((net, i) => { - const pb = netArray[i].libp2p.peerStore - netArray.forEach((net, j) => { - if (i === j) { - return - } - pb.put(net.peerInfo) - }) - netArray[i].peerStore = pb + netArray.push({ peerId: p, libp2p: l }) }) // create the repos const tmpDir = tmpdir() netArray.forEach((net, i) => { - const repoPath = tmpDir + '/' + net.peerInfo.id.toB58String() + const repoPath = tmpDir + '/' + net.peerId.toB58String() net.repo = new Repo(repoPath) }) await Promise.all( netArray.map(async (net) => { - const repoPath = tmpDir + '/' + net.peerInfo.id.toB58String() + const repoPath = tmpDir + '/' + net.peerId.toB58String() net.repo = new Repo(repoPath) await net.repo.init({}) @@ -203,6 +198,18 @@ exports.genBitswapNetwork = async (n, enableDHT = false) => { netArray.map((net) => net.libp2p.start()) ) + // create PeerStore and populate peerStore + netArray.forEach((net, i) => { + const pb = netArray[i].libp2p.peerStore + netArray.forEach((net, j) => { + if (i === j) { + return + } + pb.addressBook.set(net.peerId, net.libp2p.multiaddrs) + }) + netArray[i].peerStore = pb + }) + // create every BitSwap netArray.forEach((net) => { net.bitswap = new Bitswap(net.libp2p, net.repo.blocks, net.peerStore)