-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
MessageCache.ts
452 lines (379 loc) · 13.8 KB
/
MessageCache.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
// Copyright 2019 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import cloneDeep from 'lodash/cloneDeep';
import { throttle } from 'lodash';
import LRU from 'lru-cache';
import type { MessageAttributesType } from '../model-types.d';
import type { MessageModel } from '../models/messages';
import * as Errors from '../types/errors';
import * as log from '../logging/log';
import { drop } from '../util/drop';
import { getEnvironment, Environment } from '../environment';
import { getMessageConversation } from '../util/getMessageConversation';
import { getMessageModelLogger } from '../util/MessageModelLogger';
import { getSenderIdentifier } from '../util/getSenderIdentifier';
import { isNotNil } from '../util/isNotNil';
import { map } from '../util/iterables';
import { softAssert, strictAssert } from '../util/assert';
import { isStory } from '../messages/helpers';
import { getStoryDataFromMessageAttributes } from './storyLoader';
// Upper bound on how many per-message throttled Redux updaters MessageCache
// keeps alive at once; it is passed as the `max` option of the LRU that holds
// them.
const MAX_THROTTLED_REDUX_UPDATERS = 200;
export class MessageCache {
  // In-memory indexes over the cached messages:
  //   messages           - message id -> attributes
  //   messageIdsBySender - getSenderIdentifier() key -> message id
  //   messageIdsBySentAt - sent_at -> ids of every cached message sharing it
  //   lastAccessedAt     - message id -> Date.now() of the last add/read/update
  private state = {
    messages: new Map<string, MessageAttributesType>(),
    messageIdsBySender: new Map<string, string>(),
    messageIdsBySentAt: new Map<number, Array<string>>(),
    lastAccessedAt: new Map<string, number>(),
  };

  // Stores the models so that __DEPRECATED$register always returns the existing
  // copy instead of a new model.
  private modelCache = new Map<string, MessageModel>();

  // Synchronously access a message's attributes from internal cache. Will
  // return undefined if the message does not exist in memory. A successful
  // read also refreshes the message's last-accessed time.
  public accessAttributes(
    messageId: string
  ): Readonly<MessageAttributesType> | undefined {
    const messageAttributes = this.state.messages.get(messageId);
    return messageAttributes
      ? this.freezeAttributes(messageAttributes)
      : undefined;
  }

  // Synchronously access a message's attributes from internal cache. Throws
  // (via strictAssert) if the message does not exist in memory. `source` is
  // only used to label the assertion message.
  public accessAttributesOrThrow(
    source: string,
    messageId: string
  ): Readonly<MessageAttributesType> {
    const messageAttributes = this.accessAttributes(messageId);
    strictAssert(
      messageAttributes,
      `MessageCache.accessAttributesOrThrow/${source}: no message for id ${messageId}`
    );
    return messageAttributes;
  }

  // Evicts messages from the message cache if they have not been accessed
  // within the last `expiryTime` milliseconds. Messages that belong to the
  // currently selected conversation are kept regardless of age.
  public deleteExpiredMessages(expiryTime: number): void {
    if (this.state.messages.size === 0) {
      return;
    }

    const now = Date.now();
    // Read the selected conversation once up front rather than per message.
    const selectedId =
      window.reduxStore.getState()?.conversations?.selectedConversationId;

    // Deleting entries from a Map while iterating over it is well-defined.
    for (const [messageId, messageAttributes] of this.state.messages) {
      const timeLastAccessed = this.state.lastAccessedAt.get(messageId) ?? 0;
      const conversation = getMessageConversation(messageAttributes);
      const inActiveConversation =
        conversation && selectedId && conversation.id === selectedId;

      if (now - timeLastAccessed > expiryTime && !inActiveConversation) {
        this.__DEPRECATED$unregister(messageId);
      }
    }
  }

  // Finds a message in the cache by sender identifier. Returns undefined when
  // no cached message is indexed under that identifier.
  public findBySender(
    senderIdentifier: string
  ): Readonly<MessageAttributesType> | undefined {
    const id = this.state.messageIdsBySender.get(senderIdentifier);
    if (!id) {
      return undefined;
    }
    return this.accessAttributes(id);
  }

