Skip to content

Commit e53d840

Browse files
committed
refactor: extract openai stream wrappers
1 parent f66bd10 commit e53d840

File tree

2 files changed

+264
-263
lines changed

2 files changed

+264
-263
lines changed

src/agents/pi-embedded-runner/extra-params.ts

Lines changed: 7 additions & 263 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ import {
99
usesOpenAiStringModeAnthropicToolChoice,
1010
} from "../provider-capabilities.js";
1111
import { log } from "./logger.js";
12+
import {
13+
createCodexDefaultTransportWrapper,
14+
createOpenAIDefaultTransportWrapper,
15+
createOpenAIResponsesContextManagementWrapper,
16+
createOpenAIServiceTierWrapper,
17+
resolveOpenAIServiceTier,
18+
} from "./openai-stream-wrappers.js";
1219

1320
const OPENROUTER_APP_HEADERS: Record<string, string> = {
1421
"HTTP-Referer": "https://openclaw.ai",
@@ -25,11 +32,6 @@ function resolveKilocodeAppHeaders(): Record<string, string> {
2532

2633
const ANTHROPIC_CONTEXT_1M_BETA = "context-1m-2025-08-07";
2734
const ANTHROPIC_1M_MODEL_PREFIXES = ["claude-opus-4", "claude-sonnet-4"] as const;
28-
// NOTE: We only force `store=true` for *direct* OpenAI Responses.
29-
// Codex responses (chatgpt.com/backend-api/codex/responses) require `store=false`.
30-
const OPENAI_RESPONSES_APIS = new Set(["openai-responses"]);
31-
const OPENAI_RESPONSES_PROVIDERS = new Set(["openai", "azure-openai-responses"]);
32-
3335
/**
3436
* Resolve provider-specific extra params from model config.
3537
* Used to pass through stream params like temperature/maxTokens.
@@ -69,7 +71,6 @@ export function resolveExtraParams(params: {
6971
}
7072

7173
type CacheRetention = "none" | "short" | "long";
72-
type OpenAIServiceTier = "auto" | "default" | "flex" | "priority";
7374
type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
7475
cacheRetention?: CacheRetention;
7576
openaiWsWarmup?: boolean;
@@ -214,263 +215,6 @@ function createBedrockNoCacheWrapper(baseStreamFn: StreamFn | undefined): Stream
214215
});
215216
}
216217

217-
function isDirectOpenAIBaseUrl(baseUrl: unknown): boolean {
218-
if (typeof baseUrl !== "string" || !baseUrl.trim()) {
219-
return false;
220-
}
221-
222-
try {
223-
const host = new URL(baseUrl).hostname.toLowerCase();
224-
return (
225-
host === "api.openai.com" || host === "chatgpt.com" || host.endsWith(".openai.azure.com")
226-
);
227-
} catch {
228-
const normalized = baseUrl.toLowerCase();
229-
return (
230-
normalized.includes("api.openai.com") ||
231-
normalized.includes("chatgpt.com") ||
232-
normalized.includes(".openai.azure.com")
233-
);
234-
}
235-
}
236-
237-
function isOpenAIPublicApiBaseUrl(baseUrl: unknown): boolean {
238-
if (typeof baseUrl !== "string" || !baseUrl.trim()) {
239-
return false;
240-
}
241-
242-
try {
243-
return new URL(baseUrl).hostname.toLowerCase() === "api.openai.com";
244-
} catch {
245-
return baseUrl.toLowerCase().includes("api.openai.com");
246-
}
247-
}
248-
249-
function shouldForceResponsesStore(model: {
250-
api?: unknown;
251-
provider?: unknown;
252-
baseUrl?: unknown;
253-
compat?: { supportsStore?: boolean };
254-
}): boolean {
255-
// Never force store=true when the model explicitly declares supportsStore=false
256-
// (e.g. Azure OpenAI Responses API without server-side persistence).
257-
if (model.compat?.supportsStore === false) {
258-
return false;
259-
}
260-
if (typeof model.api !== "string" || typeof model.provider !== "string") {
261-
return false;
262-
}
263-
if (!OPENAI_RESPONSES_APIS.has(model.api)) {
264-
return false;
265-
}
266-
if (!OPENAI_RESPONSES_PROVIDERS.has(model.provider)) {
267-
return false;
268-
}
269-
return isDirectOpenAIBaseUrl(model.baseUrl);
270-
}
271-
272-
function parsePositiveInteger(value: unknown): number | undefined {
273-
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
274-
return Math.floor(value);
275-
}
276-
if (typeof value === "string") {
277-
const parsed = Number.parseInt(value, 10);
278-
if (Number.isFinite(parsed) && parsed > 0) {
279-
return parsed;
280-
}
281-
}
282-
return undefined;
283-
}
284-
285-
function resolveOpenAIResponsesCompactThreshold(model: { contextWindow?: unknown }): number {
286-
const contextWindow = parsePositiveInteger(model.contextWindow);
287-
if (contextWindow) {
288-
return Math.max(1_000, Math.floor(contextWindow * 0.7));
289-
}
290-
return 80_000;
291-
}
292-
293-
function shouldEnableOpenAIResponsesServerCompaction(
294-
model: {
295-
api?: unknown;
296-
provider?: unknown;
297-
baseUrl?: unknown;
298-
compat?: { supportsStore?: boolean };
299-
},
300-
extraParams: Record<string, unknown> | undefined,
301-
): boolean {
302-
const configured = extraParams?.responsesServerCompaction;
303-
if (configured === false) {
304-
return false;
305-
}
306-
if (!shouldForceResponsesStore(model)) {
307-
return false;
308-
}
309-
if (configured === true) {
310-
return true;
311-
}
312-
// Auto-enable for direct OpenAI Responses models.
313-
return model.provider === "openai";
314-
}
315-
316-
function shouldStripResponsesStore(
317-
model: { api?: unknown; compat?: { supportsStore?: boolean } },
318-
forceStore: boolean,
319-
): boolean {
320-
if (forceStore) {
321-
return false;
322-
}
323-
if (typeof model.api !== "string") {
324-
return false;
325-
}
326-
return OPENAI_RESPONSES_APIS.has(model.api) && model.compat?.supportsStore === false;
327-
}
328-
329-
function applyOpenAIResponsesPayloadOverrides(params: {
330-
payloadObj: Record<string, unknown>;
331-
forceStore: boolean;
332-
stripStore: boolean;
333-
useServerCompaction: boolean;
334-
compactThreshold: number;
335-
}): void {
336-
if (params.forceStore) {
337-
params.payloadObj.store = true;
338-
}
339-
if (params.stripStore) {
340-
delete params.payloadObj.store;
341-
}
342-
if (params.useServerCompaction && params.payloadObj.context_management === undefined) {
343-
params.payloadObj.context_management = [
344-
{
345-
type: "compaction",
346-
compact_threshold: params.compactThreshold,
347-
},
348-
];
349-
}
350-
}
351-
352-
function createOpenAIResponsesContextManagementWrapper(
353-
baseStreamFn: StreamFn | undefined,
354-
extraParams: Record<string, unknown> | undefined,
355-
): StreamFn {
356-
const underlying = baseStreamFn ?? streamSimple;
357-
return (model, context, options) => {
358-
const forceStore = shouldForceResponsesStore(model);
359-
const useServerCompaction = shouldEnableOpenAIResponsesServerCompaction(model, extraParams);
360-
// Strip `store` from the payload when the model declares supportsStore=false.
361-
// pi-ai upstream hardcodes `store: false` for Responses API; strict
362-
// OpenAI-compatible endpoints (e.g. Gemini via Cloudflare) reject it.
363-
const stripStore = shouldStripResponsesStore(model, forceStore);
364-
if (!forceStore && !useServerCompaction && !stripStore) {
365-
return underlying(model, context, options);
366-
}
367-
368-
const compactThreshold =
369-
parsePositiveInteger(extraParams?.responsesCompactThreshold) ??
370-
resolveOpenAIResponsesCompactThreshold(model);
371-
const originalOnPayload = options?.onPayload;
372-
return underlying(model, context, {
373-
...options,
374-
onPayload: (payload) => {
375-
if (payload && typeof payload === "object") {
376-
applyOpenAIResponsesPayloadOverrides({
377-
payloadObj: payload as Record<string, unknown>,
378-
forceStore,
379-
stripStore,
380-
useServerCompaction,
381-
compactThreshold,
382-
});
383-
}
384-
originalOnPayload?.(payload);
385-
},
386-
});
387-
};
388-
}
389-
390-
function normalizeOpenAIServiceTier(value: unknown): OpenAIServiceTier | undefined {
391-
if (typeof value !== "string") {
392-
return undefined;
393-
}
394-
const normalized = value.trim().toLowerCase();
395-
if (
396-
normalized === "auto" ||
397-
normalized === "default" ||
398-
normalized === "flex" ||
399-
normalized === "priority"
400-
) {
401-
return normalized;
402-
}
403-
return undefined;
404-
}
405-
406-
function resolveOpenAIServiceTier(
407-
extraParams: Record<string, unknown> | undefined,
408-
): OpenAIServiceTier | undefined {
409-
const raw = extraParams?.serviceTier ?? extraParams?.service_tier;
410-
const normalized = normalizeOpenAIServiceTier(raw);
411-
if (raw !== undefined && normalized === undefined) {
412-
const rawSummary = typeof raw === "string" ? raw : typeof raw;
413-
log.warn(`ignoring invalid OpenAI service tier param: ${rawSummary}`);
414-
}
415-
return normalized;
416-
}
417-
418-
function createOpenAIServiceTierWrapper(
419-
baseStreamFn: StreamFn | undefined,
420-
serviceTier: OpenAIServiceTier,
421-
): StreamFn {
422-
const underlying = baseStreamFn ?? streamSimple;
423-
return (model, context, options) => {
424-
if (
425-
model.api !== "openai-responses" ||
426-
model.provider !== "openai" ||
427-
!isOpenAIPublicApiBaseUrl(model.baseUrl)
428-
) {
429-
return underlying(model, context, options);
430-
}
431-
const originalOnPayload = options?.onPayload;
432-
return underlying(model, context, {
433-
...options,
434-
onPayload: (payload) => {
435-
if (payload && typeof payload === "object") {
436-
const payloadObj = payload as Record<string, unknown>;
437-
if (payloadObj.service_tier === undefined) {
438-
payloadObj.service_tier = serviceTier;
439-
}
440-
}
441-
originalOnPayload?.(payload);
442-
},
443-
});
444-
};
445-
}
446-
447-
function createCodexDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
448-
const underlying = baseStreamFn ?? streamSimple;
449-
return (model, context, options) =>
450-
underlying(model, context, {
451-
...options,
452-
transport: options?.transport ?? "auto",
453-
});
454-
}
455-
456-
function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
457-
const underlying = baseStreamFn ?? streamSimple;
458-
return (model, context, options) => {
459-
const typedOptions = options as
460-
| (SimpleStreamOptions & { openaiWsWarmup?: boolean })
461-
| undefined;
462-
const mergedOptions = {
463-
...options,
464-
transport: options?.transport ?? "auto",
465-
// Warm-up is optional in OpenAI docs; enabled by default here for lower
466-
// first-turn latency on WebSocket sessions. Set params.openaiWsWarmup=false
467-
// to disable per model.
468-
openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true,
469-
} as SimpleStreamOptions;
470-
return underlying(model, context, mergedOptions);
471-
};
472-
}
473-
474218
function isAnthropic1MModel(modelId: string): boolean {
475219
const normalized = modelId.trim().toLowerCase();
476220
return ANTHROPIC_1M_MODEL_PREFIXES.some((prefix) => normalized.startsWith(prefix));

0 commit comments

Comments
 (0)