// core.ts
import * as u from './util'
import {
Config,
Outgoing,
Incoming,
Transport,
Transform,
ScopeStr,
} from './types'
const Muxrpc = require('muxrpc')
const pull = require('pull-stream')
// const Rate = require('pull-rate')
const MultiServer = require('multiserver')
const Inactive = require('pull-inactivity')
const debug = require('debug')('secret-stack')
/**
 * Type guard for "object-like, but not an array" values.
 *
 * Accepts any non-null value whose `typeof` is `'object'` and that is not an
 * array. Always returns a real boolean (never the falsy input itself), so the
 * result is safe to compare strictly or to store.
 *
 * @param o - value to inspect
 * @returns `true` when `o` is a non-null, non-array object
 */
function isPlainObject (o: unknown): o is Record<string, unknown> {
  return typeof o === 'object' && o !== null && !Array.isArray(o)
}
/**
 * Normalises a key to its string form.
 *
 * Strings are returned untouched; anything else is treated as a Buffer and
 * encoded with `toString('base64')`.
 *
 * @param s - string or Buffer
 * @returns the string itself, or the base64 encoding of the buffer
 */
function toBase64 (s: Buffer | string) {
  return typeof s === 'string' ? s : s.toString('base64')
}
/**
 * Calls `iter(value, key, collection)` for every entry of an array or object.
 *
 * Arrays are walked with `Array.prototype.forEach` (so `key` is the numeric
 * index). Objects are walked over their OWN enumerable keys only; inherited
 * enumerable properties are skipped so that additions to a prototype never
 * show up as entries. `null`/`undefined` collections are a silent no-op.
 *
 * @param objOrArr - array or dictionary-style object to iterate
 * @param iter - callback receiving the value, its key/index and the collection
 */
function each<T> (
  objOrArr: Record<string, T> | Array<T>,
  iter: (t: T, k: string | number, o: Record<string, T> | Array<T>) => void
) {
  if (objOrArr == null) return
  if (Array.isArray(objOrArr)) {
    objOrArr.forEach(iter)
    return
  }
  for (const key of Object.keys(objOrArr)) {
    iter(objOrArr[key], key, objOrArr)
  }
}
/**
 * Validates the shape of a multiserver plugin before it is registered.
 *
 * @param obj - candidate transport or transform
 * @param type - which kind of plugin is being checked; only used as the
 *   prefix of the error message
 * @throws Error when `obj` is not an object with a string `name` and a
 *   function `create`
 */
function assertHasNameAndCreate (
  obj: Transform | Transport,
  type: 'transform' | 'transport'
) {
  const wellFormed =
    isPlainObject(obj) &&
    typeof obj.name === 'string' &&
    typeof obj.create === 'function'

  if (wellFormed) return
  throw new Error(`${type} must be {name: string, create: function}`)
}
// TODO: should probably replace this with ssb-ref#toMultiServerAddress or
// just delete this and let multiserver handle invalid addresses. The 2nd option
// sounds better, because we might already have address validation in ssb-conn
// and so we don't need that kind of logic in secret-stack anymore.
/**
 * Converts an object-style address (`{host, port, key}`) into a multiserver
 * address string of the form `<protocol>:<host>:<port>~shs:<key>`.
 *
 * The protocol is `onion` when `host` is a string ending in `.onion`, and
 * `net` otherwise. Anything that is not an object is returned unchanged.
 *
 * @param address - object-style address, or any other value
 * @returns the built address string, or `address` itself when it is not an object
 */
