diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 20925715b5..89b93f1012 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1226,6 +1226,10 @@ const EnvironmentSchema = z REALTIME_STREAMS_S2_BASIN: z.string().optional(), REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(), + REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce + .number() + .int() + .default(60_000 * 60 * 24), // 1 day REALTIME_STREAMS_S2_LOG_LEVEL: z .enum(["log", "error", "warn", "info", "debug"]) .default("info"), diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index b5c8c57322..25fb2d08e1 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -381,7 +381,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { "chunkIndex", "0", "data", - part + JSON.stringify(part) + "\n" ); // Set TTL for cleanup when stream is done diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index ebd1b2f6f4..1d03116ff5 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -1,4 +1,5 @@ // app/realtime/S2RealtimeStreams.ts +import type { UnkeyCache } from "@internal/cache"; import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; import { randomUUID } from "node:crypto"; @@ -17,6 +18,12 @@ export type S2RealtimeStreamsOptions = { logger?: Logger; logLevel?: LogLevel; + + accessTokenExpirationInMs?: number; + + cache?: UnkeyCache<{ + accessToken: string; + }>; }; type S2IssueAccessTokenResponse = { access_token: string }; @@ -41,6 +48,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { private readonly logger: Logger; private readonly level: LogLevel; + private readonly accessTokenExpirationInMs: number; + + private readonly cache?: UnkeyCache<{ + accessToken: string; + }>; + constructor(opts: S2RealtimeStreamsOptions) { this.basin = opts.basin; this.baseUrl = `https://${this.basin}.b.aws.s2.dev/v1`; @@ -54,14 +67,13 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { this.logger = opts.logger ?? new Logger("S2RealtimeStreams", opts.logLevel ?? "info"); this.level = opts.logLevel ?? "info"; - } - private toStreamName(runId: string, streamId: string): string { - return `${this.toStreamPrefix(runId)}${streamId}`; + this.cache = opts.cache; + this.accessTokenExpirationInMs = opts.accessTokenExpirationInMs ?? 60_000 * 60 * 24; // 1 day } - private toStreamPrefix(runId: string): string { - return `${this.streamPrefix}/runs/${runId}/`; + private toStreamName(runId: string, streamId: string): string { + return `${this.streamPrefix}/runs/${runId}/${streamId}`; } async initializeStream( @@ -70,11 +82,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { ): Promise<{ responseHeaders?: Record }> { const id = randomUUID(); - const accessToken = await this.s2IssueAccessToken(id, runId, streamId); + const accessToken = await this.getS2AccessToken(id); return { responseHeaders: { "X-S2-Access-Token": accessToken, + "X-S2-Stream-Name": `/runs/${runId}/${streamId}`, "X-S2-Basin": this.basin, "X-S2-Flush-Interval-Ms": this.flushIntervalMs.toString(), "X-S2-Max-Retries": this.maxRetries.toString(), @@ -153,7 +166,23 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { return (await res.json()) as S2AppendAck; } - private async s2IssueAccessToken(id: string, runId: string, streamId: string): Promise { + private async getS2AccessToken(id: string): Promise { + if (!this.cache) { + return this.s2IssueAccessToken(id); + } + + const result = await this.cache.accessToken.swr(this.streamPrefix, async () => { + return this.s2IssueAccessToken(id); + }); + + if (!result.val) { + throw new Error("Failed to get S2 access token"); + } + + return result.val; + } + + private async s2IssueAccessToken(id: string): Promise { // POST /v1/access-tokens const res = await fetch(`https://aws.s2.dev/v1/access-tokens`, { method: "POST", @@ -169,10 +198,10 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { }, ops: ["append", "create-stream"], streams: { - prefix: this.toStreamPrefix(runId), + prefix: this.streamPrefix, }, }, - expires_at: new Date(Date.now() + 1000 * 60 * 60 * 24).toISOString(), // 1 day + expires_at: new Date(Date.now() + this.accessTokenExpirationInMs).toISOString(), auto_prefix_streams: true, }), }); diff --git a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts index d913d510fb..34792cba03 100644 --- a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts +++ b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts @@ -1,9 +1,16 @@ +import { + createCache, + createMemoryStore, + DefaultStatefulContext, + Namespace, + RedisCacheStore, +} from "@internal/cache"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; -import { RedisRealtimeStreams } from "./redisRealtimeStreams.server"; import { AuthenticatedEnvironment } from "../apiAuth.server"; -import { StreamIngestor, StreamResponder } from "./types"; +import { RedisRealtimeStreams } from "./redisRealtimeStreams.server"; import { S2RealtimeStreams } from "./s2realtimeStreams.server"; +import { StreamIngestor, StreamResponder } from "./types"; function initializeRedisRealtimeStreams() { return new RedisRealtimeStreams({ @@ -44,9 +51,43 @@ export function getRealtimeStreamInstance( flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS, maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES, s2WaitSeconds: env.REALTIME_STREAMS_S2_WAIT_SECONDS, + accessTokenExpirationInMs: env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS, + cache: s2RealtimeStreamsCache, }); } throw new Error("Realtime streams v2 is required for this run but S2 configuration is missing"); } } + +const s2RealtimeStreamsCache = singleton( + "s2RealtimeStreamsCache", + initializeS2RealtimeStreamsCache +); + +function initializeS2RealtimeStreamsCache() { + const ctx = new DefaultStatefulContext(); + const redisCacheStore = new RedisCacheStore({ + name: "s2-realtime-streams-cache", + connection: { + port: env.REALTIME_STREAMS_REDIS_PORT, + host: env.REALTIME_STREAMS_REDIS_HOST, + username: env.REALTIME_STREAMS_REDIS_USERNAME, + password: env.REALTIME_STREAMS_REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.REALTIME_STREAMS_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + keyPrefix: "s2-realtime-streams-cache:", + }, + useModernCacheKeyBuilder: true, + }); + + const memoryStore = createMemoryStore(5000, 0.001); + + return createCache({ + accessToken: new Namespace(ctx, { + stores: [memoryStore, redisCacheStore], + fresh: Math.floor(env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS / 2), + stale: Math.floor(env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS / 2 + 60_000), + }), + }); +} diff --git a/packages/core/src/v3/realtimeStreams/streamInstance.ts b/packages/core/src/v3/realtimeStreams/streamInstance.ts index 6982066afa..5efbcb225a 100644 --- a/packages/core/src/v3/realtimeStreams/streamInstance.ts +++ b/packages/core/src/v3/realtimeStreams/streamInstance.ts @@ -50,7 +50,7 @@ export class StreamInstance implements StreamsWriter { }) : new StreamsWriterV2({ basin: parsedResponse.basin, - stream: this.options.key, + stream: parsedResponse.streamName ?? this.options.key, accessToken: parsedResponse.accessToken, source: this.options.source, signal: this.options.signal, @@ -105,6 +105,7 @@ type ParsedStreamResponse = basin: string; flushIntervalMs?: number; maxRetries?: number; + streamName?: string; }; function parseCreateStreamResponse( @@ -124,6 +125,7 @@ function parseCreateStreamResponse( const flushIntervalMs = headers?.["x-s2-flush-interval-ms"]; const maxRetries = headers?.["x-s2-max-retries"]; + const streamName = headers?.["x-s2-stream-name"]; return { version: "v2", @@ -131,6 +133,7 @@ function parseCreateStreamResponse( basin, flushIntervalMs: flushIntervalMs ? parseInt(flushIntervalMs) : undefined, maxRetries: maxRetries ? parseInt(maxRetries) : undefined, + streamName, }; }