Skip to content

Commit 9f70ea5

Browse files
fix: issues found when integrating into js-ipfs
1 parent b1a5eff commit 9f70ea5

File tree

14 files changed

+147
-75
lines changed

14 files changed

+147
-75
lines changed

src/decision/engine.js

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ const debug = require('debug')
44
const _ = require('highland')
55
const async = require('async')
66

7-
const log = debug('engine')
8-
log.error = debug('engine:error')
7+
const log = debug('bitswap:engine')
8+
log.error = debug('bitswap:engine:error')
99

1010
const Message = require('../message')
1111
const Wantlist = require('../wantlist')
@@ -29,7 +29,7 @@ module.exports = class Engine {
2929
const msg = new Message(false)
3030
msg.addBlock(env.block)
3131

32-
log('Sending block %s to %s', env.peer.toHexString(), env.block)
32+
log('Sending block to %s', env.peer.toB58String(), env.block.data.toString())
3333

3434
this.network.sendMessage(env.peer, msg, (err) => {
3535
if (err) {
@@ -55,6 +55,7 @@ module.exports = class Engine {
5555
if (!nextTask) return push(null, _.nil)
5656

5757
this.datastore.get(nextTask.entry.key, (err, block) => {
58+
log('fetched: %s', block.key.toString('hex'), block.data.toString())
5859
if (err || !block) {
5960
nextTask.done()
6061
} else {
@@ -78,11 +79,11 @@ module.exports = class Engine {
7879
}
7980

8081
wantlistForPeer (peerId) {
81-
if (!this.ledgerMap.has(peerId)) {
82+
if (!this.ledgerMap.has(peerId.toB58String())) {
8283
return new Map()
8384
}
8485

85-
return this.ledgerMap.get(peerId).wantlist.sortedEntries()
86+
return this.ledgerMap.get(peerId.toB58String()).wantlist.sortedEntries()
8687
}
8788

8889
peers () {
@@ -92,7 +93,7 @@ module.exports = class Engine {
9293
// Handle incoming messages
9394
messageReceived (peerId, msg, cb) {
9495
if (msg.empty) {
95-
log('received empty message from %s', peerId)
96+
log('received empty message from %s', peerId.toB58String())
9697
}
9798

9899
const ledger = this._findOrCreate(peerId)
@@ -103,7 +104,7 @@ module.exports = class Engine {
103104
}
104105

105106
this._processBlocks(msg.blocks, ledger)
106-
107+
log('wantlist', Array.from(msg.wantlist.values()))
107108
async.eachSeries(
108109
msg.wantlist.values(),
109110
this._processWantlist.bind(this, ledger, peerId),
@@ -133,19 +134,20 @@ module.exports = class Engine {
133134

134135
_processWantlist (ledger, peerId, entry, cb) {
135136
if (entry.cancel) {
136-
log('cancel %s', entry.key)
137+
log('cancel %s', entry.key.toString('hex'))
137138
ledger.cancelWant(entry.key)
138139
this.peerRequestQueue.remove(entry.key, peerId)
139140
async.setImmediate(() => cb())
140141
} else {
141-
log('wants %s - %s', entry.key, entry.priority)
142+
log('wants %s - %s', entry.key.toString('hex'), entry.priority)
142143
ledger.wants(entry.key, entry.priority)
143144

144145
// If we already have the block, serve it
145146
this.datastore.has(entry.key, (err, exists) => {
146147
if (err) {
147-
log('failed existence check %s', entry.key)
148-
} else {
148+
log('failed existence check %s', entry.key.toString('hex'))
149+
} else if (exists) {
150+
log('has want %s', entry.key.toString('hex'))
149151
this.peerRequestQueue.push(entry.entry, peerId)
150152
}
151153
cb()
@@ -155,7 +157,7 @@ module.exports = class Engine {
155157

156158
_processBlocks (blocks, ledger) {
157159
for (let block of blocks.values()) {
158-
log('got block %s %s bytes', block.key, block.data.length)
160+
log('got block %s %s bytes', block.key.toString('hex'), block.data.length)
159161
ledger.receivedBytes(block.data.length)
160162

161163
this.receivedBlock(block)
@@ -181,21 +183,21 @@ module.exports = class Engine {
181183
}
182184

183185
peerDisconnected (peerId) {
184-
// if (this.ledgerMap.has(peerId)) {
185-
// this.ledgerMap.delete(peerId)
186+
// if (this.ledgerMap.has(peerId.toB58String())) {
187+
// this.ledgerMap.delete(peerId.toB58String())
186188
// }
187189
//
188190
// TODO: figure out how to remove all other references
189191
// in the peerrequestqueue
190192
}
191193

192194
_findOrCreate (peerId) {
193-
if (this.ledgerMap.has(peerId)) {
194-
return this.ledgerMap.get(peerId)
195+
if (this.ledgerMap.has(peerId.toB58String())) {
196+
return this.ledgerMap.get(peerId.toB58String())
195197
}
196198

197199
const l = new Ledger(peerId)
198-
this.ledgerMap.set(peerId, l)
200+
this.ledgerMap.set(peerId.toB58String(), l)
199201

200202
return l
201203
}

src/decision/peer-request-queue.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ module.exports = class PeerRequestQueue {
5353

5454
// Add a new entry to the queue
5555
push (entry, to) {
56-
let partner = this.partners.get(to)
56+
let partner = this.partners.get(to.toB58String())
5757

5858
if (!partner) {
5959
partner = new ActivePartner()
6060
this.pQueue.push(partner)
61-
this.partners.set(to, partner)
61+
this.partners.set(to.toB58String(), partner)
6262
}
6363

6464
if (partner.activeBlocks.has(entry.key)) {
@@ -118,13 +118,13 @@ module.exports = class PeerRequestQueue {
118118
t.trash = true
119119

120120
// having canceled a block, we now account for that in the given partner
121-
this.partners.get(peerId).requests --
121+
this.partners.get(peerId.toB58String()).requests --
122122
}
123123
}
124124
}
125125

126126
function taskKey (peerId, key) {
127-
return `${peerId.toHexString()}:${key.toString('hex')}`
127+
return `${peerId.toB58String()}:${key.toString('hex')}`
128128
}
129129

130130
function partnerCompare (a, b) {

src/index.js

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ module.exports = class Bitwap {
3333

3434
this.notifications = new EventEmitter()
3535
this.notifications.setMaxListeners(cs.maxListeners)
36-
37-
this.wm.run()
3836
}
3937

4038
// handle messages received through the network
4139
_receiveMessage (peerId, incoming, cb) {
40+
cb = cb || (() => {})
41+
log('receiving message from %s', peerId.toB58String())
4242
this.engine.messageReceived(peerId, incoming, (err) => {
4343
if (err) {
4444
log('failed to receive message', incoming)
@@ -55,7 +55,7 @@ module.exports = class Bitwap {
5555
for (let block of iblocks.values()) {
5656
const found = this.wm.wl.contains(block.key)
5757
if (!found) {
58-
log('received un-askes-for %s from %s', block, peerId)
58+
log('received un-askes-for %s from %s', block.key.toString('hex'), peerId.toB58String())
5959
} else {
6060
keys.push(block.key)
6161
}
@@ -71,7 +71,7 @@ module.exports = class Bitwap {
7171
return innerCb()
7272
}
7373

74-
log('got block %s from %s', block, peerId)
74+
log('got block from %s', peerId.toB58String(), block.data.toString())
7575
innerCb()
7676
}),
7777
(innerCb) => this.hasBlock(block, (err) => {
@@ -104,14 +104,15 @@ module.exports = class Bitwap {
104104
}
105105

106106
_tryPutBlock (block, times, cb) {
107+
log('trying to put block %s', block.data.toString())
107108
async.retry({times, interval: 400}, (done) => {
108109
this.datastore.put(block, done)
109110
}, cb)
110111
}
111112

112113
// handle errors on the receiving channel
113114
_receiveError (err) {
114-
log.debug('Bitswap ReceiveError: %s', err.message)
115+
log.error('ReceiveError: %s', err.message)
115116
}
116117

117118
// handle new peers
@@ -155,6 +156,7 @@ module.exports = class Bitwap {
155156
const blocks = []
156157
const finish = (block) => {
157158
blocks.push(block)
159+
log('finish: %s/%s', blocks.length, keys.length)
158160
if (blocks.length === keys.length) {
159161
cb(null, blocks)
160162
}
@@ -163,20 +165,28 @@ module.exports = class Bitwap {
163165
keys.forEach((key) => {
164166
// Sanity check, we don't want to announce looking for blocks
165167
// when we might have them ourselves
166-
this.datastore.get(key, (err, res) => {
167-
if (!err && res) {
168-
this.wm.cancelWants([key])
169-
finish(res)
168+
this.datastore.has(key, (err, exists) => {
169+
if (err) {
170+
log('error in datastore.has: ', err.message)
170171
return
171172
}
172173

173-
if (err) {
174-
log('error in datastore.get: ', err.message)
175-
}
174+
if (exists) {
175+
this.datastore.get(key, (err, res) => {
176+
if (!err && res) {
177+
this.wm.cancelWants([key])
178+
finish(res)
179+
return
180+
}
176181

177-
this.notifications.once(`block:${key.toString('hex')}`, (block) => {
178-
finish(block)
179-
})
182+
if (err) {
183+
log('error in datastore.get: ', err.message)
184+
}
185+
})
186+
}
187+
})
188+
this.notifications.once(`block:${key.toString('hex')}`, (block) => {
189+
finish(block)
180190
})
181191
})
182192

@@ -197,6 +207,7 @@ module.exports = class Bitwap {
197207
log.error('Error writing block to datastore: %s', err.message)
198208
return cb(err)
199209
}
210+
log('put block: %s', block.key.toString('hex'))
200211
this.notifications.emit(`block:${block.key.toString('hex')}`, block)
201212
this.engine.receivedBlock(block)
202213
cb()
@@ -216,4 +227,15 @@ module.exports = class Bitwap {
216227
peers: this.engine.peers()
217228
}
218229
}
230+
231+
start () {
232+
this.wm.run()
233+
this.network.start()
234+
}
235+
236+
// Halt everything
237+
stop () {
238+
this.wm.stop()
239+
this.network.start()
240+
}
219241
}

src/message/index.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,31 @@ class BitswapMessage {
2121
}
2222

2323
addEntry (key, priority, cancel) {
24-
const e = this.wantlist.get(key)
24+
const e = this.wantlist.get(key.toString('hex'))
2525

2626
if (e) {
2727
e.priority = priority
2828
e.cancel = Boolean(cancel)
2929
} else {
30-
this.wantlist.set(key, new Entry(key, priority, cancel))
30+
this.wantlist.set(key.toString('hex'), new Entry(key, priority, cancel))
3131
}
3232
}
3333

3434
addBlock (block) {
35-
this.blocks.set(block.key, block)
35+
this.blocks.set(block.key.toString('hex'), block)
3636
}
3737

3838
cancel (key) {
39-
this.wantlist.delete(key)
40-
this.addEntry(key, 0, true)
39+
this.wantlist.delete(key.toString('hex'))
40+
this.addEntry(key.toString('hex'), 0, true)
4141
}
4242

4343
toProto () {
4444
return pbm.Message.encode({
4545
wantlist: {
4646
entries: Array.from(this.wantlist.values()).map((e) => {
4747
return {
48-
block: String(e.key),
48+
block: e.key.toString('hex'),
4949
priority: Number(e.priority),
5050
cancel: Boolean(e.cancel)
5151
}
@@ -73,7 +73,7 @@ BitswapMessage.fromProto = (raw) => {
7373
const m = new BitswapMessage(dec.wantlist.full)
7474

7575
dec.wantlist.entries.forEach((e) => {
76-
m.addEntry(e.block, e.priority, e.cancel)
76+
m.addEntry(new Buffer(e.block, 'hex'), e.priority, e.cancel)
7777
})
7878
dec.blocks.forEach((b) => m.addBlock(new Block(b)))
7979

src/network/index.js

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,38 @@
22

33
const bl = require('bl')
44
const async = require('async')
5+
const debug = require('debug')
6+
57
const Message = require('../message')
8+
const log = debug('bitswap:network')
9+
10+
const PROTOCOL_IDENTIFIER = '/ipfs/bitswap/1.0.0'
611

712
module.exports = class Network {
813
constructor (libp2p, peerBook, bitswap) {
914
this.libp2p = libp2p
1015
this.peerBook = peerBook
1116
this.bitswap = bitswap
12-
13-
this._attachSwarmListeners()
1417
}
1518

16-
_attachSwarmListeners () {
17-
this.libp2p.swarm.handle('/ipfs/bitswap/1.0.0', this._onConnection.bind(this))
19+
start () {
20+
// bind event listeners
21+
this._onConnection = this._onConnection.bind(this)
22+
this._onPeerMux = this._onPeerMux.bind(this)
23+
this._onPeerMuxClosed = this._onPeerMuxClosed.bind(this)
24+
25+
this.libp2p.swarm.handle(PROTOCOL_IDENTIFIER, this._onConnection)
26+
27+
this.libp2p.swarm.on('peer-mux-established', this._onPeerMux)
28+
29+
this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed)
30+
}
1831

19-
this.libp2p.swarm.on('peer-mux-established', this._onPeerMux.bind(this))
32+
stop () {
33+
this.libp2p.swarm.unhandle(PROTOCOL_IDENTIFIER)
34+
this.libp2p.swarm.removeEventListener('peer-mux-established', this._onPeerMux)
2035

21-
this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed.bind(this))
36+
this.libp2p.swarm.removeEventListener('peer-mux-closed', this._onPeerMuxClosed)
2237
}
2338

2439
_onConnection (conn) {
@@ -47,6 +62,7 @@ module.exports = class Network {
4762

4863
// Connect to the given peer
4964
connectTo (peerId, cb) {
65+
log('connecting to %s', peerId.toB58String())
5066
const done = (err) => async.setImmediate(() => cb(err))
5167
// NOTE: For now, all this does is ensure that we are
5268
// connected. Once we have Peer Routing, we will be able
@@ -60,6 +76,8 @@ module.exports = class Network {
6076

6177
// Send the given msg (instance of Message) to the given peer
6278
sendMessage (peerId, msg, cb) {
79+
log('sendMessage to %s', peerId.toB58String())
80+
log('msg %s', msg.full, msg.wantlist, msg.blocks)
6381
const done = (err) => async.setImmediate(() => cb(err))
6482
let peerInfo
6583
try {
@@ -75,7 +93,7 @@ module.exports = class Network {
7593

7694
conn.write(msg.toProto())
7795
conn.once('error', (err) => done(err))
78-
conn.once('end', done)
96+
conn.once('finish', done)
7997
conn.end()
8098
})
8199
}

0 commit comments

Comments
 (0)