function coearseAddress (address: unknown) {
  if (!isPlainObject(address)) return address

  const { host, port, key } = address
  const isOnion = typeof host === 'string' && host.endsWith('.onion')

  // Array#join is used on purpose: it renders undefined/null parts as ''.
  const transportPart = [isOnion ? 'onion' : 'net', host, port].join(':')
  const transformPart = ['shs', toBase64(key as string)].join(':')

  return transportPart + '~' + transformPart
}
/*
// Could be useful
function msLogger (stream) {
const meta = { tx: 0, rx: 0, pk: 0 }
stream = Rate(stream, function (len, up) {
meta.pk++
if (up) meta.tx += len
else meta.rx += len
})
stream.meta = meta
return stream
}
*/
/**
 * Checks whether a value is acceptable as a permissions list.
 *
 * `null` and `undefined` are accepted (list not specified); otherwise the
 * value must be an array whose entries are all strings.
 *
 * @param list - candidate `allow` / `deny` value
 * @returns `true` when `list` is nullish or an array of strings
 */
function isPermsList (list: unknown) {
  if (list == null) return true
  if (!Array.isArray(list)) return false
  return !list.some((entry) => typeof entry !== 'string')
}
/**
 * Checks whether a value looks like a permissions object, i.e. an object
 * whose `allow` and `deny` fields each pass `isPermsList`.
 *
 * Always returns a real boolean: falsy inputs (`null`, `undefined`, `0`, `''`)
 * yield `false` rather than being passed through as the result.
 *
 * @param perms - candidate permissions value
 * @returns `true` when `perms` is an object with valid `allow` and `deny` lists
 */
function isPermissions (perms: unknown) {
  // isPlainObject already rejects null/undefined and other non-objects.
  if (!isPlainObject(perms)) return false
  // allow: null means enable everything.
  return isPermsList(perms.allow) && isPermsList(perms.deny)
}
/**
 * Core secret-stack plugin.
 *
 * `manifest` lists the methods exposed over muxrpc; `init` wires multiserver
 * (transports + transforms) to muxrpc and returns the plugin's API object.
 */
