Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/redis-reconnect-on-readonly-loading.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Add `reconnectOnError` to the shared ioredis client config so READONLY / LOADING reply errors during ElastiCache node-type changes trigger a disconnect-reconnect-retry cycle instead of surfacing to caller code.
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/DevPresence.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Redis, { type RedisOptions } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import { env } from "~/env.server";

const PRESENCE_KEY_PREFIX = "dev-presence:connection:";
Expand All @@ -7,7 +8,7 @@ export class DevPresence {
private redis: Redis;

constructor(options: RedisOptions) {
this.redis = new Redis(options);
this.redis = new Redis({ reconnectOnError: defaultReconnectOnError, ...options });
}

async isConnected(environmentId: string) {
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/redis.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Cluster, Redis, type ClusterNode, type ClusterOptions } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import { logger } from "./services/logger.server";

export type RedisWithClusterOptions = {
Expand Down Expand Up @@ -42,6 +43,7 @@ export function createRedisClient(
username: options.username,
password: options.password,
enableAutoPipelining: true,
reconnectOnError: defaultReconnectOnError,
...(options.tlsDisabled
? {
checkServerIdentity: () => {
Expand Down Expand Up @@ -69,6 +71,7 @@ export function createRedisClient(
password: options.password,
enableAutoPipelining: true,
keyPrefix: options.keyPrefix,
reconnectOnError: defaultReconnectOnError,
...(options.tlsDisabled ? {} : { tls: {} }),
});
}
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/services/autoIncrementCounter.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Redis, { RedisOptions } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import { Prisma, PrismaClientOrTransaction, PrismaTransactionOptions, prisma } from "~/db.server";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
Expand All @@ -11,7 +12,7 @@ export class AutoIncrementCounter {
private _redis: Redis;

constructor(private options: AutoIncrementCounterOptions) {
this._redis = new Redis(options.redis);
this._redis = new Redis({ reconnectOnError: defaultReconnectOnError, ...options.redis });
}

async incrementInTransaction<T>(
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/services/inputStreamWaitpointCache.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Redis } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { logger } from "./logger.server";
Expand All @@ -24,6 +25,7 @@ function initializeRedis(): Redis | undefined {
password: env.CACHE_REDIS_PASSWORD,
keyPrefix: "tr:",
enableAutoPipelining: true,
reconnectOnError: defaultReconnectOnError,
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Redis } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { logger } from "./logger.server";
Expand All @@ -18,6 +19,7 @@ function initializeRedis(): Redis | undefined {
password: env.CACHE_REDIS_PASSWORD,
keyPrefix: "tr:",
enableAutoPipelining: true,
reconnectOnError: defaultReconnectOnError,
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import Redis, { RedisOptions } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import { env } from "~/env.server";
import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types";

Expand Down Expand Up @@ -35,6 +36,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
private get sharedRedis(): Redis {
if (!this._sharedRedis) {
this._sharedRedis = new Redis({
reconnectOnError: defaultReconnectOnError,
...this.options.redis,
Comment thread
ericallam marked this conversation as resolved.
connectionName: "realtime:shared",
});
Expand All @@ -56,7 +58,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
signal: AbortSignal,
options?: StreamResponseOptions
): Promise<Response> {
const redis = new Redis({ ...this.options.redis, connectionName: "realtime:streamResponse" });
const redis = new Redis({
reconnectOnError: defaultReconnectOnError,
...this.options.redis,
connectionName: "realtime:streamResponse",
});
const streamKey = `stream:${runId}:${streamId}`;
let isCleanedUp = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Redis } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { logger } from "./logger.server";
Expand Down Expand Up @@ -28,6 +29,7 @@ function initializeRedis(): Redis | undefined {
password: env.CACHE_REDIS_PASSWORD,
keyPrefix: "tr:",
enableAutoPipelining: true,
reconnectOnError: defaultReconnectOnError,
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
});
}
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/services/taskIdentifierCache.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Redis } from "ioredis";
import { defaultReconnectOnError } from "@internal/redis";
import type { TaskTriggerSource } from "@trigger.dev/database";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
Expand Down Expand Up @@ -53,6 +54,7 @@ function initializeRedis(): Redis | undefined {
password: env.CACHE_REDIS_PASSWORD,
keyPrefix: "tr:",
enableAutoPipelining: true,
reconnectOnError: defaultReconnectOnError,
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
});
}
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
WorkerServerToClientEvents,
} from "@trigger.dev/core/v3/workers";
import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace";
import { defaultReconnectOnError } from "@internal/redis";
import { Redis } from "ioredis";
import { Namespace, Server, Socket } from "socket.io";
import { env } from "~/env.server";
Expand Down Expand Up @@ -93,6 +94,7 @@ function initializeSocketIOServerInstance() {
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
enableAutoPipelining: true,
reconnectOnError: defaultReconnectOnError,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
});
const subClient = pubClient.duplicate();
Expand Down
20 changes: 20 additions & 0 deletions internal-packages/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,32 @@ import { Logger } from "@trigger.dev/core/logger";

export { Redis, type Callback, type RedisOptions, type Result, type RedisCommander } from "ioredis";

/**
 * Default `reconnectOnError` hook for ioredis clients: maps specific Redis
 * reply errors to a reconnect decision.
 *
 * During an ElastiCache vertical scale-up the client can keep talking, over a
 * perfectly healthy TCP/TLS connection, to a node whose role changed
 * underneath it. Without this hook every command then fails with a
 * READONLY / LOADING reply error that is surfaced straight to caller code.
 *
 * Returning `2` asks ioredis to disconnect, reconnect, and retry the command
 * that triggered the error; after the reconnect, DNS / SG routing should land
 * on a writable primary.
 *
 * Measured on the harness in TRI-8878: a scale-up event went from ~437,000
 * caller-surfaced errors to 2 with this option enabled.
 *
 * @param err - The reply error reported by ioredis.
 * @returns `2` when the error message starts with `READONLY` or `LOADING`,
 *   otherwise `false` (no reconnect).
 */
export function defaultReconnectOnError(err: Error): boolean | 1 | 2 {
  // A missing message is treated as an empty string, which matches nothing.
  const text = err.message ?? "";
  const isRoleChangeReply = ["READONLY", "LOADING"].some((prefix) => text.startsWith(prefix));
  return isRoleChangeReply ? 2 : false;
}

/**
 * Baseline ioredis options defined by this package.
 */
const defaultOptions: Partial<RedisOptions> = {
  // Linear backoff: 50ms per attempt so far, capped at 1000ms.
  retryStrategy: (times: number) => Math.min(times * 50, 1000),
  // Per-request retry budget: 50 when GITHUB_ACTIONS is set, 5 when VITEST is set, 20 otherwise.
  maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 5 : 20,
  // Reconnect and retry on READONLY / LOADING reply errors.
  reconnectOnError: defaultReconnectOnError,
};

const logger = new Logger("Redis", "debug");
Expand Down
Loading