-
Notifications
You must be signed in to change notification settings - Fork 42
/
ChannelMessages.ts
204 lines (178 loc) · 8.43 KB
/
ChannelMessages.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import { Timestamp } from "@pipeline/Types";
import { PMessage, RawID } from "@pipeline/parse/Types";
import { Message } from "@pipeline/process/Types";
import { DefaultMessageBitConfig } from "@pipeline/serialization/MessageSerialization";
import { MessagesArray } from "@pipeline/serialization/MessagesArray";
/**
 * Consecutive raw parser messages written by a single author in a single channel,
 * ordered chronologically.
 */
export type PMessageGroup = Array<PMessage>;
/** Callback that turns one group of parser messages into processed Message's */
export type ProcessGroupFn = (group: PMessageGroup) => Array<Message>;
/**
 * This class handles all the messages in a channel, either processed or not.
 * It receives PMessage objects (parser messages) and generates groups of messages (PMessageGroup)
 * to be processed into Message's.
 * Messages are processed in groups because it makes some analyses more accurate (e.g. language recognition).
 * The groups generated by this class are sent by the same author, in chronological order.
 *
 * Messages may come from different files (exports) that have different starting times
 * for the same channel, and even overlapping periods. This class resolves that with the concept of
 * intervals, where each interval (a @see MessagesInterval instance) stores messages from
 * a starting time to an ending time. Intervals never overlap each other.
 *
 * ❗ This class assumes that messages are added in chronological order, unless they are separated by a
 * `markEOF()` call. This way it can deal with out-of-order (between `markEOF` calls) and duplicated messages.
 *
 * If a message whose timestamp is contained in an existing (non-open) interval is added, it is dropped,
 * even if it was not seen before (not that common, e.g. different exports with deleted messages).
 *
 * After all the processing is done, processed messages can be iterated via `*processedMessages()` in
 * chronological order.
 *
 * Note: in the future, we may want to process PMessage into IMessage's (some kind of intermediate messages)
 * instead of just Message, so we can pass intermediate information not needed for the final Message.
 * This will require refactoring the MessagesArray class.
 */
export class ChannelMessages {
    /** All intervals found until now, kept sorted by starting timestamp */
    private intervals: MessagesInterval[] = [];

    /** Currently open interval. Next messages will be stored here */
    private openInterval?: MessagesInterval;

    /**
     * Adds a PMessage to the pending messages to be processed.
     *
     * - If the timestamp falls inside an interval other than the open one, the message is dropped.
     * - If there is an open interval and extending it up to this message does not jump over another
     *   interval, the message is appended to the open interval.
     * - Otherwise a new interval is created with the given message and becomes the open interval.
     *
     * @throws Error (from `MessagesInterval.addMessageAndExtend`) if the message is older than the end
     *         of the open interval
     */
    addMessage(message: PMessage): void {
        const ts = message.timestamp;
        const open = this.openInterval;

        // true when appending to the open interval would make it span over an interval that is already stored
        let wouldSwallowInterval = false;

        for (const interval of this.intervals) {
            if (interval === open) continue;

            if (interval.isContained(ts)) {
                // drop duplicate message, already included in another interval
                return;
            }

            // Intervals are disjoint, so an interval that starts after the open one starts lies entirely
            // after it. If it also starts at or before `ts`, extending the open interval up to `ts` would
            // overlap it and break the chronological order of `processedMessages()`.
            if (
                open !== undefined &&
                open.startTimestamp < interval.startTimestamp &&
                interval.startTimestamp <= ts
            ) {
                wouldSwallowInterval = true;
            }
        }

        if (open !== undefined && !wouldSwallowInterval) {
            // safe to extend the open interval
            open.addMessageAndExtend(message);
            return;
        }

        // No open interval, or the open one cannot grow past an existing interval: start a new one.
        // The previous open interval (if any) is now considered closed and will be fully processed
        // on the next `process` call.
        this.openInterval = new MessagesInterval(message);
        // add and make sure intervals are chronologically sorted
        this.intervals.push(this.openInterval);
        this.intervals.sort((a, b) => a.startTimestamp - b.startTimestamp);
    }

    /**
     * Mark the end of an input file. This allows the next message added (from another file)
     * to have a different starting timestamp.
     *
     * It will close the current open interval, if any. Make sure to call `process` to process leftover messages.
     */
    markEOF(): void {
        this.openInterval = undefined;
    }

    /**
     * Process as many pending messages as possible. Not all are processed since the current open interval
     * may receive more messages from the same author in the future (remember we are grouping them by author).
     *
     * @param fn function that converts each group of pending messages into Message's
     */
    process(fn: ProcessGroupFn): void {
        // every interval except the open one is treated as closed, so its leftover group is flushed too
        for (const interval of this.intervals) interval.process(fn, interval !== this.openInterval);
    }