export = {
  manifest: {
    auth: 'async',
    address: 'sync',
    manifest: 'sync',
    multiserver: {
      parse: 'sync',
      address: 'sync'
    }
  },

  /**
   * Builds the core API.
   *
   * Side effects visible in this function:
   * - fills in `opts.connections` with a default `net` + `shs` setup when the
   *   caller did not provide one (mutates `opts`);
   * - creates `api.peers`, a map from rpc id to the list of rpc connections
   *   currently open for that id;
   * - schedules `setupMultiserver` with `setImmediate`, so transports and
   *   transforms registered synchronously after `init` are picked up.
   *
   * @param api - the stack instance; used for `emit`, `id`, `closed`,
   *   `getAddress` and as the local muxrpc API
   * @param opts - configuration (`timers`, `connections`, `host`, `port`)
   * @param permissions - permissions object; `permissions.anonymous` is used
   *   whenever the connection carries no valid `auth` permissions
   * @param manifest - muxrpc manifest for the local API
   * @returns the plugin API object
   */
  init (api: any, opts: Config, permissions: any, manifest: any) {
    // Inactivity timeout handed to pull-inactivity.
    // A configured value is honoured when it is not NaN and truthy; otherwise
    // (unset, NaN, 0, null) the default applies. Values that are not > 0
    // (e.g. a negative number) disable the inactivity wrapper in setupRPC.
    const configuredInactivity = opts.timers?.inactivity as any
    // if opts.timers are set, pick a longer default
    // but if not, set a short default (as needed in the tests)
    const defaultInactivity = opts.timers ? 600e3 : 5e3
    const timeoutInactivity: number =
      (!isNaN(configuredInactivity) && configuredInactivity) ||
      defaultInactivity

    if (!opts.connections) {
      const netIn: Incoming = {
        scope: ['device', 'local', 'public'],
        transform: 'shs',
        ...(opts.host ? { host: opts.host } : null),
        ...(opts.port ? { port: opts.port } : null)
      }
      const netOut: Outgoing = {
        transform: 'shs'
      }
      opts.connections = {
        incoming: {
          net: [netIn]
        },
        outgoing: {
          net: [netOut]
        }
      }
    }

    const peers: any = (api.peers = {})

    const transports: Array<Transport> = []
    const transforms: Array<Transform> = []

    // All three are assigned by the first successful setupMultiserver() call.
    let server: any
    let ms: any
    let msClient: any

    /**
     * Lazily creates the multiserver server and client from the registered
     * transports/transforms and `opts.connections`.
     *
     * Does nothing (returns undefined) once `api.closed` is set, and returns
     * the existing server on repeated calls.
     *
     * @throws Error when no transform has been registered, when a transport
     *   plugin reports a scope different from the configured one, or when
     *   multiserver returns no server
     */
    function setupMultiserver () {
      if (api.closed) return
      if (server) return server

      if (transforms.length < 1) {
        throw new Error('secret-stack needs at least 1 transform protocol')
      }

      const serverSuites: Array<[unknown, unknown]> = []
      const clientSuites: Array<[unknown, unknown]> = []

      // One server suite per (incoming config, transform, transport) triple
      // whose names match the config.
      for (const incTransport in opts.connections!.incoming) {
        opts.connections!.incoming[incTransport].forEach((inc) => {
          transforms.forEach((transform) => {
            transports.forEach((transport) => {
              if (
                transport.name === incTransport &&
                transform.name === inc.transform
              ) {
                const msPlugin = transport.create(inc)
                const msTransformPlugin = transform.create()
                // Identity comparison: the plugin must hand back the very
                // scope value it was configured with.
                if (msPlugin.scope() !== inc.scope) {
                  throw new Error(
                    'transport:' +
                      transport.name +
                      ' did not remember scope, expected:' +
                      inc.scope +
                      ' got:' +
                      msPlugin.scope()
                  )
                }
                debug(
                  'creating server %s %s host=%s port=%d scope=%s',
                  incTransport,
                  transform.name,
                  inc.host,
                  inc.port,
                  inc.scope || 'undefined'
                )
                serverSuites.push([msPlugin, msTransformPlugin])
              }
            })
          })
        })
      }

      // Same matching for outgoing connections, without the scope check.
      for (const outTransport in opts.connections!.outgoing) {
        opts.connections!.outgoing[outTransport].forEach((out) => {
          transforms.forEach((transform) => {
            transports.forEach((transport) => {
              if (
                transport.name === outTransport &&
                transform.name === out.transform
              ) {
                const msPlugin = transport.create(out)
                const msTransformPlugin = transform.create()
                clientSuites.push([msPlugin, msTransformPlugin])
              }
            })
          })
        })
      }

      msClient = MultiServer(clientSuites)

      ms = MultiServer(serverSuites)
      server = ms.server(setupRPC, null, () => {
        api.emit('multiserver:listening') // XXX return all scopes listing on?
      })
      if (!server) throw new Error('expected server')
      return server
    }

    setImmediate(setupMultiserver)

    /**
     * Wraps a multiserver stream in a muxrpc instance, tracks it in `peers`
     * and emits `'rpc:connect'`.
     *
     * @param stream - multiserver duplex stream (`remote`, `auth`, `meta` and
     *   `address` are read from it)
     * @param manf - remote manifest; falls back to the local `manifest` when
     *   null/undefined
     * @param isClient - true when this side initiated the connection
     * @returns the muxrpc instance
     */
    function setupRPC (stream: any, manf: unknown, isClient?: boolean) {
      // idea: make muxrpc part of the multiserver stream so that we can upgrade it.
      //       we'd need to fallback to using default muxrpc on ordinary connections.
      //       but maybe the best way to represent that would be to coerce addresses to
      //       include ~mux1 at the end if they didn't specify a muxrpc version.

      const _id = '@' + u.toId(stream.remote)

      // Outgoing connections always get anonymous permissions; incoming ones
      // use stream.auth only when it is a well-formed permissions object.
      const perms =
        !isClient && isPermissions(stream.auth)
          ? stream.auth
          : permissions.anonymous

      const rpc = Muxrpc(manifest, manf ?? manifest, api, _id, perms, false)
      rpc.id = _id

      let rpcStream = rpc.stream
      // No inactivity timeout for connections whose id equals our own id.
      if (timeoutInactivity > 0 && api.id !== rpc.id) {
        rpcStream = Inactive(rpcStream, timeoutInactivity)
      }
      rpc.meta = stream.meta
      rpc.stream.address = stream.address

      pull(stream, rpcStream, stream)

      // keep track of current connections.
      if (!peers[rpc.id]) peers[rpc.id] = []
      peers[rpc.id].push(rpc)
      rpc.once('closed', () => {
        peers[rpc.id].splice(peers[rpc.id].indexOf(rpc), 1)
      })

      api.emit('rpc:connect', rpc, !!isClient)

      return rpc
    }

    return {
      config: opts,

      // can be called remotely.
      auth (_pub: unknown, cb: Function) {
        cb()
      },

      address (scope?: ScopeStr) {
        return api.getAddress(scope)
      },

      getAddress (scope?: ScopeStr) {
        setupMultiserver()
        return ms.stringify(scope) || null
      },

      manifest () {
        return manifest
      },

      getManifest () {
        return this.manifest()
      },

      // cannot be called remote.
      connect (address: unknown, cb: Function) {
        setupMultiserver()
        msClient.client(
          coearseAddress(address),
          (err: unknown, stream: unknown) => {
            if (err) cb(err)
            else cb(null, setupRPC(stream, null, true))
          }
        )
      },

      multiserver: {
        /**
         * Registers a transport plugin.
         * @throws Error once the server has been initialised, or when the
         *   plugin is not `{name: string, create: function}`
         */
        transport (transport: Transport) {
          if (server) {
            throw new Error('cannot add protocol after server initialized')
          }
          assertHasNameAndCreate(transport, 'transport')
          debug('Adding transport %s', transport.name)
          transports.push(transport)
          return this
        },

        /**
         * Registers a transform plugin.
         * NOTE: unlike `transport`, there is no "server already initialised"
         * guard here; a transform added after setup is stored but
         * setupMultiserver will not run again to pick it up.
         */
        transform (transform: Transform) {
          assertHasNameAndCreate(transform, 'transform')
          debug('Adding transform %s', transform.name)
          transforms.push(transform)
          return this
        },

        parse (str: string) {
          // `ms` only exists after setup; initialise it first, exactly like
          // `address` below, instead of failing on an undefined `ms`.
          setupMultiserver()
          return ms.parse(str)
        },

        address (scope?: ScopeStr) {
          setupMultiserver()
          return ms.stringify(scope) || null
        }
      },

      /**
       * Marks the stack as closed and shuts the server down.
       *
       * Accepts `close(cb)` as well as `close(err, cb)`. When `err` is given,
       * every tracked rpc connection is closed with it.
       */
      close (err: unknown, cb: Function) {
        if (typeof err === 'function') {
          cb = err
          err = null
        }
        api.closed = true

        if (!server) {
          cb && cb()
        } else {
          const onServerClosed = (closeErr: unknown) => {
            api.emit('close', closeErr)
            cb && cb(closeErr)
          }
          // `server` is either a close function or an object with `close`;
          // call `close` as a method so it keeps its `this`.
          if (typeof server.close === 'function') server.close(onServerClosed)
          else server(onServerClosed)
        }

        if (err) {
          each(peers, (connections: any) => {
            // Iterate a snapshot: the rpc 'closed' handler registered in
            // setupRPC splices this same list, which would make a live
            // iteration skip entries if it fires during rpc.close().
            const snapshot = Array.isArray(connections)
              ? connections.slice()
              : connections
            each(snapshot, (rpc: any) => {
              rpc.close(err)
            })
          })
        }
      }
    }
  }
}