  // Re-points every cached message whose conversationId equals `obsoleteId`
  // at `conversationId`. Only the cache is touched; nothing is written to the
  // database from here.
  public replaceAllObsoleteConversationIds({
    conversationId,
    obsoleteId,
  }: {
    conversationId: string;
    obsoleteId: string;
  }): void {
    for (const [messageId, messageAttributes] of this.state.messages) {
      if (messageAttributes.conversationId !== obsoleteId) {
        continue;
      }
      this.setAttributes({
        messageId,
        messageAttributes: { conversationId },
        skipSaveToDatabase: true,
      });
    }
  }

  // Find the message's attributes whether in memory or in the database.
  // Attributes loaded from the database are added to the cache. Throws (via
  // strictAssert) if no matching message can be found; a database error is
  // logged and then surfaces as that same "no message" assertion.
  public async resolveAttributes(
    source: string,
    messageId: string
  ): Promise<Readonly<MessageAttributesType>> {
    const inMemoryMessageAttributes = this.accessAttributes(messageId);
    if (inMemoryMessageAttributes) {
      return inMemoryMessageAttributes;
    }

    let messageAttributesFromDatabase: MessageAttributesType | undefined;
    try {
      messageAttributesFromDatabase = await window.Signal.Data.getMessageById(
        messageId
      );
    } catch (err: unknown) {
      log.error(
        `MessageCache.resolveAttributes(${messageId}): db error ${Errors.toLogFormat(
          err
        )}`
      );
    }

    strictAssert(
      messageAttributesFromDatabase,
      `MessageCache.resolveAttributes/${source}: no message for id ${messageId}`
    );
    return this.freezeAttributes(messageAttributesFromDatabase);
  }

  // Updates a message's attributes and saves the message to cache and to the
  // database. Pass `skipSaveToDatabase: true` to update the cache only.
  //
  // The partial attributes are merged over the cached ones; the by-sender and
  // by-sentAt indexes, any cached model and Redux are all updated from the
  // merged result, and the merged result is what gets persisted.
  public setAttributes({
    messageId,
    messageAttributes: partialMessageAttributes,
    skipSaveToDatabase = false,
  }: {
    messageId: string;
    messageAttributes: Partial<MessageAttributesType>;
    skipSaveToDatabase?: boolean;
  }): void {
    let messageAttributes = this.accessAttributes(messageId);

    softAssert(messageAttributes, 'could not find message attributes');
    if (!messageAttributes) {
      // We expect message attributes to be defined in cache if one is trying to
      // set new attributes. In the case that the attributes are missing in cache
      // we'll add whatever we currently have to cache as a defensive measure so
      // that the code continues to work properly downstream. The softAssert above
      // that logs/debugger should be addressed upstream immediately by ensuring
      // that message is in cache.
      const partiallyCachedMessage = {
        id: messageId,
        ...partialMessageAttributes,
      } as MessageAttributesType;

      this.addMessageToCache(partiallyCachedMessage);
      messageAttributes = partiallyCachedMessage;
    }

    const nextMessageAttributes = {
      ...messageAttributes,
      ...partialMessageAttributes,
    };
    const { id, sent_at: sentAt } = nextMessageAttributes;

    // Drop index entries derived from the old attributes before re-indexing
    // under the merged ones, so neither index keeps a stale key when the
    // identifying fields (e.g. sent_at) change.
    this.state.messageIdsBySender.delete(
      getSenderIdentifier(messageAttributes)
    );
    if (messageAttributes.sent_at !== sentAt) {
      this.removeIdFromSentAtIndex(messageAttributes.sent_at, id);
    }
    this.addIdToSentAtIndex(sentAt, id);

    this.state.messages.set(id, nextMessageAttributes);
    this.state.lastAccessedAt.set(id, Date.now());
    this.state.messageIdsBySender.set(
      getSenderIdentifier(nextMessageAttributes),
      id
    );

    this.markModelStale(nextMessageAttributes);
    this.throttledUpdateRedux(nextMessageAttributes);

    if (skipSaveToDatabase) {
      return;
    }

    // Persist the merged attributes (not the pre-update snapshot) so that the
    // database matches what was just written to the cache.
    drop(
      window.Signal.Data.saveMessage(nextMessageAttributes, {
        ourAci: window.textsecure.storage.user.getCheckedAci(),
      })
    );
  }

