Skip to content

Commit

Permalink
Wrap eventHandlerQueue jobs with task with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
indutny-signal committed Nov 9, 2022
1 parent 5cee260 commit d7a2669
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 116 deletions.
97 changes: 49 additions & 48 deletions ts/background.ts
Expand Up @@ -291,23 +291,25 @@ export async function startApp(): Promise<void> {
serverTrustRoot: window.getServerTrustRoot(),
});

function queuedEventListener<Args extends Array<unknown>>(
handler: (...args: Args) => Promise<void> | void,
function queuedEventListener<E extends Event>(
handler: (event: E) => Promise<void> | void,
track = true
): (...args: Args) => void {
return (...args: Args): void => {
eventHandlerQueue.add(async () => {
try {
await handler(...args);
} finally {
// message/sent: Message.handleDataMessage has its own queue and will
// trigger this event itself when complete.
// error: Error processing (below) also has its own queue and self-trigger.
if (track) {
window.Whisper.events.trigger('incrementProgress');
): (event: E) => void {
return (event: E): void => {
eventHandlerQueue.add(
createTaskWithTimeout(async () => {
try {
await handler(event);
} finally {
// message/sent: Message.handleDataMessage has its own queue and will
// trigger this event itself when complete.
// error: Error processing (below) also has its own queue and self-trigger.
if (track) {
window.Whisper.events.trigger('incrementProgress');
}
}
}
});
}, `queuedEventListener(${event.type}, ${event.timeStamp})`)
);
};
}

Expand Down Expand Up @@ -361,13 +363,13 @@ export async function startApp(): Promise<void> {
);
messageReceiver.addEventListener(
'decryption-error',
queuedEventListener((event: DecryptionErrorEvent) => {
queuedEventListener((event: DecryptionErrorEvent): void => {
onDecryptionErrorQueue.add(() => onDecryptionError(event));
})
);
messageReceiver.addEventListener(
'retry-request',
queuedEventListener((event: RetryRequestEvent) => {
queuedEventListener((event: RetryRequestEvent): void => {
onRetryRequestQueue.add(() => onRetryRequest(event));
})
);
Expand Down Expand Up @@ -437,7 +439,6 @@ export async function startApp(): Promise<void> {

const eventHandlerQueue = new PQueue({
concurrency: 1,
timeout: durations.MINUTE * 30,
});

// Note: this queue is meant to allow for stop/start of tasks, not limit parallelism.
Expand Down Expand Up @@ -2449,7 +2450,7 @@ export async function startApp(): Promise<void> {

window.waitForEmptyEventQueue = waitForEmptyEventQueue;

async function onEmpty() {
async function onEmpty(): Promise<void> {
const { storage } = window.textsecure;

await Promise.all([
Expand Down Expand Up @@ -2586,7 +2587,7 @@ export async function startApp(): Promise<void> {
connect();
}

function onConfiguration(ev: ConfigurationEvent) {
function onConfiguration(ev: ConfigurationEvent): void {
ev.confirm();

const { configuration } = ev;
Expand Down Expand Up @@ -2618,7 +2619,7 @@ export async function startApp(): Promise<void> {
}
}

function onTyping(ev: TypingEvent) {
function onTyping(ev: TypingEvent): void {
// Note: this type of message is automatically removed from cache in MessageReceiver

const { typing, sender, senderUuid, senderDevice } = ev;
Expand Down Expand Up @@ -2707,7 +2708,7 @@ export async function startApp(): Promise<void> {
});
}

async function onStickerPack(ev: StickerPackEvent) {
function onStickerPack(ev: StickerPackEvent): void {
ev.confirm();

const packs = ev.stickerPacks;
Expand Down Expand Up @@ -2741,13 +2742,13 @@ export async function startApp(): Promise<void> {
});
}

async function onGroupSyncComplete() {
async function onGroupSyncComplete(): Promise<void> {
log.info('onGroupSyncComplete');
await window.storage.put('synced_at', Date.now());
}

// Note: this handler is only for v1 groups received via 'group sync' messages
async function onGroupReceived(ev: GroupEvent) {
async function onGroupReceived(ev: GroupEvent): Promise<void> {
const details = ev.groupDetails;
const { id } = details;

Expand Down Expand Up @@ -2868,7 +2869,7 @@ export async function startApp(): Promise<void> {
maxSize: Infinity,
});

function onEnvelopeReceived({ envelope }: EnvelopeEvent) {
function onEnvelopeReceived({ envelope }: EnvelopeEvent): void {
const ourUuid = window.textsecure.storage.user.getUuid()?.toString();
if (envelope.sourceUuid && envelope.sourceUuid !== ourUuid) {
window.ConversationController.maybeMergeContacts({
Expand All @@ -2882,7 +2883,7 @@ export async function startApp(): Promise<void> {
// Note: We do very little in this function, since everything in handleDataMessage is
// inside a conversation-specific queue(). Any code here might run before an earlier
// message is processed in handleDataMessage().
function onMessageReceived(event: MessageEvent) {
async function onMessageReceived(event: MessageEvent): Promise<void> {
const { data, confirm } = event;

const messageDescriptor = getMessageDescriptor({
Expand Down Expand Up @@ -2947,7 +2948,7 @@ export async function startApp(): Promise<void> {
if (!isValidReactionEmoji(reaction.emoji)) {
log.warn('Received an invalid reaction emoji. Dropping it');
confirm();
return Promise.resolve();
return;
}

strictAssert(
Expand Down Expand Up @@ -2975,7 +2976,7 @@ export async function startApp(): Promise<void> {
// Note: We do not wait for completion here
Reactions.getSingleton().onReaction(reactionModel, message);
confirm();
return Promise.resolve();
return;
}

if (data.message.delete) {
Expand Down Expand Up @@ -3004,21 +3005,22 @@ export async function startApp(): Promise<void> {
Deletes.getSingleton().onDelete(deleteModel);

confirm();
return Promise.resolve();
return;
}

if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) {
confirm();
return Promise.resolve();
return;
}

// Don't wait for handleDataMessage, as it has its own per-conversation queueing
message.handleDataMessage(data.message, event.confirm);

return Promise.resolve();
}

async function onProfileKeyUpdate({ data, confirm }: ProfileKeyUpdateEvent) {
async function onProfileKeyUpdate({
data,
confirm,
}: ProfileKeyUpdateEvent): Promise<void> {
const conversation = window.ConversationController.maybeMergeContacts({
aci: data.sourceUuid,
e164: data.source,
Expand Down Expand Up @@ -3279,7 +3281,7 @@ export async function startApp(): Promise<void> {
// Note: We do very little in this function, since everything in handleDataMessage is
// inside a conversation-specific queue(). Any code here might run before an earlier
// message is processed in handleDataMessage().
function onSentMessage(event: SentEvent) {
async function onSentMessage(event: SentEvent): Promise<void> {
const { data, confirm } = event;

const source = window.textsecure.storage.user.getNumber();
Expand Down Expand Up @@ -3327,7 +3329,7 @@ export async function startApp(): Promise<void> {
if (!isValidReactionEmoji(reaction.emoji)) {
log.warn('Received an invalid reaction emoji. Dropping it');
event.confirm();
return Promise.resolve();
return;
}

log.info('Queuing sent reaction for', reaction.targetTimestamp);
Expand All @@ -3345,7 +3347,7 @@ export async function startApp(): Promise<void> {
Reactions.getSingleton().onReaction(reactionModel, message);

event.confirm();
return Promise.resolve();
return;
}

if (data.message.delete) {
Expand All @@ -3367,20 +3369,18 @@ export async function startApp(): Promise<void> {
// Note: We do not wait for completion here
Deletes.getSingleton().onDelete(deleteModel);
confirm();
return Promise.resolve();
return;
}

if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) {
event.confirm();
return Promise.resolve();
return;
}

// Don't wait for handleDataMessage, as it has its own per-conversation queueing
message.handleDataMessage(data.message, event.confirm, {
data,
});

return Promise.resolve();
}

type MessageDescriptor = {
Expand Down Expand Up @@ -3525,7 +3525,7 @@ export async function startApp(): Promise<void> {
}
}

function onError(ev: ErrorEvent) {
function onError(ev: ErrorEvent): void {
const { error } = ev;
log.error('background onError:', Errors.toLogFormat(error));

Expand All @@ -3540,7 +3540,7 @@ export async function startApp(): Promise<void> {
log.warn('background onError: Doing nothing with incoming error');
}

async function onViewOnceOpenSync(ev: ViewOnceOpenSyncEvent) {
function onViewOnceOpenSync(ev: ViewOnceOpenSyncEvent): void {
ev.confirm();

const { source, sourceUuid, timestamp } = ev;
Expand All @@ -3558,7 +3558,7 @@ export async function startApp(): Promise<void> {
ViewOnceOpenSyncs.getSingleton().onSync(sync);
}

async function onFetchLatestSync(ev: FetchLatestEvent) {
async function onFetchLatestSync(ev: FetchLatestEvent): Promise<void> {
ev.confirm();

const { eventType } = ev;
Expand All @@ -3567,6 +3567,7 @@ export async function startApp(): Promise<void> {

switch (eventType) {
case FETCH_LATEST_ENUM.LOCAL_PROFILE: {
log.info('onFetchLatestSync: fetching latest local profile');
const ourUuid = window.textsecure.storage.user.getUuid()?.toString();
const ourE164 = window.textsecure.storage.user.getNumber();
await Promise.all([
Expand Down Expand Up @@ -3620,7 +3621,7 @@ export async function startApp(): Promise<void> {
}
}

async function onMessageRequestResponse(ev: MessageRequestResponseEvent) {
function onMessageRequestResponse(ev: MessageRequestResponseEvent): void {
ev.confirm();

const {
Expand Down Expand Up @@ -3656,7 +3657,7 @@ export async function startApp(): Promise<void> {
MessageRequests.getSingleton().onResponse(sync);
}

function onReadReceipt(event: Readonly<ReadEvent>) {
function onReadReceipt(event: Readonly<ReadEvent>): void {
onReadOrViewReceipt({
logTitle: 'read receipt',
event,
Expand Down Expand Up @@ -3731,7 +3732,7 @@ export async function startApp(): Promise<void> {
MessageReceipts.getSingleton().onReceipt(receipt);
}

function onReadSync(ev: ReadSyncEvent) {
function onReadSync(ev: ReadSyncEvent): Promise<void> {
const { envelopeTimestamp, sender, senderUuid, timestamp } = ev.read;
const readAt = envelopeTimestamp;
const senderConversation = window.ConversationController.lookupOrCreate({
Expand Down Expand Up @@ -3770,7 +3771,7 @@ export async function startApp(): Promise<void> {
return ReadSyncs.getSingleton().onSync(receipt);
}

function onViewSync(ev: ViewSyncEvent) {
function onViewSync(ev: ViewSyncEvent): Promise<void> {
const { envelopeTimestamp, senderE164, senderUuid, timestamp } = ev.view;
const senderConversation = window.ConversationController.lookupOrCreate({
e164: senderE164,
Expand Down Expand Up @@ -3808,7 +3809,7 @@ export async function startApp(): Promise<void> {
return ViewSyncs.getSingleton().onSync(receipt);
}

function onDeliveryReceipt(ev: DeliveryEvent) {
function onDeliveryReceipt(ev: DeliveryEvent): void {
const { deliveryReceipt } = ev;
const {
envelopeTimestamp,
Expand Down
11 changes: 5 additions & 6 deletions ts/models/conversations.ts
Expand Up @@ -3498,10 +3498,9 @@ export class ConversationModel extends window.Backbone
): Promise<T> {
this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 });

const taskWithTimeout = createTaskWithTimeout(
callback,
`conversation ${this.idForLogging()}`
);
const logId = `conversation.queueJob(${this.idForLogging()}, ${name})`;

const taskWithTimeout = createTaskWithTimeout(callback, logId);

const abortController = new AbortController();
const { signal: abortSignal } = abortController;
Expand All @@ -3512,7 +3511,7 @@ export class ConversationModel extends window.Backbone
const waitTime = startedAt - queuedAt;

if (waitTime > JOB_REPORTING_THRESHOLD_MS) {
log.info(`Conversation job ${name} was blocked for ${waitTime}ms`);
log.info(`${logId}: was blocked for ${waitTime}ms`);
}

try {
Expand All @@ -3524,7 +3523,7 @@ export class ConversationModel extends window.Backbone
const duration = Date.now() - startedAt;

if (duration > JOB_REPORTING_THRESHOLD_MS) {
log.info(`Conversation job ${name} took ${duration}ms`);
log.info(`${logId}: took ${duration}ms`);
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion ts/models/messages.ts
Expand Up @@ -2203,7 +2203,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const conversation = window.ConversationController.get(conversationId)!;
const idLog = `handleDataMessage/${conversation.idForLogging()} ${message.idForLogging()}`;
await conversation.queueJob('handleDataMessage', async () => {
await conversation.queueJob(idLog, async () => {
log.info(`${idLog}: starting processing in queue`);

// First, check for duplicates. If we find one, stop processing here.
Expand Down
2 changes: 2 additions & 0 deletions ts/services/contactSync.ts
Expand Up @@ -187,6 +187,8 @@ async function doContactSync({

await window.storage.put('synced_at', Date.now());
window.Whisper.events.trigger('contactSync:complete');

log.info(`${logId}: done`);
}

export async function onContactSync(ev: ContactSyncEvent): Promise<void> {
Expand Down

0 comments on commit d7a2669

Please sign in to comment.