-
-
Notifications
You must be signed in to change notification settings - Fork 42
/
stream.ts
179 lines (152 loc) · 5.83 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import { createNanoEvents } from 'nanoevents'
import uuid from 'tiny-uid'
import type { Emitter } from 'nanoevents'
import type { JsonValue } from 'type-fest'
import type { Endpoint, HybridUnsubscriber, RuntimeContext, StreamInfo } from '../types'
import type { EndpointRuntime } from './endpoint-runtime'
import { parseEndpoint } from './endpoint'
/**
 * Built on top of Bridge. A Stream is a namespaced scope under a channel
 * name of your choice that allows continuous end-to-end communication with
 * a single remote endpoint. Because every stream carries its own stream id,
 * messages of different streams cannot be confused with each other.
 */
export class Stream {
  /** True once the shared transfer listener has been registered. */
  private static initDone = false
  /** Streams that have not been closed yet, keyed by stream id. */
  private static openStreams: Map<string, Stream> = new Map()
  /** Per-instance emitter carrying the 'message' and 'closed' events. */
  private emitter: Emitter = createNanoEvents()
  private isClosed = false

  /**
   * Registers the new stream in the static registry. The very first
   * construction also installs the transfer listener that dispatches
   * incoming transfer/close messages to the stream named by `streamId`.
   *
   * NOTE(review): the listener is installed only on the endpoint runtime
   * passed to the first Stream ever constructed - confirm that a single
   * runtime per context is the intended usage.
   * @param endpointRuntime runtime used to send and receive bridge messages
   * @param streamInfo id, channel and remote endpoint of this stream
   */
  constructor(private endpointRuntime: EndpointRuntime, private streamInfo: StreamInfo) {
    if (!Stream.initDone) {
      endpointRuntime.onMessage<{ streamId: string; action: 'transfer' | 'close'; streamTransfer: JsonValue }, string>('__crx_bridge_stream_transfer__', (msg) => {
        const { streamId, streamTransfer, action } = msg.data
        const stream = Stream.openStreams.get(streamId)
        // Messages for unknown or already closed streams are ignored.
        if (stream && !stream.isClosed) {
          if (action === 'transfer')
            stream.emitter.emit('message', streamTransfer)
          if (action === 'close')
            stream.handleStreamClose()
        }
      })
      Stream.initDone = true
    }
    Stream.openStreams.set(this.streamInfo.streamId, this)
  }

  /**
   * Returns stream info
   */
  public get info(): StreamInfo {
    return this.streamInfo
  }

  /**
   * Sends a message to other endpoint.
   * Will trigger onMessage on the other side.
   *
   * Warning: Before sending sensitive data, verify the endpoint using `stream.info.endpoint.isInternal()`
   * The other side could be malicious webpage speaking same language as webext-bridge
   * @param msg payload to deliver
   * @throws Error when the stream has already been closed
   */
  public send(msg?: JsonValue): void {
    if (this.isClosed)
      throw new Error('Attempting to send a message over closed stream. Use stream.onClose(<callback>) to keep an eye on stream status')
    this.endpointRuntime.sendMessage('__crx_bridge_stream_transfer__', {
      streamId: this.streamInfo.streamId,
      streamTransfer: msg,
      action: 'transfer',
    }, this.streamInfo.endpoint)
  }

  /**
   * Closes the stream.
   * Will trigger stream.onClose(<callback>) on both endpoints.
   * If needed again, spawn a new Stream, as this instance cannot be re-opened
   * @param msg optional final payload, sent as a regular message before closing
   * @throws Error when `msg` is given but the stream is already closed
   */
  public close(msg?: JsonValue): void {
    // Compare against undefined so falsy payloads (0, '', false, null) are
    // still delivered instead of being silently dropped.
    if (msg !== undefined)
      this.send(msg)
    this.handleStreamClose()
    this.endpointRuntime.sendMessage('__crx_bridge_stream_transfer__', {
      streamId: this.streamInfo.streamId,
      streamTransfer: null,
      action: 'close',
    }, this.streamInfo.endpoint)
  }

  /**
   * Registers a callback to fire whenever other endpoint sends a message
   * @param callback receives the transferred payload
   * @returns unsubscribe function, also exposed as `.dispose` and `.close`
   */
  public onMessage<T extends JsonValue>(callback: (msg?: T) => void): HybridUnsubscriber {
    return this.getDisposable('message', callback)
  }

  /**
   * Registers a callback to fire whenever stream.close() is called on either endpoint
   * @param callback invoked once when the stream closes (the event is emitted with `true`)
   * @returns unsubscribe function, also exposed as `.dispose` and `.close`
   */
  public onClose<T extends JsonValue>(callback: (msg?: T) => void): HybridUnsubscriber {
    return this.getDisposable('closed', callback)
  }