  // One throttled Redux updater per message id, bounded by an LRU.
  private throttledReduxUpdaters = new LRU<string, typeof this.updateRedux>({
    max: MAX_THROTTLED_REDUX_UPDATERS,
  });

  // Pushes the attributes to Redux through the message's own throttled
  // updater (200ms window, firing on both the leading and trailing edge),
  // creating that updater on first use.
  private throttledUpdateRedux(attributes: MessageAttributesType) {
    let updater = this.throttledReduxUpdaters.get(attributes.id);
    if (!updater) {
      updater = throttle(this.updateRedux.bind(this), 200, {
        leading: true,
        trailing: true,
      });
      this.throttledReduxUpdaters.set(attributes.id, updater);
    }

    updater(attributes);
  }

  // Dispatches the changed message to Redux. Stories go through
  // stories.storyChanged; everything else through conversations.messageChanged.
  // Does nothing if window.reduxActions is not available.
  private updateRedux(attributes: MessageAttributesType) {
    if (!window.reduxActions) {
      return;
    }

    if (isStory(attributes)) {
      const storyData = getStoryDataFromMessageAttributes({
        ...attributes,
      });
      if (!storyData) {
        return;
      }

      window.reduxActions.stories.storyChanged(storyData);

      // We don't want messageChanged to run
      return;
    }

    window.reduxActions.conversations.messageChanged(
      attributes.id,
      attributes.conversationId,
      attributes
    );
  }

  // When you already have the message attributes from the db and want to
  // ensure that they're added to the cache. The latest attributes from cache
  // are returned if they exist, if not the attributes passed in are returned.
  // In the Development environment a frozen deep clone is returned instead of
  // the cached object itself.
  public toMessageAttributes(
    messageAttributes: MessageAttributesType
  ): Readonly<MessageAttributesType> {
    this.addMessageToCache(messageAttributes);

    const nextMessageAttributes = this.state.messages.get(messageAttributes.id);
    strictAssert(
      nextMessageAttributes,
      `MessageCache.toMessageAttributes: no message for id ${messageAttributes.id}`
    );

    if (getEnvironment() === Environment.Development) {
      return Object.freeze(cloneDeep(nextMessageAttributes));
    }
    return nextMessageAttributes;
  }

  // Creates a MessageCache and publishes it as window.MessageCache.
  static install(): MessageCache {
    const instance = new MessageCache();
    window.MessageCache = instance;
    return instance;
  }

  // Adds `id` to the list of message ids recorded for `sentAt`. The stored
  // array is replaced rather than mutated; an id that is already listed is
  // left alone.
  private addIdToSentAtIndex(sentAt: number, id: string): void {
    const existingIds = this.state.messageIdsBySentAt.get(sentAt);
    if (!existingIds) {
      this.state.messageIdsBySentAt.set(sentAt, [id]);
      return;
    }
    if (!existingIds.includes(id)) {
      this.state.messageIdsBySentAt.set(sentAt, [...existingIds, id]);
    }
  }

  // Removes `id` from the list of message ids recorded for `sentAt`, dropping
  // the whole entry once no ids remain.
  private removeIdFromSentAtIndex(sentAt: number, id: string): void {
    const existingIds = this.state.messageIdsBySentAt.get(sentAt);
    if (!existingIds) {
      return;
    }

    const remainingIds = existingIds.filter(existingId => existingId !== id);
    if (remainingIds.length) {
      this.state.messageIdsBySentAt.set(sentAt, remainingIds);
    } else {
      this.state.messageIdsBySentAt.delete(sentAt);
    }
  }

  // Inserts the attributes into the cache and its indexes. If the message is
  // already cached its attributes are NOT overwritten; only its last-accessed
  // time is refreshed. Attributes without an id are ignored.
  private addMessageToCache(messageAttributes: MessageAttributesType): void {
    if (!messageAttributes.id) {
      return;
    }

    if (this.state.messages.has(messageAttributes.id)) {
      this.state.lastAccessedAt.set(messageAttributes.id, Date.now());
      return;
    }

    const { id, sent_at: sentAt } = messageAttributes;

    // Store a shallow copy so the cache does not alias the caller's object.
    this.state.messages.set(id, { ...messageAttributes });
    this.state.lastAccessedAt.set(id, Date.now());
    this.addIdToSentAtIndex(sentAt, id);
    this.state.messageIdsBySender.set(
      getSenderIdentifier(messageAttributes),
      id
    );
  }

