-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Cbor.ts
75 lines (67 loc) · 1.92 KB
/
Cbor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import type { Peer } from "../../peer.js";
import { Decoder, Encoder } from "cbor-x";
import { StreamConnection } from "./StreamConnection.js";
const NullValue = Symbol.for(null);
function concatUint8Array(buffer1: Uint8Array, buffer2: Uint8Array) {
const tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength);
tmp.set(buffer1, 0);
tmp.set(buffer2, buffer1.byteLength);
return new Uint8Array(tmp.buffer);
}
const iterateOver = async function* (stream: ReadableStream) {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
};
export class Cbor extends StreamConnection {
readonly serialization = "Cbor";
private _encoder = new Encoder();
private _decoder = new Decoder();
private _inc;
private _decoderStream = new TransformStream<ArrayBuffer, unknown>({
transform: (abchunk, controller) => {
let chunk = new Uint8Array(abchunk);
if (this._inc) {
chunk = concatUint8Array(this._inc, chunk);
this._inc = null;
}
let values;
try {
values = this._decoder.decodeMultiple(chunk);
} catch (error) {
if (error.incomplete) {
this._inc = chunk.subarray(error.lastPosition);
values = error.values;
} else throw error;
} finally {
for (let value of values || []) {
if (value === null) value = NullValue;
controller.enqueue(value);
}
}
},
});
constructor(peerId: string, provider: Peer, options: any) {
super(peerId, provider, { ...options, reliable: true });
void this._rawReadStream.pipeTo(this._decoderStream.writable);
(async () => {
for await (const msg of iterateOver(this._decoderStream.readable)) {
if (msg.__peerData?.type === "close") {
this.close();
return;
}
this.emit("data", msg);
}
})();
}
protected override _send(data) {
return this.writer.write(this._encoder.encode(data));
}
}