  /**
   * Shared close path for local and remote closes. Runs at most once:
   * marks the stream closed, removes it from the static registry so the
   * instance is not retained, notifies 'closed' listeners and then drops
   * every listener.
   */
  private handleStreamClose = () => {
    if (this.isClosed)
      return
    this.isClosed = true
    const { streamId } = this.streamInfo
    // Only deregister our own entry, never one owned by another instance.
    if (Stream.openStreams.get(streamId) === this)
      Stream.openStreams.delete(streamId)
    this.emitter.emit('closed', true)
    this.emitter.events = {}
  }

  /**
   * Subscribes `callback` to `event` and returns the unsubscribe function,
   * with the same function attached as `dispose` and `close`.
   */
  private getDisposable(event: string, callback: () => void): HybridUnsubscriber {
    const off = this.emitter.on(event, callback)
    return Object.assign(off, {
      dispose: off,
      close: off,
    })
  }
}
/**
 * Wires stream support onto an endpoint runtime.
 *
 * Registers a listener for incoming stream-open requests and returns the two
 * entry points of the stream API:
 * - `openStream` opens a stream on a channel towards a destination.
 * - `onOpenStreamChannel` claims a channel and receives streams opened on it.
 *
 * @param endpointRuntime runtime used to send and receive bridge messages
 */
export const createStreamWirings = (endpointRuntime: EndpointRuntime) => {
  // Streams opened from this side, keyed by channel (one per channel).
  const openStreams = new Map<string, Stream>()
  // Channel claims registered through onOpenStreamChannel.
  const onOpenStreamCallbacks = new Map<string, (stream: Stream) => void>()
  // Signals that a new channel claim was registered.
  const streamyEmitter = createNanoEvents()

  endpointRuntime.onMessage<{ channel: string; streamId: string }, string>('__crx_bridge_stream_open__', (message) => {
    // The returned promise resolves with `true` only once a callback for the
    // requested channel exists, so an open request that arrives before the
    // channel is claimed stays pending until it is.
    return new Promise((resolve) => {
      const { sender, data } = message
      const { channel } = data
      let watching = false
      let off = () => { }
      const readyup = () => {
        const callback = onOpenStreamCallbacks.get(channel)
        if (typeof callback === 'function') {
          callback(new Stream(endpointRuntime, { ...data, endpoint: sender }))
          if (watching)
            off()
          resolve(true)
        }
        else if (!watching) {
          // No claim yet: retry whenever a new claim is registered.
          watching = true
          off = streamyEmitter.on('did-change-stream-callbacks', readyup)
        }
      }
      readyup()
    })
  })

  /**
   * Opens a stream on `channel` towards `destination` and resolves with it
   * once the open request has been answered.
   *
   * @param channel channel name; only one outgoing stream per channel at a time
   * @param destination target endpoint, either parsed or in string form
   * @throws Error when a stream is already open (or being opened) on `channel`
   */
  async function openStream(channel: string, destination: RuntimeContext | Endpoint | string): Promise<Stream> {
    if (openStreams.has(channel))
      throw new Error('webext-bridge: A Stream is already open at this channel')

    const endpoint = typeof destination === 'string' ? parseEndpoint(destination) : destination
    const streamInfo: StreamInfo = { streamId: uuid(), channel, endpoint }
    const stream = new Stream(endpointRuntime, streamInfo)

    // Reserve the channel BEFORE awaiting the handshake. Otherwise two
    // concurrent calls would both pass the guard above, and a stream closed
    // while the handshake is in flight would be registered after its
    // onClose cleanup already ran, blocking the channel forever.
    openStreams.set(channel, stream)
    const release = () => {
      if (openStreams.get(channel) === stream)
        openStreams.delete(channel)
    }
    stream.onClose(release)

    try {
      await endpointRuntime.sendMessage('__crx_bridge_stream_open__', streamInfo as unknown as JsonValue, endpoint)
    }
    catch (error) {
      // A failed handshake must not keep the channel reserved.
      release()
      throw error
    }
    return stream
  }

  /**
   * Claims `channel` so that `callback` receives each stream opened on it.
   *
   * @param channel channel name to listen on
   * @param callback receives the Stream created for each open request
   * @throws Error when the channel has already been claimed
   */
  function onOpenStreamChannel(channel: string, callback: (stream: Stream) => void): void {
    if (onOpenStreamCallbacks.has(channel))
      throw new Error('webext-bridge: This channel has already been claimed. Stream allows only one-on-one communication')
    onOpenStreamCallbacks.set(channel, callback)
    // Wake up any open requests that were waiting for this claim.
    streamyEmitter.emit('did-change-stream-callbacks')
  }

  return {
    openStream,
    onOpenStreamChannel,
  }
}