  // Makes sure the attributes are in the cache (refreshing the last-accessed
  // time) and returns them. In the Development environment a frozen deep
  // clone is returned; otherwise the object passed in is returned as-is.
  private freezeAttributes(
    messageAttributes: MessageAttributesType
  ): Readonly<MessageAttributesType> {
    this.addMessageToCache(messageAttributes);

    if (getEnvironment() === Environment.Development) {
      return Object.freeze(cloneDeep(messageAttributes));
    }
    return messageAttributes;
  }

  // Removes a message's attributes and every index entry that points at it.
  // No-op if the message is not cached.
  private removeMessage(messageId: string): void {
    const messageAttributes = this.state.messages.get(messageId);
    if (!messageAttributes) {
      return;
    }

    const { id, sent_at: sentAt } = messageAttributes;
    this.removeIdFromSentAtIndex(sentAt, id);

    this.state.messages.delete(messageId);
    this.state.lastAccessedAt.delete(messageId);
    this.state.messageIdsBySender.delete(
      getSenderIdentifier(messageAttributes)
    );
  }

  // Deprecated methods below

  // Adds the message into the cache and returns a Proxy that resembles
  // a MessageModel. If the id is already cached the existing model is
  // returned and `data`/`location` are not applied to it.
  public __DEPRECATED$register(
    id: string,
    data: MessageModel | MessageAttributesType,
    location: string
  ): MessageModel {
    if (!id || !data) {
      throw new Error(
        'MessageCache.__DEPRECATED$register: Got falsey id or message'
      );
    }

    const existing = this.__DEPRECATED$getById(id);
    if (existing) {
      this.addMessageToCache(existing.attributes);
      return existing;
    }

    const modelProxy = this.toModel(data);
    const messageAttributes = 'attributes' in data ? data.attributes : data;
    this.addMessageToCache(messageAttributes);
    modelProxy.registerLocations.add(location);

    return modelProxy;
  }

  // Deletes the message from our cache: both its attributes (with their index
  // entries) and its cached model, whichever of the two exist. Attributes can
  // be cached without a model ever having been created, so the absence of a
  // model must not prevent the attributes from being evicted.
  public __DEPRECATED$unregister(id: string): void {
    this.removeMessage(id);
    this.modelCache.delete(id);
  }

  // Finds a message in the cache by Id
  public __DEPRECATED$getById(id: string): MessageModel | undefined {
    const data = this.state.messages.get(id);
    if (!data) {
      return undefined;
    }
    return this.toModel(data);
  }

  // Finds a message in the cache by sentAt/timestamp
  public __DEPRECATED$filterBySentAt(sentAt: number): Iterable<MessageModel> {
    const items = this.state.messageIdsBySentAt.get(sentAt) ?? [];
    const attrs = items.map(id => this.accessAttributes(id)).filter(isNotNil);
    return map(attrs, data => this.toModel(data));
  }

  // Marks cached model as "should be stale" to discourage continued use.
  // The model's attributes are directly updated so that the model is in sync
  // with the in-memory attributes. The warning is only logged in Development.
  private markModelStale(messageAttributes: MessageAttributesType): void {
    const { id } = messageAttributes;
    const model = this.modelCache.get(id);
    if (!model) {
      return;
    }

    model.attributes = { ...messageAttributes };

    if (getEnvironment() === Environment.Development) {
      log.warn('MessageCache: stale model', {
        cid: model.cid,
        locations: Array.from(model.registerLocations).join('+'),
      });
    }
  }

  // Creates a proxy object for MessageModel which logs usage in development
  // so that we're able to migrate off of models. The proxy is memoized per
  // message id, so repeated calls return the same object.
  private toModel(
    messageAttributes: MessageAttributesType | MessageModel
  ): MessageModel {
    const existingModel = this.modelCache.get(messageAttributes.id);
    if (existingModel) {
      return existingModel;
    }

    // Accept either an existing model or raw attributes to build one from.
    const model =
      'attributes' in messageAttributes
        ? messageAttributes
        : new window.Whisper.Message(messageAttributes);

    const proxy = getMessageModelLogger(model);
    this.modelCache.set(messageAttributes.id, proxy);
    return proxy;
  }
}