Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,10 @@ const EnvironmentSchema = z

REALTIME_STREAMS_S2_BASIN: z.string().optional(),
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce
.number()
.int()
.default(60_000 * 60 * 24), // 1 day
REALTIME_STREAMS_S2_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
"chunkIndex",
"0",
"data",
part
JSON.stringify(part) + "\n"
);

// Set TTL for cleanup when stream is done
Expand Down
47 changes: 38 additions & 9 deletions apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// app/realtime/S2RealtimeStreams.ts
import type { UnkeyCache } from "@internal/cache";
import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types";
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import { randomUUID } from "node:crypto";
Expand All @@ -17,6 +18,12 @@ export type S2RealtimeStreamsOptions = {

logger?: Logger;
logLevel?: LogLevel;

accessTokenExpirationInMs?: number;

cache?: UnkeyCache<{
accessToken: string;
}>;
};

type S2IssueAccessTokenResponse = { access_token: string };
Expand All @@ -41,6 +48,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
private readonly logger: Logger;
private readonly level: LogLevel;

private readonly accessTokenExpirationInMs: number;

private readonly cache?: UnkeyCache<{
accessToken: string;
}>;

constructor(opts: S2RealtimeStreamsOptions) {
this.basin = opts.basin;
this.baseUrl = `https://${this.basin}.b.aws.s2.dev/v1`;
Expand All @@ -54,14 +67,13 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {

this.logger = opts.logger ?? new Logger("S2RealtimeStreams", opts.logLevel ?? "info");
this.level = opts.logLevel ?? "info";
}

private toStreamName(runId: string, streamId: string): string {
return `${this.toStreamPrefix(runId)}${streamId}`;
this.cache = opts.cache;
this.accessTokenExpirationInMs = opts.accessTokenExpirationInMs ?? 60_000 * 60 * 24; // 1 day
}

private toStreamPrefix(runId: string): string {
return `${this.streamPrefix}/runs/${runId}/`;
private toStreamName(runId: string, streamId: string): string {
return `${this.streamPrefix}/runs/${runId}/${streamId}`;
}

async initializeStream(
Expand All @@ -70,11 +82,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
): Promise<{ responseHeaders?: Record<string, string> }> {
const id = randomUUID();

const accessToken = await this.s2IssueAccessToken(id, runId, streamId);
const accessToken = await this.getS2AccessToken(id);

return {
responseHeaders: {
"X-S2-Access-Token": accessToken,
"X-S2-Stream-Name": `/runs/${runId}/${streamId}`,
"X-S2-Basin": this.basin,
"X-S2-Flush-Interval-Ms": this.flushIntervalMs.toString(),
"X-S2-Max-Retries": this.maxRetries.toString(),
Expand Down Expand Up @@ -153,7 +166,23 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
return (await res.json()) as S2AppendAck;
}

private async s2IssueAccessToken(id: string, runId: string, streamId: string): Promise<string> {
private async getS2AccessToken(id: string): Promise<string> {
if (!this.cache) {
return this.s2IssueAccessToken(id);
}

const result = await this.cache.accessToken.swr(this.streamPrefix, async () => {
return this.s2IssueAccessToken(id);
});

if (!result.val) {
throw new Error("Failed to get S2 access token");
}

return result.val;
}

private async s2IssueAccessToken(id: string): Promise<string> {
// POST /v1/access-tokens
const res = await fetch(`https://aws.s2.dev/v1/access-tokens`, {
method: "POST",
Expand All @@ -169,10 +198,10 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
},
ops: ["append", "create-stream"],
streams: {
prefix: this.toStreamPrefix(runId),
prefix: this.streamPrefix,
},
},
expires_at: new Date(Date.now() + 1000 * 60 * 60 * 24).toISOString(), // 1 day
expires_at: new Date(Date.now() + this.accessTokenExpirationInMs).toISOString(),
auto_prefix_streams: true,
}),
});
Expand Down
45 changes: 43 additions & 2 deletions apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import {
createCache,
createMemoryStore,
DefaultStatefulContext,
Namespace,
RedisCacheStore,
} from "@internal/cache";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { RedisRealtimeStreams } from "./redisRealtimeStreams.server";
import { AuthenticatedEnvironment } from "../apiAuth.server";
import { StreamIngestor, StreamResponder } from "./types";
import { RedisRealtimeStreams } from "./redisRealtimeStreams.server";
import { S2RealtimeStreams } from "./s2realtimeStreams.server";
import { StreamIngestor, StreamResponder } from "./types";

function initializeRedisRealtimeStreams() {
return new RedisRealtimeStreams({
Expand Down Expand Up @@ -44,9 +51,43 @@ export function getRealtimeStreamInstance(
flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS,
maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES,
s2WaitSeconds: env.REALTIME_STREAMS_S2_WAIT_SECONDS,
accessTokenExpirationInMs: env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS,
cache: s2RealtimeStreamsCache,
});
}

throw new Error("Realtime streams v2 is required for this run but S2 configuration is missing");
}
}

const s2RealtimeStreamsCache = singleton(
"s2RealtimeStreamsCache",
initializeS2RealtimeStreamsCache
);

function initializeS2RealtimeStreamsCache() {
const ctx = new DefaultStatefulContext();
const redisCacheStore = new RedisCacheStore({
name: "s2-realtime-streams-cache",
connection: {
port: env.REALTIME_STREAMS_REDIS_PORT,
host: env.REALTIME_STREAMS_REDIS_HOST,
username: env.REALTIME_STREAMS_REDIS_USERNAME,
password: env.REALTIME_STREAMS_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REALTIME_STREAMS_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
keyPrefix: "s2-realtime-streams-cache:",
},
useModernCacheKeyBuilder: true,
});

const memoryStore = createMemoryStore(5000, 0.001);

return createCache({
accessToken: new Namespace<string>(ctx, {
stores: [memoryStore, redisCacheStore],
fresh: Math.floor(env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS / 2),
stale: Math.floor(env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS / 2 + 60_000),
}),
});
}
5 changes: 4 additions & 1 deletion packages/core/src/v3/realtimeStreams/streamInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class StreamInstance<T> implements StreamsWriter {
})
: new StreamsWriterV2({
basin: parsedResponse.basin,
stream: this.options.key,
stream: parsedResponse.streamName ?? this.options.key,
accessToken: parsedResponse.accessToken,
source: this.options.source,
signal: this.options.signal,
Expand Down Expand Up @@ -105,6 +105,7 @@ type ParsedStreamResponse =
basin: string;
flushIntervalMs?: number;
maxRetries?: number;
streamName?: string;
};

function parseCreateStreamResponse(
Expand All @@ -124,13 +125,15 @@ function parseCreateStreamResponse(

const flushIntervalMs = headers?.["x-s2-flush-interval-ms"];
const maxRetries = headers?.["x-s2-max-retries"];
const streamName = headers?.["x-s2-stream-name"];

return {
version: "v2",
accessToken,
basin,
flushIntervalMs: flushIntervalMs ? parseInt(flushIntervalMs) : undefined,
maxRetries: maxRetries ? parseInt(maxRetries) : undefined,
streamName,
};
}

Expand Down
Loading