diff --git a/index.ts b/index.ts index e705bbb6..d7f5af75 100644 --- a/index.ts +++ b/index.ts @@ -314,8 +314,9 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { let refreshGuardian: RefreshGuardian | null = null; let refreshGuardianConfigKey: string | null = null; let refreshGuardianCleanupRegistered = false; - let sessionAffinityStore: SessionAffinityStore | null = - new SessionAffinityStore(); +let sessionAffinityStore: SessionAffinityStore | null = + new SessionAffinityStore(); +let sessionAffinityWriteVersion = 0; let sessionAffinityConfigKey: string | null = null; const entitlementCache = new EntitlementCache(); const preemptiveQuotaScheduler = new PreemptiveQuotaScheduler(); @@ -862,6 +863,8 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { .trim() || undefined; const sessionAffinityKey = threadIdCandidate ?? promptCacheKey ?? null; + const sessionAffinityVersion = + (sessionAffinityWriteVersion += 1); const effectivePromptCacheKey = (sessionAffinityKey ?? promptCacheKey ?? "") .toString() @@ -2273,15 +2276,19 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { isStreaming, { onResponseId: (responseId) => { - if (!responseContinuationEnabled) return; - sessionAffinityStore?.remember( - sessionAffinityKey, - successAccountForResponse.index, - ); - sessionAffinityStore?.updateLastResponseId( - sessionAffinityKey, - responseId, - ); + if (!responseContinuationEnabled) return; + sessionAffinityStore?.rememberWithVersion( + sessionAffinityKey, + successAccountForResponse.index, + Date.now(), + sessionAffinityVersion, + ); + sessionAffinityStore?.updateLastResponseId( + sessionAffinityKey, + responseId, + Date.now(), + sessionAffinityVersion, + ); storedResponseIdForSuccess = true; }, streamStallTimeoutMs, diff --git a/lib/session-affinity.ts b/lib/session-affinity.ts index 1ce27e30..91efda33 100644 --- a/lib/session-affinity.ts +++ b/lib/session-affinity.ts @@ -12,6 +12,7 @@ interface SessionAffinityEntry { expiresAt: number; lastResponseId?: string; updatedAt: number; + writeVersion: number; } const DEFAULT_TTL_MS = 20 * 60 * 1000; @@ -62,17 +63,34 @@ export class SessionAffinityStore { } remember(sessionKey: string | null | undefined, accountIndex: number, now = Date.now()): void { + this.rememberWithVersion(sessionKey, accountIndex, now, now); + } + + rememberWithVersion( + sessionKey: string | null | undefined, + accountIndex: number, + now = Date.now(), + writeVersion = now, + ): void { const key = normalizeSessionKey(sessionKey); if (!key) return; if (!Number.isFinite(accountIndex) || accountIndex < 0) return; const existingEntry = this.entries.get(key); + if ( + existingEntry && + existingEntry.expiresAt > now && + existingEntry.writeVersion > writeVersion + ) { + return; + } this.setEntry(key, { accountIndex, expiresAt: now + this.ttlMs, lastResponseId: existingEntry?.lastResponseId, updatedAt: now, + writeVersion, }); } @@ -110,6 +128,7 @@ export class SessionAffinityStore { sessionKey: string | null | undefined, responseId: string | null | undefined, now = Date.now(), + writeVersion = now, ): void { const key = normalizeSessionKey(sessionKey); const normalizedResponseId = typeof responseId === "string" ? responseId.trim() : ""; @@ -121,12 +140,16 @@ export class SessionAffinityStore { this.entries.delete(key); return; } + if (entry.writeVersion > writeVersion) { + return; + } this.setEntry(key, { ...entry, expiresAt: now + this.ttlMs, lastResponseId: normalizedResponseId, updatedAt: now, + writeVersion, }); } diff --git a/test/index.test.ts b/test/index.test.ts index b5aed127..97d3a468 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -1642,7 +1642,7 @@ describe("OpenAIOAuthPlugin fetch handler", () => { expect(secondBody?.previous_response_id).toBe("resp_standalone_789"); }); - it("keeps account and previous_response_id aligned across overlapping same-session streams", async () => { + it("keeps the newest account and previous_response_id across overlapping same-session streams", async () => { const { AccountManager } = await import("../lib/accounts.js"); const configModule = await import("../lib/config.js"); const fetchHelpers = await import("../lib/request/fetch-helpers.js"); @@ -1748,9 +1748,9 @@ describe("OpenAIOAuthPlugin fetch handler", () => { previous_response_id?: string; }; const thirdHeaders = new Headers(thirdInit.headers); - expect(thirdBody.previous_response_id).toBe("resp_first_123"); - expect(thirdHeaders.get("x-test-account-id")).toBe("acc-1"); - expect(thirdHeaders.get("x-test-access-token")).toBe("access-alpha"); + expect(thirdBody.previous_response_id).toBe("resp_second_456"); + expect(thirdHeaders.get("x-test-account-id")).toBe("acc-2"); + expect(thirdHeaders.get("x-test-access-token")).toBe("access-beta"); }); it("compacts fast-session input before sending the upstream request when compaction succeeds", async () => { const fetchHelpers = await import("../lib/request/fetch-helpers.js"); diff --git a/test/session-affinity.test.ts b/test/session-affinity.test.ts index 1bf1ae73..dd304823 100644 --- a/test/session-affinity.test.ts +++ b/test/session-affinity.test.ts @@ -143,4 +143,18 @@ describe("SessionAffinityStore", () => { expect(store.getLastResponseId("session-a", 3_500)).toBe("resp_123"); expect(store.getPreferredAccountIndex("session-a", 3_500)).toBe(2); }); + + it("ignores stale response-id writes from older overlapping requests", () => { + const store = new SessionAffinityStore({ ttlMs: 10_000, maxEntries: 4 }); + store.rememberWithVersion("session-a", 1, 1_000, 1); + store.updateLastResponseId("session-a", "resp_first", 2_000, 1); + store.rememberWithVersion("session-a", 2, 3_000, 2); + store.updateLastResponseId("session-a", "resp_second", 4_000, 2); + + store.rememberWithVersion("session-a", 1, 5_000, 1); + store.updateLastResponseId("session-a", "resp_stale", 5_000, 1); + + expect(store.getPreferredAccountIndex("session-a", 5_500)).toBe(2); + expect(store.getLastResponseId("session-a", 5_500)).toBe("resp_second"); + }); });