-
Notifications
You must be signed in to change notification settings - Fork 27
/
mux.js
342 lines (293 loc) · 10.6 KB
/
mux.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
var Protocol = require('hypercore-protocol')
var readify = require('./ready')
var inherits = require('inherits')
var events = require('events')
var debug = require('debug')('multifeed')
var once = require('once')
// constants
// Client identifier: sent as `client` in the handshake and compared against
// the remote's `client` field when its handshake arrives.
var MULTIFEED = 'MULTIFEED'
// Version sent as `version` in the handshake; the remote's version is checked
// against it with compatibleVersions().
var PROTOCOL_VERSION = '4.0.0'
// extensions
// Names handed to stream.registerExtension(), one per message type.
var EXT_HANDSHAKE = 'MULTIFEED_HANDSHAKE'
var EXT_MANIFEST = 'MULTIFEED_MANIFEST'
var EXT_REQUEST_FEEDS = 'MULTIFEED_REQUEST_FEEDS'
var EXT_REPLICATE_FEEDS = 'MULTIFEED_REPLICATE_FEEDS'
// errors
// Values assigned to `err.code` when an incoming handshake is rejected.
var ERR_VERSION_MISMATCH = 'ERR_VERSION_MISMATCH'
var ERR_CLIENT_MISMATCH = 'ERR_CLIENT_MISMATCH'
// Multiplexer wraps a hypercore-protocol stream and registers the four
// multifeed extensions (handshake, manifest, request-feeds, replicate-feeds)
// on it. May be called with or without `new`.
//
// `isInitiator` - either an existing protocol stream (detected through
//   Protocol.isProtocolStream), which is then reused, or the value passed as
//   first argument to `new Protocol(...)`. It is also stored as
//   `this._initiator`.
// `key` - protocol encryption key
// `opts` - optional settings. Read directly here: `_id`, `live`, `userData`.
//   The whole object is additionally forwarded to `new Protocol(...)` and
//   merged into the outgoing handshake message.
//
// Events emitted from this constructor's handlers: 'ready' (with the remote
// handshake header) and 'manifest' (with the message and a bound requestFeeds).
function Multiplexer (isInitiator, key, opts) {
if (!(this instanceof Multiplexer)) return new Multiplexer(isInitiator, key, opts)
var self = this
self._opts = opts = opts || {}
// Identifier used as a prefix in debug output; random hex string unless opts._id is given.
this._id = opts._id || Math.floor(Math.random() * 10000).toString(16)
this._initiator = isInitiator
debug(this._id + ' [REPLICATION] New mux initialized', opts)
// initialize
// _localOffer: keys this side has offered (appended to by offerFeeds).
// _requestedFeeds: keys this side has requested (appended to by requestFeeds).
// _remoteOffer: keys collected from the remote's manifests.
// _activeFeedStreams: hex feed key -> replication stream currently attached.
self._localOffer = []
self._requestedFeeds = []
self._remoteOffer = []
self._activeFeedStreams = {}
// True until the first discovery key was handled or the shared-key feed opened.
var onFirstKey = true
// Reuse a caller-supplied protocol stream, otherwise create one. Either way
// ondiscoverykey (a hoisted function declaration) is hooked up.
if (Protocol.isProtocolStream(isInitiator)) {
var stream = this.stream = isInitiator
stream.on('discovery-key', ondiscoverykey)
} else {
var stream = this.stream = new Protocol(isInitiator, Object.assign({}, opts, {
ondiscoverykey
}))
}
// Only the first discovery key is checked: when stream.remoteVerified(key)
// is false for it, the stream is destroyed with an error.
function ondiscoverykey (key) {
if (onFirstKey) {
onFirstKey = false
if (!self.stream.remoteVerified(key)) {
self._finalize(new Error('Exchange key did not match remote'))
}
}
}
// Handshake extension (JSON encoded); extension errors destroy the stream.
this._handshakeExt = this.stream.registerExtension(EXT_HANDSHAKE, {
onmessage: onHandshake,
onerror: function (err) {
self._finalize(err)
},
encoding: 'json'
})
// Validates the remote's handshake header. A version or client mismatch
// destroys the stream with an error carrying `code` plus the local and
// remote values; otherwise 'ready' is emitted with the header.
// NOTE(review): `header` comes from the remote; compatibleVersions() receives
// header.version unchecked - confirm it tolerates a missing version.
function onHandshake (header) {
debug(self._id + ' [REPLICATION] recv\'d handshake: ', JSON.stringify(header))
var err
if (!compatibleVersions(header.version, PROTOCOL_VERSION)) {
debug(self._id + ' [REPLICATION] aborting; version mismatch (us=' + PROTOCOL_VERSION + ')')
err = new Error('protocol version mismatch! us=' + PROTOCOL_VERSION + ' them=' + header.version)
err.code = ERR_VERSION_MISMATCH
err.usVersion = PROTOCOL_VERSION
err.themVersion = header.version
self._finalize(err)
return
}
if (header.client !== MULTIFEED) {
debug(self._id + ' [REPLICATION] aborting; Client mismatch! expected ', MULTIFEED, 'but got', header.client)
err = new Error('Client mismatch! expected ' + MULTIFEED + ' but got ' + header.client)
err.code = ERR_CLIENT_MISMATCH
err.usClient = MULTIFEED
err.themClient = header.client
self._finalize(err)
return
}
// Wait a tick, otherwise the _ready handler below won't be listening for this event yet.
process.nextTick(function () {
self.emit('ready', header)
})
}
// Open a virtual feed that has the key set to the shared key.
this._feed = stream.open(key, {
onopen: function () {
// Once the feed has opened, the first-discovery-key check above is skipped.
onFirstKey = false
if (!stream.remoteVerified(key)) {
debug(self._id + ' [REPLICATION] aborting; shared key mismatch')
self._finalize(new Error('shared key version mismatch!'))
return
}
// send handshake
// The message is a copy of `opts` with client, version and userData set on top.
self._handshakeExt.send(Object.assign({}, opts, {
client: MULTIFEED,
version: PROTOCOL_VERSION,
userData: opts.userData
}))
}
})
// Manifest extension: merge the remote's keys into _remoteOffer (deduplicated
// through uniq) and emit 'manifest' so listeners can answer via requestFeeds.
this._manifestExt = stream.registerExtension(EXT_MANIFEST, {
onmessage: function (msg) {
debug(self._id, 'RECV\'D Ext MANIFEST:', JSON.stringify(msg))
self._remoteOffer = uniq(self._remoteOffer.concat(msg.keys))
self.emit('manifest', msg, self.requestFeeds.bind(self))
},
onerror: function (err) {
self._finalize(err)
},
encoding: 'json'
})
// Request-feeds extension: the remote's wishlist is handled by _onRequestFeeds.
this._requestFeedsExt = stream.registerExtension(EXT_REQUEST_FEEDS, {
onmessage: function (msg) {
debug(self._id, 'RECV\'D Ext REQUEST_FEEDS:', msg)
self._onRequestFeeds(msg)
},
onerror: function (err) {
self._finalize(err)
},
encoding: 'json'
})
// Replicate-feeds extension: the remote's list of keys it will replicate is
// handled by _onRemoteReplicate.
this._replicateFeedsExt = stream.registerExtension(EXT_REPLICATE_FEEDS, {
onmessage: function (msg) {
debug(self._id, 'RECV\'D Ext REPLICATE_FEEDS:', msg)
self._onRemoteReplicate(msg)
},
onerror: function (err) {
self._finalize(err)
},
encoding: 'json'
})
// Non-live mode only: on the stream's first 'prefinalize' event, detach the
// listener and close the shared-key feed.
if (!self._opts.live) {
self.stream.on('prefinalize', onPrefinalize)
function onPrefinalize () {
self.stream.removeListener('prefinalize', onPrefinalize)
self._feed.close()
debug(self._id + ' [REPLICATION] feed finish/prefinalize (' + self.stream.prefinalize._tick + ')')
}
}
// Readiness gate used by ready(): `done` is called with the remote header
// once 'ready' fires. Presumably readify queues callbacks until then - see
// ./ready to confirm.
this._ready = readify(function (done) {
self.once('ready', function (remote) {
debug(self._id + ' [REPLICATION] remote connected and ready')
done(remote)
})
})
}
// Multiplexer instances are event emitters.
inherits(Multiplexer, events.EventEmitter)
// Hands `cb` to the readiness gate stored on the instance as `_ready`.
Multiplexer.prototype.ready = function (cb) {
  var mux = this
  mux._ready(cb)
}
// Ends the underlying stream.
// Without an error the stream is finalized; with an error the error is
// emitted on the stream and the stream is destroyed with it.
Multiplexer.prototype._finalize = function (err) {
  if (!err) {
    debug(this._id + ' [REPLICATION] finalized', err)
    this.stream.finalize()
    return
  }

  debug(this._id + ' [REPLICATION] destroyed due to', err)
  this.stream.emit('error', err)
  this.stream.destroy(err)
}
// Builds a 'manifest' and transmits it to the other end.
// The application may provide optional custom data in `opts` for higher-level
// 'want' selections; those properties are copied into the manifest.
// The manifest-prop `keys` is always set from `keys` (normalized through
// extractKeys) and overrides any `keys` property present in `opts`.
//
// `keys` - a single key/feed or an array of them
// `opts` - optional object with custom manifest data; it is not modified
Multiplexer.prototype.offerFeeds = function (keys, opts) {
  // Merge into a fresh object so the caller's `opts` never gains a `keys`
  // property as a side effect of offering feeds.
  var manifest = Object.assign({}, opts, {
    keys: extractKeys(keys)
  })
  debug(this._id + ' [REPLICATON] sending manifest:', manifest)
  // Remember what was offered; _onRequestFeeds only accepts requests for these.
  manifest.keys.forEach(function (key) { this._localOffer.push(key) }.bind(this))
  this._manifestExt.send(manifest)
}
// Transmits the list of feeds this side wants to the remote.
// The keys are normalized through extractKeys, remembered in
// `_requestedFeeds`, and then sent over the request-feeds extension.
// For classical multifeed `ACCEPT_ALL` behaviour both peers request everything
// the other side has offered.
Multiplexer.prototype.requestFeeds = function (keys) {
  var wanted = extractKeys(keys)
  for (var i = 0; i < wanted.length; i++) {
    this._requestedFeeds.push(wanted[i])
  }
  debug(this._id + ' [REPLICATION] Sending feeds request', wanted)
  this._requestFeedsExt.send(wanted)
}
// Handles the remote's feed request.
// Only keys that this side previously offered (present in `_localOffer`) are
// accepted. The accepted keys are deduplicated, announced to the remote over
// the replicate-feeds extension, and then replicated.
//
// `keys` - payload decoded from the remote's message; expected to be an array
//   of key strings. Anything else is treated as an empty request.
Multiplexer.prototype._onRequestFeeds = function (keys) {
  var self = this
  // The payload is remote-controlled JSON, so it may be any value. Calling
  // .filter on a non-array would throw inside the message handler.
  if (!Array.isArray(keys)) {
    debug(self._id, '[REPLICATION] Warning, ignoring malformed feeds request', keys)
    keys = []
  }
  var filtered = keys.filter(function (key) {
    if (self._localOffer.indexOf(key) === -1) {
      debug('[REPLICATION] Warning, remote requested feed that is not in offer', key)
      return false
    }
    // All good, we accept the key request
    return true
  })
  filtered = uniq(filtered)
  // Tell remote which keys we will replicate
  debug(this._id, '[REPLICATION] Sending REPLICATE_FEEDS')
  this._replicateFeedsExt.send(filtered)
  // Start replicating as promised.
  this._replicateFeeds(filtered, false)
}
// Handles the remote's announcement of the keys it is going to replicate.
// Only keys this side actually asked for (present in `_requestedFeeds`) are
// replicated; 'remote-feeds' is emitted on the stream once they are set up.
//
// `keys` - payload decoded from the remote's message; expected to be an array
//   of key strings. Anything else is treated as an empty list.
Multiplexer.prototype._onRemoteReplicate = function (keys) {
  var self = this
  // The payload is remote-controlled JSON, so it may be any value. Calling
  // .filter on a non-array would throw inside the message handler.
  if (!Array.isArray(keys)) {
    debug(self._id, '[REPLICATION] Warning, ignoring malformed replicate message', keys)
    keys = []
  }
  var filtered = keys.filter(function (key) {
    return self._requestedFeeds.indexOf(key) !== -1
  })
  // Start replicating as requested.
  this._replicateFeeds(filtered, true, function () {
    self.stream.emit('remote-feeds')
  })
}
// Initializes new replication streams for feeds and joins their streams into
// the main stream.
//
// `keys` - feed keys to replicate; deduplicated through uniq before use
// `terminateIfNoFeeds` - when truthy, the shared-key feed is closed if the
//   'replicate' listener supplies no feeds and no feed stream is active
// `cb` - optional; called once every supplied feed has been handled, or on the
//   next tick when no feeds were supplied
//
// Emits 'replicate' with the keys and a callback that the listener calls with
// a feed or an array of feeds. The callback is wrapped in once().
// Returns the deduplicated keys.
Multiplexer.prototype._replicateFeeds = function (keys, terminateIfNoFeeds, cb) {
if (!cb) cb = noop
var self = this
keys = uniq(keys)
debug(this._id, '[REPLICATION] _replicateFeeds', keys.length, keys)
// Postpone stream finalization until all pending cores are added. Otherwise
// a non-live replication might terminate because it thinks all feeds have
// been synced, even though new ones are still in the process of being set up
// for sync.
// NOTE(review): the matching continue() only happens inside
// startFeedReplication, i.e. when a 'replicate' listener calls back - confirm
// every consumer does so.
this.stream.prefinalize.wait()
this.emit('replicate', keys, once(startFeedReplication))
return keys
// Invoked by the 'replicate' listener with the feeds to attach.
function startFeedReplication (feeds) {
// A single feed is accepted as well as an array.
if (!Array.isArray(feeds)) feeds = [feeds]
// Number of feeds whose ready-callback has not run yet; cb fires at zero.
var pending = feeds.length
// Stop postponement of prefinalization.
// NOTE(review): this runs before the feed.ready callbacks below have fired -
// confirm that is the intended point to resume.
self.stream.prefinalize.continue()
// only the feeds passed to `feeds` option will be replicated (sent or received)
// hypercore-protocol has built in protection against receiving unexpected/not asked for data.
feeds.forEach(function (feed) {
feed.ready(function () { // wait for each feed to be ready before replicating.
var hexKey = feed.key.toString('hex')
// prevent a feed from being folded into the main stream twice.
if (typeof self._activeFeedStreams[hexKey] !== 'undefined') {
if (!--pending) cb()
return
}
debug(self._id, '[REPLICATION] replicating feed:', hexKey)
// Replicate over the shared stream, forwarding the live/download/upload/encrypt options.
var fStream = feed.replicate(self._initiator, Object.assign({}, {
live: self._opts.live,
download: self._opts.download,
upload: self._opts.upload,
encrypt: self._opts.encrypt,
stream: self.stream
}))
// Store reference to this particular feed stream
self._activeFeedStreams[hexKey] = fStream
// Runs on the feed stream's 'end' or 'error': detaches both listeners and
// drops the bookkeeping entry. Its parameters are not used.
var cleanup = function (_, res) {
fStream.removeListener('end', cleanup)
fStream.removeListener('error', cleanup)
if (!self._activeFeedStreams[hexKey]) return
// delete feed stream reference
delete self._activeFeedStreams[hexKey]
debug(self._id, '[REPLICATION] feedStream closed:', hexKey.substr(0, 8))
}
fStream.once('end', cleanup)
fStream.once('error', cleanup)
if (!--pending) cb()
})
})
// Bail on replication entirely if there were no feeds to add, and none are pending or active.
if (feeds.length === 0 && Object.keys(self._activeFeedStreams).length === 0 && terminateIfNoFeeds) {
debug(self._id, '[REPLICATION] terminating mux: no feeds to sync')
self._feed.close()
process.nextTick(cb)
} else if (feeds.length === 0) {
// Nothing to attach: still report completion, on the next tick.
process.nextTick(cb)
}
}
}
// Returns a new array holding the locally offered keys followed by the keys
// offered by the remote. Keys present on both sides appear twice.
Multiplexer.prototype.knownFeeds = function () {
  var local = this._localOffer
  var remote = this._remoteOffer
  return [].concat(local, remote)
}
module.exports = Multiplexer
// String, String -> Boolean
// Two versions are compatible when their major components (the text before
// the first '.') parse to the same base-10 integer.
// Returns false when either argument is not a string, or when a major
// component is not numeric, instead of throwing. One of the arguments comes
// from the remote's handshake and may be missing or malformed.
function compatibleVersions (v1, v2) {
  if (typeof v1 !== 'string' || typeof v2 !== 'string') return false
  var major1 = parseInt(v1.split('.')[0], 10)
  var major2 = parseInt(v2.split('.')[0], 10)
  // NaN never equals NaN, so two unparsable versions are not compatible.
  return major1 === major2
}
// Normalizes `keys` into an array of key strings.
// Accepts a single value or an array. Each entry is converted as follows:
//   - string: used as-is
//   - object with a truthy `key` property (e.g. a feed): `key.toString('hex')`
//   - Buffer: `toString('utf8')`
//   - anything else (including null and undefined): dropped
// Empty strings are dropped as well.
// NOTE(review): Buffers are decoded as utf8 while `o.key` is hex-encoded -
// confirm callers really pass Buffers that hold already-encoded key text.
function extractKeys (keys) {
  if (!Array.isArray(keys)) keys = [keys]
  return keys.map(function (o) {
    if (typeof o === 'string') return o
    // typeof null is 'object'; reading `.key` from it would throw, so reject
    // null together with every other non-object here.
    if (o === null || typeof o !== 'object') return undefined
    if (o.key) return o.key.toString('hex')
    if (o instanceof Buffer) return o.toString('utf8')
    return undefined
  })
    .filter(function (o) { return !!o }) // remove invalid entries
}
// Returns the distinct entries of `arr` as strings, sorted with the default
// string ordering. Entries are used as property names, so non-string values
// are stringified.
function uniq (arr) {
  // A prototype-less object is used as the set so that every string,
  // including '__proto__', is stored as an ordinary own property. On a plain
  // {} assigning `true` to '__proto__' is ignored and the key gets lost.
  var seen = Object.create(null)
  arr.forEach(function (item) {
    seen[item] = true
  })
  return Object.keys(seen).sort()
}
// Does nothing; used as the default callback when the caller supplies none.
function noop () {}