From 653ebefa1f9fc56114cf3b2b0d9b7cd4a1dc919a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 11 May 2026 06:30:00 +0100 Subject: [PATCH 1/2] feat(redis): add reconnectOnError for READONLY / LOADING reply errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During an ElastiCache role swap or node-type change, the TCP/TLS connection stays open but the server starts answering with READONLY (the client is talking to a node that became a replica) or LOADING (node still loading data from disk). Without an explicit hook, these errors surface to caller code as ReplyError instances — every write op on the affected connection fails until the cluster cuts over. Returning 2 from reconnectOnError tells ioredis to tear down the connection, reconnect, and re-issue the failed command. After reconnect, DNS / SG state routes the new socket to a writable node. The shared createRedisClient helper picks this up automatically. defaultReconnectOnError is exported for direct ioredis call sites that bypass createRedisClient. --- .../redis-reconnect-on-readonly-loading.md | 6 ++++++ internal-packages/redis/src/index.ts | 20 +++++++++++++++++++ 2 files changed, 26 insertions(+) create mode 100644 .server-changes/redis-reconnect-on-readonly-loading.md diff --git a/.server-changes/redis-reconnect-on-readonly-loading.md b/.server-changes/redis-reconnect-on-readonly-loading.md new file mode 100644 index 00000000000..3173d99860b --- /dev/null +++ b/.server-changes/redis-reconnect-on-readonly-loading.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Add `reconnectOnError` to the shared ioredis client config so READONLY / LOADING reply errors during ElastiCache node-type changes trigger a disconnect-reconnect-retry cycle instead of surfacing to caller code. diff --git a/internal-packages/redis/src/index.ts b/internal-packages/redis/src/index.ts index bdd69315cff..0b02c14a3a2 100644 --- a/internal-packages/redis/src/index.ts +++ b/internal-packages/redis/src/index.ts @@ -3,12 +3,32 @@ import { Logger } from "@trigger.dev/core/logger"; export { Redis, type Callback, type RedisOptions, type Result, type RedisCommander } from "ioredis"; +/** + * Reply-error -> reconnect mapping. Without this hook, an ElastiCache + * vertical scale-up surfaces tens of thousands of READONLY / LOADING + * reply errors to caller code over a healthy TCP/TLS connection (the + * client keeps talking to a node whose role swapped underneath it). + * + * Returning 2 tells ioredis to disconnect, reconnect, and retry the + * command that triggered the error. After reconnect, DNS / SG routing + * should land on a writable primary. + * + * Empirical confirmation on the harness in TRI-8878: this option + * reduced a scale-up event from ~437,000 caller-surfaced errors to 2. + */ +export function defaultReconnectOnError(err: Error): boolean | 1 | 2 { + const msg = err.message ?? ""; + if (msg.startsWith("READONLY") || msg.startsWith("LOADING")) return 2; + return false; +} + const defaultOptions: Partial = { retryStrategy: (times: number) => { const delay = Math.min(times * 50, 1000); return delay; }, maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 5 : 20, + reconnectOnError: defaultReconnectOnError, }; const logger = new Logger("Redis", "debug"); From 4df210cd97186102ac2c52b2f1e32798cfb5b10d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 11 May 2026 06:37:17 +0100 Subject: [PATCH 2/2] feat(webapp): use defaultReconnectOnError at direct ioredis call sites The webapp constructs ioredis clients directly in several prod files that bypass the shared createRedisClient helper. Each of these needs the same reconnectOnError hook so an ElastiCache role swap or node-type change on the realtime / cache / socket-io / cluster Redis instances doesn't surface READONLY / LOADING reply errors to caller code. V1-only marqs files are intentionally not migrated. --- apps/webapp/app/presenters/v3/DevPresence.server.ts | 3 ++- apps/webapp/app/redis.server.ts | 3 +++ apps/webapp/app/services/autoIncrementCounter.server.ts | 3 ++- .../app/services/inputStreamWaitpointCache.server.ts | 2 ++ .../app/services/platformNotificationCounter.server.ts | 2 ++ .../app/services/realtime/redisRealtimeStreams.server.ts | 8 +++++++- .../app/services/sessionStreamWaitpointCache.server.ts | 2 ++ apps/webapp/app/services/taskIdentifierCache.server.ts | 2 ++ apps/webapp/app/v3/handleSocketIo.server.ts | 2 ++ 9 files changed, 24 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/presenters/v3/DevPresence.server.ts b/apps/webapp/app/presenters/v3/DevPresence.server.ts index fa606cf9f1b..d751b6d7114 100644 --- a/apps/webapp/app/presenters/v3/DevPresence.server.ts +++ b/apps/webapp/app/presenters/v3/DevPresence.server.ts @@ -1,4 +1,5 @@ import Redis, { type RedisOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; const PRESENCE_KEY_PREFIX = "dev-presence:connection:"; @@ -7,7 +8,7 @@ export class DevPresence { private redis: Redis; constructor(options: RedisOptions) { - this.redis = new Redis(options); + this.redis = new Redis({ reconnectOnError: defaultReconnectOnError, ...options }); } async isConnected(environmentId: string) { diff --git a/apps/webapp/app/redis.server.ts b/apps/webapp/app/redis.server.ts index 55d490821e3..01efa0d3e68 100644 --- a/apps/webapp/app/redis.server.ts +++ b/apps/webapp/app/redis.server.ts @@ -1,4 +1,5 @@ import { Cluster, Redis, type ClusterNode, type ClusterOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { logger } from "./services/logger.server"; export type RedisWithClusterOptions = { @@ -42,6 +43,7 @@ export function createRedisClient( username: options.username, password: options.password, enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(options.tlsDisabled ? { checkServerIdentity: () => { @@ -69,6 +71,7 @@ export function createRedisClient( password: options.password, enableAutoPipelining: true, keyPrefix: options.keyPrefix, + reconnectOnError: defaultReconnectOnError, ...(options.tlsDisabled ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/autoIncrementCounter.server.ts b/apps/webapp/app/services/autoIncrementCounter.server.ts index 205e6ef9c1c..b11a3f4e11d 100644 --- a/apps/webapp/app/services/autoIncrementCounter.server.ts +++ b/apps/webapp/app/services/autoIncrementCounter.server.ts @@ -1,4 +1,5 @@ import Redis, { RedisOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { Prisma, PrismaClientOrTransaction, PrismaTransactionOptions, prisma } from "~/db.server"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; @@ -11,7 +12,7 @@ export class AutoIncrementCounter { private _redis: Redis; constructor(private options: AutoIncrementCounterOptions) { - this._redis = new Redis(options.redis); + this._redis = new Redis({ reconnectOnError: defaultReconnectOnError, ...options.redis }); } async incrementInTransaction( diff --git a/apps/webapp/app/services/inputStreamWaitpointCache.server.ts b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts index ab360c837a8..5bfd632633a 100644 --- a/apps/webapp/app/services/inputStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; @@ -24,6 +25,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/platformNotificationCounter.server.ts b/apps/webapp/app/services/platformNotificationCounter.server.ts index dc2970045da..3bd4fa49afb 100644 --- a/apps/webapp/app/services/platformNotificationCounter.server.ts +++ b/apps/webapp/app/services/platformNotificationCounter.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; @@ -18,6 +19,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index 99ad10c8ee4..b5a3c896701 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -1,5 +1,6 @@ import { Logger, LogLevel } from "@trigger.dev/core/logger"; import Redis, { RedisOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types"; @@ -35,6 +36,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { private get sharedRedis(): Redis { if (!this._sharedRedis) { this._sharedRedis = new Redis({ + reconnectOnError: defaultReconnectOnError, ...this.options.redis, connectionName: "realtime:shared", }); @@ -56,7 +58,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { signal: AbortSignal, options?: StreamResponseOptions ): Promise { - const redis = new Redis({ ...this.options.redis, connectionName: "realtime:streamResponse" }); + const redis = new Redis({ + reconnectOnError: defaultReconnectOnError, + ...this.options.redis, + connectionName: "realtime:streamResponse", + }); const streamKey = `stream:${runId}:${streamId}`; let isCleanedUp = false; diff --git a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts index 050ebddeac3..93f4b397481 100644 --- a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; @@ -28,6 +29,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/taskIdentifierCache.server.ts b/apps/webapp/app/services/taskIdentifierCache.server.ts index 9d243b5f740..04929c583cc 100644 --- a/apps/webapp/app/services/taskIdentifierCache.server.ts +++ b/apps/webapp/app/services/taskIdentifierCache.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import type { TaskTriggerSource } from "@trigger.dev/database"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; @@ -53,6 +54,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index 7a2c0c3e366..aec9ded6f49 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -15,6 +15,7 @@ import type { WorkerServerToClientEvents, } from "@trigger.dev/core/v3/workers"; import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace"; +import { defaultReconnectOnError } from "@internal/redis"; import { Redis } from "ioredis"; import { Namespace, Server, Socket } from "socket.io"; import { env } from "~/env.server"; @@ -93,6 +94,7 @@ function initializeSocketIOServerInstance() { username: env.REDIS_USERNAME, password: env.REDIS_PASSWORD, enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); const subClient = pubClient.duplicate();