-
Notifications
You must be signed in to change notification settings - Fork 42
/
MessagesArray.ts
73 lines (63 loc) · 2.43 KB
/
MessagesArray.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import { Message } from "@pipeline/process/Types";
import { BitStream } from "@pipeline/serialization/BitStream";
import { MessageBitConfig, readMessage, writeMessage } from "@pipeline/serialization/MessageSerialization";
/**
* Acts just like an array, but it serializes the messages to a BitStream to improve memory usage and performance.
*
* It needs a `MessageBitConfig` to know the bit configuration of the messages.
*/
export class MessagesArray implements Iterable<Message> {
/** The stream where messages are written of read from */
public stream: BitStream;
/**
* The number of messages in the array.
* We have to store this value separately, since otherwise we would have to iterate over all the messages to get the length.
*/
public length: number;
/**
* The offset where the messages start in the stream.
* This is the offset that was initially in the stream when the array was created.
*/
private readonly startOffset: number;
/**
* Creates an array. You can provide an existing stream.
*
* @param count the number of messages in the stream
*/
constructor(public readonly bitConfig: MessageBitConfig, stream?: BitStream, count?: number) {
if (stream) {
if (count === undefined) throw new Error("Count is required");
this.stream = stream;
this.length = count;
this.startOffset = stream.offset;
} else {
// empty
this.stream = new BitStream();
this.length = 0;
this.startOffset = 0;
}
}
/** Adds a message at the end */
push(item: Message) {
writeMessage(item, this.stream, this.bitConfig);
this.length++;
}
/**
* Iterates over all messages in the array.
*
* ⚠️ You can't call this method and push messages at the same time, since we are changing the stream offset.
*/
*[Symbol.iterator]() {
// save and later restore the current stream offset
const originalOffset = this.stream.offset;
this.stream.offset = this.startOffset; // start from the beginning
for (let i = 0; i < this.length; i++) {
yield readMessage(this.stream, this.bitConfig);
}
this.stream.offset = originalOffset;
}
/** @returns the number of bytes occupied by the messages */
get byteLength() {
return this.stream.offset - this.startOffset;
}
}