Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,9 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => {
let refreshGuardian: RefreshGuardian | null = null;
let refreshGuardianConfigKey: string | null = null;
let refreshGuardianCleanupRegistered = false;
let sessionAffinityStore: SessionAffinityStore | null =
new SessionAffinityStore();
let sessionAffinityStore: SessionAffinityStore | null =
new SessionAffinityStore();
let sessionAffinityWriteVersion = 0;
let sessionAffinityConfigKey: string | null = null;
const entitlementCache = new EntitlementCache();
const preemptiveQuotaScheduler = new PreemptiveQuotaScheduler();
Expand Down Expand Up @@ -862,6 +863,8 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => {
.trim() || undefined;
const sessionAffinityKey =
threadIdCandidate ?? promptCacheKey ?? null;
const sessionAffinityVersion =
(sessionAffinityWriteVersion += 1);
const effectivePromptCacheKey =
(sessionAffinityKey ?? promptCacheKey ?? "")
.toString()
Expand Down Expand Up @@ -2273,15 +2276,19 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => {
isStreaming,
{
onResponseId: (responseId) => {
if (!responseContinuationEnabled) return;
sessionAffinityStore?.remember(
sessionAffinityKey,
successAccountForResponse.index,
);
sessionAffinityStore?.updateLastResponseId(
sessionAffinityKey,
responseId,
);
if (!responseContinuationEnabled) return;
sessionAffinityStore?.rememberWithVersion(
sessionAffinityKey,
successAccountForResponse.index,
Date.now(),
sessionAffinityVersion,
);
sessionAffinityStore?.updateLastResponseId(
sessionAffinityKey,
responseId,
Date.now(),
sessionAffinityVersion,
);
storedResponseIdForSuccess = true;
},
streamStallTimeoutMs,
Expand Down
23 changes: 23 additions & 0 deletions lib/session-affinity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ interface SessionAffinityEntry {
expiresAt: number;
lastResponseId?: string;
updatedAt: number;
writeVersion: number;
}

const DEFAULT_TTL_MS = 20 * 60 * 1000;
Expand Down Expand Up @@ -62,17 +63,34 @@ export class SessionAffinityStore {
}

remember(sessionKey: string | null | undefined, accountIndex: number, now = Date.now()): void {
this.rememberWithVersion(sessionKey, accountIndex, now, now);
}

rememberWithVersion(
sessionKey: string | null | undefined,
accountIndex: number,
now = Date.now(),
writeVersion = now,
): void {
const key = normalizeSessionKey(sessionKey);
if (!key) return;
if (!Number.isFinite(accountIndex) || accountIndex < 0) return;

const existingEntry = this.entries.get(key);
if (
existingEntry &&
existingEntry.expiresAt > now &&
existingEntry.writeVersion > writeVersion
) {
return;
}

this.setEntry(key, {
accountIndex,
expiresAt: now + this.ttlMs,
lastResponseId: existingEntry?.lastResponseId,
updatedAt: now,
writeVersion,
});
}

Expand Down Expand Up @@ -110,6 +128,7 @@ export class SessionAffinityStore {
sessionKey: string | null | undefined,
responseId: string | null | undefined,
now = Date.now(),
writeVersion = now,
): void {
const key = normalizeSessionKey(sessionKey);
const normalizedResponseId = typeof responseId === "string" ? responseId.trim() : "";
Expand All @@ -121,12 +140,16 @@ export class SessionAffinityStore {
this.entries.delete(key);
return;
}
if (entry.writeVersion > writeVersion) {
return;
}

this.setEntry(key, {
...entry,
expiresAt: now + this.ttlMs,
lastResponseId: normalizedResponseId,
updatedAt: now,
writeVersion,
});
}

Expand Down
8 changes: 4 additions & 4 deletions test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1642,7 +1642,7 @@ describe("OpenAIOAuthPlugin fetch handler", () => {
expect(secondBody?.previous_response_id).toBe("resp_standalone_789");
});

it("keeps account and previous_response_id aligned across overlapping same-session streams", async () => {
it("keeps the newest account and previous_response_id across overlapping same-session streams", async () => {
const { AccountManager } = await import("../lib/accounts.js");
const configModule = await import("../lib/config.js");
const fetchHelpers = await import("../lib/request/fetch-helpers.js");
Expand Down Expand Up @@ -1748,9 +1748,9 @@ describe("OpenAIOAuthPlugin fetch handler", () => {
previous_response_id?: string;
};
const thirdHeaders = new Headers(thirdInit.headers);
expect(thirdBody.previous_response_id).toBe("resp_first_123");
expect(thirdHeaders.get("x-test-account-id")).toBe("acc-1");
expect(thirdHeaders.get("x-test-access-token")).toBe("access-alpha");
expect(thirdBody.previous_response_id).toBe("resp_second_456");
expect(thirdHeaders.get("x-test-account-id")).toBe("acc-2");
expect(thirdHeaders.get("x-test-access-token")).toBe("access-beta");
});
it("compacts fast-session input before sending the upstream request when compaction succeeds", async () => {
const fetchHelpers = await import("../lib/request/fetch-helpers.js");
Expand Down
14 changes: 14 additions & 0 deletions test/session-affinity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,18 @@ describe("SessionAffinityStore", () => {
expect(store.getLastResponseId("session-a", 3_500)).toBe("resp_123");
expect(store.getPreferredAccountIndex("session-a", 3_500)).toBe(2);
});

it("ignores stale response-id writes from older overlapping requests", () => {
const store = new SessionAffinityStore({ ttlMs: 10_000, maxEntries: 4 });
store.rememberWithVersion("session-a", 1, 1_000, 1);
store.updateLastResponseId("session-a", "resp_first", 2_000, 1);
store.rememberWithVersion("session-a", 2, 3_000, 2);
store.updateLastResponseId("session-a", "resp_second", 4_000, 2);

store.rememberWithVersion("session-a", 1, 5_000, 1);
store.updateLastResponseId("session-a", "resp_stale", 5_000, 1);

expect(store.getPreferredAccountIndex("session-a", 5_500)).toBe(2);
expect(store.getLastResponseId("session-a", 5_500)).toBe("resp_second");
});
});