diff --git a/jetstream/jsbaseclient_api.ts b/jetstream/jsbaseclient_api.ts index ef688cb0..3b25327c 100644 --- a/jetstream/jsbaseclient_api.ts +++ b/jetstream/jsbaseclient_api.ts @@ -15,13 +15,15 @@ import { Empty } from "../nats-base-client/encoders.ts"; import { Codec, JSONCodec } from "../nats-base-client/codec.ts"; -import { extend } from "../nats-base-client/util.ts"; +import { backoff, delay, extend } from "../nats-base-client/util.ts"; import { NatsConnectionImpl } from "../nats-base-client/nats.ts"; import { checkJsErrorCode } from "./jsutil.ts"; import { + ErrorCode, JetStreamOptions, Msg, NatsConnection, + NatsError, RequestOptions, } from "../nats-base-client/core.ts"; import { ApiResponse } from "./jsapi_types.ts"; @@ -81,7 +83,7 @@ export class BaseApiClient { async _request( subj: string, data: unknown = null, - opts?: RequestOptions, + opts?: Partial & { retries?: number }, ): Promise { opts = opts || {} as RequestOptions; opts.timeout = this.timeout; @@ -91,12 +93,34 @@ export class BaseApiClient { a = this.jc.encode(data); } - const m = await this.nc.request( - subj, - a, - opts, - ); - return this.parseJsResponse(m); + let { retries } = opts as { + retries: number; + }; + + retries = retries || 1; + retries = retries === -1 ? Number.MAX_SAFE_INTEGER : retries; + const bo = backoff(); + + for (let i = 0; i < retries; i++) { + try { + const m = await this.nc.request( + subj, + a, + opts as RequestOptions, + ); + return this.parseJsResponse(m); + } catch (err) { + const ne = err as NatsError; + if ( + (ne.code === "503" || ne.code === ErrorCode.Timeout) && + i + 1 < retries + ) { + await delay(bo.backoff(i)); + } else { + throw err; + } + } + } } async findStream(subject: string): Promise { diff --git a/jetstream/jsclient.ts b/jetstream/jsclient.ts index 728c7f51..fea4be6a 100644 --- a/jetstream/jsclient.ts +++ b/jetstream/jsclient.ts @@ -792,7 +792,7 @@ export class JetStreamSubscriptionImpl extends TypedSubscription const subj = `${info.api.prefix}.CONSUMER.CREATE.${info.stream}`; - this.js._request(subj, req) + this.js._request(subj, req, { retries: -1 }) .then((v) => { const ci = v as ConsumerInfo; this.info!.config = ci.config;