    /** @returns an iterator over the processed messages, interval by interval */
    *processedMessages() {
        for (const interval of this.intervals) yield* interval.processedMessages();
    }

    /** @returns the number of messages in this channel (pending and processed) */
    get numMessages() {
        return this.intervals.reduce((acc, i) => acc + i.numMessages, 0);
    }
}
/**
 * A list of messages that fall inside one inclusive time range [start, end].
 *
 * Messages held here are either still pending (PMessage) or already processed (Message).
 * Pending messages enter through `addMessageAndExtend(message)`; calling `process(fn, isClosed)`
 * converts them. The conversion itself is delegated to the supplied function, which receives
 * PMessageGroup's: consecutive queued messages that share the same author, in insertion order.
 * Passing `isClosed = true` signals that no more messages will arrive, so the trailing group
 * is processed as well instead of being kept waiting.
 *
 * The original platform ID of every added message is recorded in insertion order.
 */
export class MessagesInterval {
    // inclusive bounds of the interval: [start, end]
    private start: Timestamp;
    private end: Timestamp;

    /** Parser messages waiting to be grouped and processed. It should hold very few elements at a time */
    private messageQueue: PMessage[] = [];

    /** Messages that have already gone through the processing function */
    private messages = new MessagesArray(DefaultMessageBitConfig);

    /** Original platform IDs, one per added message, in insertion order */
    private ids: RawID[] = [];

    /** Creates an interval that initially spans only `initialMessage` and queues that message */
    constructor(initialMessage: PMessage) {
        this.start = initialMessage.timestamp;
        this.end = initialMessage.timestamp;
        this.addMessageAndExtend(initialMessage);
    }

    /**
     * Queues the PMessage and moves the end of the interval forward to its timestamp.
     *
     * @throws Error if the message is older than the current end of the interval
     */
    addMessageAndExtend(message: PMessage) {
        const goesBackInTime = message.timestamp < this.end;
        if (goesBackInTime) {
            throw new Error("MessagesInterval can only be extended to the future");
        }

        // NOTE: ideally we would verify the id is not already in this.ids, but that check is too expensive
        this.messageQueue.push(message);
        this.ids.push(message.id);
        this.end = message.timestamp;
    }

    /**
     * Processes the queued messages.
     * The queue is split, in insertion order, into runs of consecutive messages by the same author;
     * `fn` is invoked once per run and every Message it returns is stored as processed.
     *
     * @param fn function that converts a group of pending messages into Message's
     * @param isClosed if true, the interval is considered closed and it won't wait for new messages,
     *                 so the last group is processed too; otherwise the last group stays in the queue
     */
    process(fn: ProcessGroupFn, isClosed: boolean) {
        const pending = this.messageQueue;
        const total = pending.length;
        if (total === 0) return;

        // runs `fn` over pending[from, to) and stores whatever it returns
        const flushGroup = (from: number, to: number) => {
            const group = pending.slice(from, to);
            for (const processed of fn(group)) this.messages.push(processed);
        };

        // [groupStart, cursor) is the group being accumulated
        let groupStart = 0;
        let groupAuthor: RawID = pending[0].authorId;

        for (let cursor = 1; cursor < total; cursor++) {
            const author = pending[cursor].authorId;
            if (author === groupAuthor) continue;

            // author changed: the accumulated group is complete
            flushGroup(groupStart, cursor);
            groupAuthor = author;
            groupStart = cursor;
        }

        if (!isClosed) {
            // more messages from the same author may still arrive: keep the last group queued
            this.messageQueue = pending.slice(groupStart, total);
            return;
        }

        // no new messages are expected: process the last group and empty the queue
        flushGroup(groupStart, total);
        this.messageQueue = [];
    }

    /**
     * @returns an iterator over the processed messages, each paired with the ID stored at the same position
     *
     * NOTE(review): the pairing is positional, so it presumably relies on the processing function
     * returning exactly one Message per PMessage, in order — confirm against the callers of `process`.
     */
    *processedMessages() {
        let position = 0;
        for (const msg of this.messages) {
            const id = this.ids[position];
            position += 1;
            yield { id, msg };
        }
    }

    /** @returns whether `ts` lies within the interval, both bounds included */
    isContained(ts: Timestamp): boolean {
        const notBeforeStart = ts >= this.start;
        const notAfterEnd = ts <= this.end;
        return notBeforeStart && notAfterEnd;
    }

    /** @returns the timestamp at which the interval starts */
    get startTimestamp() {
        return this.start;
    }

    /** @returns the total number of messages in the interval, pending plus processed */
    get numMessages() {
        const pendingCount = this.messageQueue.length;
        return pendingCount + this.messages.length;
    }
}