Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ stats.html
.env.local
.env.*.local
.claude/worktrees
.mcp.json
1 change: 1 addition & 0 deletions packages/sdk/browser/contract-tests/entity/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"scripts": {
"install-playwright-browsers": "playwright install --with-deps chromium",
"start": "tsc --noEmit && vite --open=true",
"start:headless": "tsc --noEmit && vite",
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helper for LLMs to run contract tests.

"build": "tsc --noEmit && vite build",
"lint": "eslint ./src",
"start:adapter": "sdk-testharness-server adapter",
Expand Down
51 changes: 34 additions & 17 deletions packages/sdk/browser/contract-tests/entity/src/ClientEntity.ts
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The jank in here may be temporary as @tanderson-ld and @aaron-zeisler are adjusting the tests.

Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,24 @@ function makeSdkConfig(options: SDKConfigParams, tag: string) {
if (options.dataSystem) {
const dataSystem: any = {};

// Helper to apply endpoint overrides from a mode definition to global URIs.
const applyEndpointOverrides = (modeDef: SDKConfigModeDefinition) => {
(modeDef.synchronizers ?? []).forEach((sync) => {
if (sync.streaming?.baseUri) {
cf.streamUri = sync.streaming.baseUri;
cf.streamInitialReconnectDelay = maybeTime(sync.streaming.initialRetryDelayMs);
}
if (sync.polling?.baseUri) {
cf.baseUri = sync.polling.baseUri;
}
});
(modeDef.initializers ?? []).forEach((init) => {
if (init.polling?.baseUri) {
cf.baseUri = init.polling.baseUri;
}
});
};

if (options.dataSystem.connectionModeConfig) {
const connMode = options.dataSystem.connectionModeConfig;
dataSystem.automaticModeSwitching = connMode.initialConnectionMode
Expand All @@ -113,26 +131,25 @@ function makeSdkConfig(options: SDKConfigParams, tag: string) {
const connectionModes: Record<string, any> = {};
Object.entries(connMode.customConnectionModes).forEach(([modeName, modeDef]) => {
connectionModes[modeName] = translateModeDefinition(modeDef);

// Per-entry endpoint overrides also set global URIs for ServiceEndpoints
// compatibility. These override the serviceEndpoints values above.
(modeDef.synchronizers ?? []).forEach((sync) => {
if (sync.streaming?.baseUri) {
cf.streamUri = sync.streaming.baseUri;
cf.streamInitialReconnectDelay = maybeTime(sync.streaming.initialRetryDelayMs);
}
if (sync.polling?.baseUri) {
cf.baseUri = sync.polling.baseUri;
}
});
(modeDef.initializers ?? []).forEach((init) => {
if (init.polling?.baseUri) {
cf.baseUri = init.polling.baseUri;
}
});
applyEndpointOverrides(modeDef);
});
dataSystem.connectionModes = connectionModes;
}
} else if (options.dataSystem.initializers || options.dataSystem.synchronizers) {
// Top-level initializers/synchronizers (no connection modes). Wrap them
// into a single 'streaming' connection mode for the browser SDK.
const modeDef: SDKConfigModeDefinition = {
initializers: options.dataSystem.initializers,
synchronizers: options.dataSystem.synchronizers,
};
dataSystem.automaticModeSwitching = {
type: 'manual',
initialConnectionMode: 'streaming',
};
dataSystem.connectionModes = {
streaming: translateModeDefinition(modeDef),
};
applyEndpointOverrides(modeDef);
}

(cf as any).dataSystem = dataSystem;
Expand Down
15 changes: 14 additions & 1 deletion packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ export default class DefaultBrowserEventSource implements LDEventSource {
private _es?: EventSource;
private _backoff: DefaultBackoff;
private _errorFilter: (err: HttpErrorResponse) => boolean;
private _urlBuilder?: () => string;

// The type of the handle can be platform specific and we treat is opaquely.
private _reconnectTimeoutHandle?: any;

private _listeners: Record<string, EventListener[]> = {};

constructor(
private readonly _url: string,
private _url: string,
options: EventSourceInitDict,
) {
this._backoff = new DefaultBackoff(
options.initialRetryDelayMillis,
options.retryResetIntervalMillis,
);
this._errorFilter = options.errorFilter;
this._urlBuilder = options.urlBuilder;
this._openConnection();
}

Expand All @@ -50,6 +52,9 @@ export default class DefaultBrowserEventSource implements LDEventSource {
onretrying: ((e: { delayMillis: number }) => void) | undefined;

private _openConnection() {
if (this._urlBuilder) {
this._url = this._urlBuilder();
}
this._es = new EventSource(this._url);
this._es.onopen = () => {
this._backoff.success();
Expand All @@ -58,6 +63,14 @@ export default class DefaultBrowserEventSource implements LDEventSource {
// The error could be from a polyfill, or from the browser event source, so we are loose on the
// typing.
this._es.onerror = (err: any) => {
// In browsers, a server-sent "event: error" SSE message fires both
// addEventListener('error', ...) AND onerror. We must not treat it as a
// connection failure. A server-sent error arrives as a MessageEvent while
// the connection is still open; a real connection error is a plain Event
// with readyState !== OPEN.
if (err instanceof MessageEvent) {
return;
}
this._handleError(err);
this.onerror?.(err);
};
Expand Down
7 changes: 7 additions & 0 deletions packages/shared/common/src/api/platform/EventSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,11 @@ export interface EventSourceInitDict {
initialRetryDelayMillis: number;
readTimeoutMillis: number;
retryResetIntervalMillis: number;
/**
* Optional callback that returns a fresh URL on each reconnection attempt.
* When provided, the EventSource implementation should call this instead of
* reusing the original URL. This allows query parameters (e.g. `basis`) to
* be updated between reconnections.
*/
urlBuilder?: () => string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ export function createStreamingBase(config: {
initialRetryDelayMillis: config.initialRetryDelayMillis,
readTimeoutMillis: 5 * 60 * 1000,
retryResetIntervalMillis: 60 * 1000,
urlBuilder: buildStreamUri,
});
eventSource = es;

Expand Down
Loading