Skip to content

Commit

Permalink
When a job fails, respect the Retry-After header if applicable
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanHahn-Signal committed Sep 2, 2021
1 parent c7873dd commit 1f45bce
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 30 deletions.
4 changes: 1 addition & 3 deletions ts/challenge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,7 @@ export class ChallengeHandler {
throw error;
}

const retryAfter = parseRetryAfter(
error.responseHeaders['retry-after'].toString()
);
const retryAfter = parseRetryAfter(error.responseHeaders['retry-after']);

window.log.info(`challenge: retry after ${retryAfter}ms`);
this.options.onChallengeFailed(retryAfter);
Expand Down
9 changes: 2 additions & 7 deletions ts/jobs/helpers/commonShouldJobContinue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,12 @@ import { isDone as isDeviceLinked } from '../../util/registration';
export async function commonShouldJobContinue({
attempt,
log,
maxRetryTime,
timestamp,
timeRemaining,
}: Readonly<{
attempt: number;
log: LoggerType;
maxRetryTime: number;
timestamp: number;
timeRemaining: number;
}>): Promise<boolean> {
const maxJobAge = timestamp + maxRetryTime;
const timeRemaining = maxJobAge - Date.now();

if (timeRemaining <= 0) {
log.info("giving up because it's been too long");
return false;
Expand Down
16 changes: 12 additions & 4 deletions ts/jobs/helpers/handleCommonJobRequestError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@

import type { LoggerType } from '../../logging/log';
import { parseIntWithFallback } from '../../util/parseIntWithFallback';
import { sleepFor413RetryAfterTimeIfApplicable } from './sleepFor413RetryAfterTimeIfApplicable';

export function handleCommonJobRequestError(
err: unknown,
log: LoggerType
): void {
export async function handleCommonJobRequestError({
err,
log,
timeRemaining,
}: Readonly<{
err: unknown;
log: LoggerType;
timeRemaining: number;
}>): Promise<void> {
if (!(err instanceof Error)) {
throw err;
}
Expand All @@ -18,5 +24,7 @@ export function handleCommonJobRequestError(
return;
}

await sleepFor413RetryAfterTimeIfApplicable({ err, log, timeRemaining });

throw err;
}
7 changes: 4 additions & 3 deletions ts/jobs/helpers/readAndViewSyncHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ export async function runReadOrViewSyncJob({
return;
}

const timeRemaining = timestamp + maxRetryTime - Date.now();

const shouldContinue = await commonShouldJobContinue({
attempt,
log,
maxRetryTime,
timestamp,
timeRemaining,
});
if (!shouldContinue) {
return;
Expand All @@ -117,6 +118,6 @@ export async function runReadOrViewSyncJob({
})
);
} catch (err: unknown) {
handleCommonJobRequestError(err, log);
await handleCommonJobRequestError({ err, log, timeRemaining });
}
}
37 changes: 37 additions & 0 deletions ts/jobs/helpers/sleepFor413RetryAfterTimeIfApplicable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import type { LoggerType } from '../../logging/log';
import { sleep } from '../../util/sleep';
import { parseRetryAfter } from '../../util/parseRetryAfter';
import { isRecord } from '../../util/isRecord';

export async function sleepFor413RetryAfterTimeIfApplicable({
err,
log,
timeRemaining,
}: Readonly<{
err: unknown;
log: Pick<LoggerType, 'info'>;
timeRemaining: number;
}>): Promise<void> {
if (
timeRemaining <= 0 ||
!(err instanceof Error) ||
err.code !== 413 ||
!isRecord(err.responseHeaders)
) {
return;
}

const retryAfter = Math.min(
parseRetryAfter(err.responseHeaders['retry-after']),
timeRemaining
);

log.info(
`Got a 413 response code. Sleeping for ${retryAfter} millisecond(s)`
);

await sleep(retryAfter);
}
17 changes: 15 additions & 2 deletions ts/jobs/normalMessageSendJobQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import PQueue from 'p-queue';
import type { LoggerType } from '../logging/log';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
import { sleepFor413RetryAfterTimeIfApplicable } from './helpers/sleepFor413RetryAfterTimeIfApplicable';
import type { MessageModel } from '../models/messages';
import { getMessageById } from '../messages/getMessageById';
import type { ConversationModel } from '../models/conversations';
Expand Down Expand Up @@ -123,15 +124,15 @@ export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData
const { messageId, conversationId } = data;

await this.enqueue(conversationId, async () => {
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
const isFinalAttempt = attempt >= MAX_ATTEMPTS;

// We don't immediately use this value because we may want to mark the message
// failed before doing so.
const shouldContinue = await commonShouldJobContinue({
attempt,
log,
maxRetryTime: MAX_RETRY_TIME,
timestamp,
timeRemaining,
});

await window.ConversationController.loadPromise();
Expand Down Expand Up @@ -347,6 +348,18 @@ export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData
return;
}

if (!isFinalAttempt) {
const maybe413Error: undefined | Error = messageSendErrors.find(
(messageSendError: unknown) =>
messageSendError instanceof Error && messageSendError.code === 413
);
await sleepFor413RetryAfterTimeIfApplicable({
err: maybe413Error,
log,
timeRemaining,
});
}

throw err;
}
});
Expand Down
7 changes: 4 additions & 3 deletions ts/jobs/viewedReceiptsJobQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ export class ViewedReceiptsJobQueue extends JobQueue<ViewedReceiptsJobData> {
}: Readonly<{ data: ViewedReceiptsJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();

const shouldContinue = await commonShouldJobContinue({
attempt,
log,
maxRetryTime: MAX_RETRY_TIME,
timestamp,
timeRemaining,
});
if (!shouldContinue) {
return;
Expand All @@ -52,7 +53,7 @@ export class ViewedReceiptsJobQueue extends JobQueue<ViewedReceiptsJobData> {
try {
await sendViewedReceipt(data.viewedReceipt);
} catch (err: unknown) {
handleCommonJobRequestError(err, log);
await handleCommonJobRequestError({ err, log, timeRemaining });
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions ts/test-both/util/parseRetryAfter_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ import { assert } from 'chai';
import { parseRetryAfter } from '../../util/parseRetryAfter';

describe('parseRetryAfter', () => {
it('should return 0 on invalid input', () => {
it('should return 1 second when passed non-strings', () => {
assert.equal(parseRetryAfter(undefined), 1000);
assert.equal(parseRetryAfter(1234), 1000);
});

it('should return 1 second with invalid strings', () => {
assert.equal(parseRetryAfter('nope'), 1000);
assert.equal(parseRetryAfter('1ff'), 1000);
});

it('should return milleseconds on valid input', () => {
it('should return milliseconds on valid input', () => {
assert.equal(parseRetryAfter('100'), 100000);
});

it('should return apply minimum value', () => {
it('should return 1 second at minimum', () => {
assert.equal(parseRetryAfter('0'), 1000);
assert.equal(parseRetryAfter('-1'), 1000);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import { assert } from 'chai';
import * as sinon from 'sinon';
import { HTTPError } from '../../../textsecure/Errors';
import * as durations from '../../../util/durations';

import { sleepFor413RetryAfterTimeIfApplicable } from '../../../jobs/helpers/sleepFor413RetryAfterTimeIfApplicable';

describe('sleepFor413RetryAfterTimeIfApplicable', () => {
const createLogger = () => ({ info: sinon.spy() });

let sandbox: sinon.SinonSandbox;
let clock: sinon.SinonFakeTimers;

beforeEach(() => {
sandbox = sinon.createSandbox();
clock = sandbox.useFakeTimers();
});

afterEach(() => {
sandbox.restore();
});

it('does nothing if not passed a 413 HTTP error', async () => {
const log = createLogger();

const errors = [
undefined,
new Error('Normal error'),
new HTTPError('Uh oh', { code: 422, headers: {}, response: {} }),
];
await Promise.all(
errors.map(async err => {
await sleepFor413RetryAfterTimeIfApplicable({
err,
log,
timeRemaining: 1234,
});
})
);

sinon.assert.notCalled(log.info);
});

it('waits for 1 second if receiving a 413 HTTP error without a Retry-After header', async () => {
const err = new HTTPError('Slow down', {
code: 413,
headers: {},
response: {},
});

let done = false;

(async () => {
await sleepFor413RetryAfterTimeIfApplicable({
err,
log: createLogger(),
timeRemaining: 1234,
});
done = true;
})();

await clock.tickAsync(999);
assert.isFalse(done);

await clock.tickAsync(2);
assert.isTrue(done);
});

it('waits for Retry-After seconds if receiving a 413', async () => {
const err = new HTTPError('Slow down', {
code: 413,
headers: { 'retry-after': '200' },
response: {},
});

let done = false;

(async () => {
await sleepFor413RetryAfterTimeIfApplicable({
err,
log: createLogger(),
timeRemaining: 123456789,
});
done = true;
})();

await clock.tickAsync(199 * durations.SECOND);
assert.isFalse(done);

await clock.tickAsync(2 * durations.SECOND);
assert.isTrue(done);
});

it("won't wait longer than the remaining time", async () => {
const err = new HTTPError('Slow down', {
code: 413,
headers: { 'retry-after': '99999' },
response: {},
});

let done = false;

(async () => {
await sleepFor413RetryAfterTimeIfApplicable({
err,
log: createLogger(),
timeRemaining: 3 * durations.SECOND,
});
done = true;
})();

await clock.tickAsync(4 * durations.SECOND);
assert.isTrue(done);
});

it('logs how long it will wait', async () => {
const log = createLogger();
const err = new HTTPError('Slow down', {
code: 413,
headers: { 'retry-after': '123' },
response: {},
});

sleepFor413RetryAfterTimeIfApplicable({ err, log, timeRemaining: 9999999 });
await clock.nextAsync();

sinon.assert.calledOnce(log.info);
sinon.assert.calledWith(log.info, sinon.match(/123000 millisecond\(s\)/));
});
});
9 changes: 6 additions & 3 deletions ts/textsecure/Errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

import { parseRetryAfter } from '../util/parseRetryAfter';

import { CallbackResultType } from './Types.d';
import type { CallbackResultType } from './Types.d';
import type { HeaderListType } from './WebAPI';

function appendStack(newError: Error, originalError: Error) {
// eslint-disable-next-line no-param-reassign
Expand Down Expand Up @@ -129,6 +130,8 @@ export class OutgoingMessageError extends ReplayableError {
export class SendMessageNetworkError extends ReplayableError {
identifier: string;

responseHeaders?: HeaderListType | undefined;

constructor(identifier: string, _m: unknown, httpError: Error) {
super({
name: 'SendMessageNetworkError',
Expand All @@ -137,6 +140,7 @@ export class SendMessageNetworkError extends ReplayableError {

[this.identifier] = identifier.split('.');
this.code = httpError.code;
this.responseHeaders = httpError.responseHeaders;

appendStack(this, httpError);
}
Expand Down Expand Up @@ -166,8 +170,7 @@ export class SendMessageChallengeError extends ReplayableError {

const headers = httpError.responseHeaders || {};

this.retryAfter =
Date.now() + parseRetryAfter((headers['retry-after'] ?? 0).toString());
this.retryAfter = Date.now() + parseRetryAfter(headers['retry-after']);

appendStack(this, httpError);
}
Expand Down
2 changes: 1 addition & 1 deletion ts/textsecure/WebAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ function getContentType(response: Response) {
}

type FetchHeaderListType = { [name: string]: string };
type HeaderListType = { [name: string]: string | ReadonlyArray<string> };
export type HeaderListType = { [name: string]: string | ReadonlyArray<string> };
type HTTPCodeType = 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH';

type RedactUrl = (url: string) => string;
Expand Down

0 comments on commit 1f45bce

Please sign in to comment.