diff --git a/.server-changes/redis-reconnect-on-readonly-loading.md b/.server-changes/redis-reconnect-on-readonly-loading.md new file mode 100644 index 00000000000..3173d99860b --- /dev/null +++ b/.server-changes/redis-reconnect-on-readonly-loading.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Add `reconnectOnError` to the shared ioredis client config so READONLY / LOADING reply errors during ElastiCache node-type changes trigger a disconnect-reconnect-retry cycle instead of surfacing to caller code. diff --git a/apps/webapp/app/presenters/v3/DevPresence.server.ts b/apps/webapp/app/presenters/v3/DevPresence.server.ts index fa606cf9f1b..d751b6d7114 100644 --- a/apps/webapp/app/presenters/v3/DevPresence.server.ts +++ b/apps/webapp/app/presenters/v3/DevPresence.server.ts @@ -1,4 +1,5 @@ import Redis, { type RedisOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; const PRESENCE_KEY_PREFIX = "dev-presence:connection:"; @@ -7,7 +8,7 @@ export class DevPresence { private redis: Redis; constructor(options: RedisOptions) { - this.redis = new Redis(options); + this.redis = new Redis({ reconnectOnError: defaultReconnectOnError, ...options }); } async isConnected(environmentId: string) { diff --git a/apps/webapp/app/redis.server.ts b/apps/webapp/app/redis.server.ts index 55d490821e3..01efa0d3e68 100644 --- a/apps/webapp/app/redis.server.ts +++ b/apps/webapp/app/redis.server.ts @@ -1,4 +1,5 @@ import { Cluster, Redis, type ClusterNode, type ClusterOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { logger } from "./services/logger.server"; export type RedisWithClusterOptions = { @@ -42,6 +43,7 @@ export function createRedisClient( username: options.username, password: options.password, enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(options.tlsDisabled ? 
{ checkServerIdentity: () => { @@ -69,6 +71,7 @@ export function createRedisClient( password: options.password, enableAutoPipelining: true, keyPrefix: options.keyPrefix, + reconnectOnError: defaultReconnectOnError, ...(options.tlsDisabled ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/autoIncrementCounter.server.ts b/apps/webapp/app/services/autoIncrementCounter.server.ts index 205e6ef9c1c..b11a3f4e11d 100644 --- a/apps/webapp/app/services/autoIncrementCounter.server.ts +++ b/apps/webapp/app/services/autoIncrementCounter.server.ts @@ -1,4 +1,5 @@ import Redis, { RedisOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { Prisma, PrismaClientOrTransaction, PrismaTransactionOptions, prisma } from "~/db.server"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; @@ -11,7 +12,7 @@ export class AutoIncrementCounter { private _redis: Redis; constructor(private options: AutoIncrementCounterOptions) { - this._redis = new Redis(options.redis); + this._redis = new Redis({ reconnectOnError: defaultReconnectOnError, ...options.redis }); } async incrementInTransaction( diff --git a/apps/webapp/app/services/inputStreamWaitpointCache.server.ts b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts index ab360c837a8..5bfd632633a 100644 --- a/apps/webapp/app/services/inputStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; @@ -24,6 +25,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/platformNotificationCounter.server.ts b/apps/webapp/app/services/platformNotificationCounter.server.ts index dc2970045da..3bd4fa49afb 100644 --- a/apps/webapp/app/services/platformNotificationCounter.server.ts +++ b/apps/webapp/app/services/platformNotificationCounter.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; @@ -18,6 +19,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index 99ad10c8ee4..b5a3c896701 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -1,5 +1,6 @@ import { Logger, LogLevel } from "@trigger.dev/core/logger"; import Redis, { RedisOptions } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types"; @@ -35,6 +36,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { private get sharedRedis(): Redis { if (!this._sharedRedis) { this._sharedRedis = new Redis({ + reconnectOnError: defaultReconnectOnError, ...this.options.redis, connectionName: "realtime:shared", }); @@ -56,7 +58,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { signal: AbortSignal, options?: StreamResponseOptions ): Promise { - const redis = new Redis({ ...this.options.redis, connectionName: 
"realtime:streamResponse" }); + const redis = new Redis({ + reconnectOnError: defaultReconnectOnError, + ...this.options.redis, + connectionName: "realtime:streamResponse", + }); const streamKey = `stream:${runId}:${streamId}`; let isCleanedUp = false; diff --git a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts index 050ebddeac3..93f4b397481 100644 --- a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; @@ -28,6 +29,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); } diff --git a/apps/webapp/app/services/taskIdentifierCache.server.ts b/apps/webapp/app/services/taskIdentifierCache.server.ts index 9d243b5f740..04929c583cc 100644 --- a/apps/webapp/app/services/taskIdentifierCache.server.ts +++ b/apps/webapp/app/services/taskIdentifierCache.server.ts @@ -1,4 +1,5 @@ import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; import type { TaskTriggerSource } from "@trigger.dev/database"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; @@ -53,6 +54,7 @@ function initializeRedis(): Redis | undefined { password: env.CACHE_REDIS_PASSWORD, keyPrefix: "tr:", enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), }); } diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index 7a2c0c3e366..aec9ded6f49 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -15,6 +15,7 @@ import type { WorkerServerToClientEvents, } from "@trigger.dev/core/v3/workers"; import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace"; +import { defaultReconnectOnError } from "@internal/redis"; import { Redis } from "ioredis"; import { Namespace, Server, Socket } from "socket.io"; import { env } from "~/env.server"; @@ -93,6 +94,7 @@ function initializeSocketIOServerInstance() { username: env.REDIS_USERNAME, password: env.REDIS_PASSWORD, enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }); const subClient = pubClient.duplicate(); diff --git a/internal-packages/redis/src/index.ts b/internal-packages/redis/src/index.ts index bdd69315cff..0b02c14a3a2 100644 --- a/internal-packages/redis/src/index.ts +++ b/internal-packages/redis/src/index.ts @@ -3,12 +3,32 @@ import { Logger } from "@trigger.dev/core/logger"; export { Redis, type Callback, type RedisOptions, type Result, type RedisCommander } from "ioredis"; +/** + * Reply-error -> reconnect mapping. Without this hook, an ElastiCache + * vertical scale-up surfaces tens of thousands of READONLY / LOADING + * reply errors to caller code over a healthy TCP/TLS connection (the + * client keeps talking to a node whose role swapped underneath it). + * + * Returning 2 tells ioredis to disconnect, reconnect, and retry the + * command that triggered the error. After reconnect, DNS / SG routing + * should land on a writable primary. + * + * Empirical confirmation on the harness in TRI-8878: this option + * reduced a scale-up event from ~437,000 caller-surfaced errors to 2. 
+ */ +export function defaultReconnectOnError(err: Error): boolean | 1 | 2 { + const msg = err.message ?? ""; + if (msg.startsWith("READONLY") || msg.startsWith("LOADING")) return 2; + return false; +} + const defaultOptions: Partial<RedisOptions> = { retryStrategy: (times: number) => { const delay = Math.min(times * 50, 1000); return delay; }, maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 5 : 20, + reconnectOnError: defaultReconnectOnError, }; const logger = new Logger("Redis", "debug");