From 61893978d1bead6800c096c4a3c3fe39880446b2 Mon Sep 17 00:00:00 2001 From: benduran Date: Thu, 16 Oct 2025 10:51:22 +0100 Subject: [PATCH 01/13] fix(lazer-sdk-js): updated the websocket to be truly isomorphic and also add Buffer support when running in the browser (plus better runtime env detection) --- lazer/sdk/js/package.json | 5 +- lazer/sdk/js/src/client.ts | 87 +++++++++---------- .../sdk/js/src/socket/resilient-websocket.ts | 11 ++- lazer/sdk/js/src/socket/websocket-pool.ts | 51 +++++++---- lazer/sdk/js/src/util/buffer-util.ts | 38 ++++++++ lazer/sdk/js/src/util/env-util.ts | 30 +++++++ lazer/sdk/js/src/util/index.ts | 2 + lazer/sdk/js/tsconfig.json | 5 +- pnpm-lock.yaml | 6 +- 9 files changed, 167 insertions(+), 68 deletions(-) create mode 100644 lazer/sdk/js/src/util/buffer-util.ts create mode 100644 lazer/sdk/js/src/util/env-util.ts create mode 100644 lazer/sdk/js/src/util/index.ts diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index 809427f2d0..21f061c753 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -2,6 +2,9 @@ "name": "@pythnetwork/pyth-lazer-sdk", "version": "4.0.0", "description": "Pyth Lazer SDK", + "engines": { + "node": ">=20" + }, "publishConfig": { "access": "public" }, @@ -61,7 +64,7 @@ "license": "Apache-2.0", "dependencies": { "@isaacs/ttlcache": "^1.4.1", - "cross-fetch": "^4.0.0", + "buffer": "^6.0.3", "isomorphic-ws": "^5.0.0", "ts-log": "^2.2.7", "ws": "^8.18.0" diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index 56c220955a..e186320034 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -1,4 +1,3 @@ -import fetch from "cross-fetch"; import WebSocket from "isomorphic-ws"; import type { Logger } from "ts-log"; import { dummyLogger } from "ts-log"; @@ -20,6 +19,7 @@ import type { import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js"; import type { WebSocketPoolConfig } from "./socket/websocket-pool.js"; import { WebSocketPool } from "./socket/websocket-pool.js"; +import { IsomorphicBuffer } from "./util/index.js"; export type BinaryResponse = { subscriptionId: number; @@ -31,9 +31,9 @@ export type BinaryResponse = { }; export type JsonOrBinaryResponse = | { - type: "json"; - value: Response; - } + type: "json"; + value: Response; + } | { type: "binary"; value: BinaryResponse }; const UINT16_NUM_BYTES = 2; @@ -55,7 +55,7 @@ export class PythLazerClient { private readonly priceServiceUrl: string, private readonly logger: Logger, private readonly wsp?: WebSocketPool, - ) {} + ) { } /** * Gets the WebSocket pool. If the WebSocket pool is not configured, an error is thrown. @@ -113,53 +113,52 @@ export class PythLazerClient { */ addMessageListener(handler: (event: JsonOrBinaryResponse) => void) { const wsp = this.getWebSocketPool(); - wsp.addMessageListener((data: WebSocket.Data) => { + wsp.addMessageListener(async (data: WebSocket.Data) => { if (typeof data == "string") { handler({ type: "json", value: JSON.parse(data) as Response, }); - } else if (Buffer.isBuffer(data)) { - let pos = 0; - const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE(); - pos += UINT32_NUM_BYTES; - if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) { - throw new Error("binary update format magic mismatch"); - } - // TODO: some uint64 values may not be representable as Number. - const subscriptionId = Number( - data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(), - ); - pos += UINT64_NUM_BYTES; + return; + } + const buffData = await IsomorphicBuffer.fromWebsocketData(data); + let pos = 0; + const magic = buffData.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE(); + pos += UINT32_NUM_BYTES; + if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) { + throw new Error("binary update format magic mismatch"); + } + // TODO: some uint64 values may not be representable as Number. + const subscriptionId = Number( + buffData.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(), + ); + pos += UINT64_NUM_BYTES; - const value: BinaryResponse = { subscriptionId }; - while (pos < data.length) { - const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE(); - pos += UINT16_NUM_BYTES; - const magic = data - .subarray(pos, pos + UINT32_NUM_BYTES) - .readUint32LE(); - if (magic == FORMAT_MAGICS_LE.EVM) { - value.evm = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.SOLANA) { - value.solana = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) { - value.leEcdsa = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) { - value.leUnsigned = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.JSON) { - value.parsed = JSON.parse( - data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(), - ) as ParsedPayload; - } else { - throw new Error("unknown magic: " + magic.toString()); - } - pos += len; + const value: BinaryResponse = { subscriptionId }; + while (pos < buffData.length) { + const len = buffData.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE(); + pos += UINT16_NUM_BYTES; + const magic = buffData + .subarray(pos, pos + UINT32_NUM_BYTES) + .readUint32LE(); + if (magic == FORMAT_MAGICS_LE.EVM) { + value.evm = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.SOLANA) { + value.solana = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) { + value.leEcdsa = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) { + value.leUnsigned = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.JSON) { + value.parsed = JSON.parse( + buffData.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(), + ) as ParsedPayload; + } else { + throw new Error(`unknown magic: ${magic.toString()}`); } - handler({ type: "binary", value }); - } else { - throw new TypeError("unexpected event data type"); + pos += len; } + handler({ type: "binary", value }); }); } diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index 6131100d97..7b90396e47 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -5,6 +5,8 @@ import WebSocket from "isomorphic-ws"; import type { Logger } from "ts-log"; import { dummyLogger } from "ts-log"; +import { envIsServiceOrWebWorker } from "../util/env-util.js"; + const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second' const DEFAULT_LOG_AFTER_RETRY_COUNT = 10; @@ -106,7 +108,10 @@ export class ResilientWebSocket { this.retryTimeout = undefined; } - this.wsClient = new WebSocket(this.endpoint, this.wsOptions); + // browser constructor supports a different 2nd argument for the constructor, + // so we need to ensure it's not included if we're running in that environment: + // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols + this.wsClient = new WebSocket(this.endpoint, envIsServiceOrWebWorker() ? this.wsOptions : undefined); this.wsClient.addEventListener("open", () => { this.logger.info("WebSocket connection established"); @@ -184,8 +189,8 @@ export class ResilientWebSocket { if (this.shouldLogRetry()) { this.logger.error( "Connection closed unexpectedly or because of timeout. Reconnecting after " + - String(this.retryDelayMs()) + - "ms.", + String(this.retryDelayMs()) + + "ms.", ); } diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index f56a91ec24..6285996f05 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -11,9 +11,12 @@ import { DEFAULT_STREAM_SERVICE_0_URL, DEFAULT_STREAM_SERVICE_1_URL, } from "../constants.js"; +import { envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js"; const DEFAULT_NUM_CONNECTIONS = 4; +type WebSocketOnMessageCallback = (data: WebSocket.Data) => void | Promise; + export type WebSocketPoolConfig = { urls?: string[]; numConnections?: number; @@ -25,7 +28,7 @@ export class WebSocketPool { rwsPool: ResilientWebSocket[]; private cache: TTLCache; private subscriptions: Map; // id -> subscription Request - private messageListeners: ((event: WebSocket.Data) => void)[]; + private messageListeners: WebSocketOnMessageCallback[]; private allConnectionsDownListeners: (() => void)[]; private wasAllDown = true; private checkConnectionStatesInterval: NodeJS.Timeout; @@ -65,16 +68,28 @@ export class WebSocketPool { const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS; for (let i = 0; i < numConnections; i++) { - const url = urls[i % urls.length]; + let url = urls[i % urls.length]; if (!url) { throw new Error(`URLs must not be null or empty`); } - const wsOptions = { + const wsOptions: ResilientWebSocketConfig['wsOptions'] = { ...config.rwsConfig?.wsOptions, - headers: { - Authorization: `Bearer ${token}`, - }, }; + + if (envIsBrowserOrWorker()) { + // we are in a browser environment where the websocket protocol + // doesn't support sending headers in the initial upgrade request, + // so we add the token as a query param, which the server already supports + const parsedUrl = new URL(url); + parsedUrl.searchParams.set('ACCESS_TOKEN', token); + url = parsedUrl.toString(); + } else { + // we are in a server-side javascript runtime context + wsOptions.headers = { + Authorization: `Bearer ${token}`, + }; + } + const rws = new ResilientWebSocket({ ...config.rwsConfig, endpoint: url, @@ -104,7 +119,9 @@ export class WebSocketPool { rws.onError = config.onError; } // Handle all client messages ourselves. Dedupe before sending to registered message handlers. - rws.onMessage = pool.dedupeHandler; + rws.onMessage = data => { + void pool.dedupeHandler(data); + }; pool.rwsPool.push(rws); rws.startWebSocket(); } @@ -144,11 +161,14 @@ export class WebSocketPool { * Handles incoming websocket messages by deduplicating identical messages received across * multiple connections before forwarding to registered handlers */ - dedupeHandler = (data: WebSocket.Data): void => { - const cacheKey = - typeof data === "string" - ? data - : Buffer.from(data as Buffer).toString("hex"); + dedupeHandler = async (data: WebSocket.Data): Promise => { + let cacheKey = ''; + if (typeof data === 'string') { + cacheKey = data; + } else { + const buff = await IsomorphicBuffer.fromWebsocketData(data); + cacheKey = buff.toString('hex'); + } if (this.cache.has(cacheKey)) { this.logger.debug("Dropping duplicate message"); @@ -161,9 +181,8 @@ export class WebSocketPool { this.handleErrorMessages(data); } - for (const handler of this.messageListeners) { - handler(data); - } + await Promise.all(this.messageListeners.map(handler => handler(data))); + }; sendRequest(request: Request) { @@ -189,7 +208,7 @@ export class WebSocketPool { this.sendRequest(request); } - addMessageListener(handler: (data: WebSocket.Data) => void): void { + addMessageListener(handler: WebSocketOnMessageCallback): void { this.messageListeners.push(handler); } diff --git a/lazer/sdk/js/src/util/buffer-util.ts b/lazer/sdk/js/src/util/buffer-util.ts new file mode 100644 index 0000000000..8eb45d8a7d --- /dev/null +++ b/lazer/sdk/js/src/util/buffer-util.ts @@ -0,0 +1,38 @@ +// the linting rules don't allow importing anything that might clash with +// a global, top-level import. we disable this rule because we need this +// imported from our installed dependency +// eslint-disable-next-line unicorn/prefer-node-protocol +import { Buffer as BrowserBuffer } from 'buffer'; + +import type { Data } from 'isomorphic-ws'; + +const { Buffer: PossibleBuiltInBuffer } = globalThis as Partial<{ Buffer: typeof Buffer }>; + +const BufferClassToUse = PossibleBuiltInBuffer ?? BrowserBuffer; + +export class IsomorphicBuffer extends BufferClassToUse { + /** + * given a relatively unknown websocket frame data object, + * returns a valid Buffer instance that is safe to use + * isomorphically in any JS runtime environment + */ + static async fromWebsocketData(data: Data) { + if (typeof data === 'string') { + return BufferClassToUse.from(new TextEncoder().encode(data).buffer); + } + if (data instanceof Blob) { + // let the uncaught promise exception bubble up if there's an issue + return BufferClassToUse.from(await data.arrayBuffer()); + } + if (data instanceof ArrayBuffer) return BufferClassToUse.from(data); + if (Buffer.isBuffer(data)) { + const arrBuffer = new ArrayBuffer(data.length); + const v = new Uint8Array(arrBuffer); + for (const [i, item] of data.entries()) { + v[i] = item; + } + return BufferClassToUse.from(arrBuffer); + } + throw new TypeError("unexpected event data type found when IsomorphicBuffer.fromWebsocketData() called"); + } +} diff --git a/lazer/sdk/js/src/util/env-util.ts b/lazer/sdk/js/src/util/env-util.ts new file mode 100644 index 0000000000..be792f3362 --- /dev/null +++ b/lazer/sdk/js/src/util/env-util.ts @@ -0,0 +1,30 @@ +// we create this local-only type, which has assertions made to indicate +// that we do not know and cannot guarantee which JS environment we are in +const g = globalThis as Partial<{ self: typeof globalThis.self; window: typeof globalThis.window }> + +/** + * Detects if this code is running within any Service or WebWorker context. + * @returns true if in a worker of some kind, false if otherwise + */ +export function envIsServiceOrWebWorker() { + const possiblyInAWorker = typeof WorkerGlobalScope !== 'undefined' && g.self !== undefined; + return possiblyInAWorker && g.self instanceof WorkerGlobalScope; +} + +/** + * Detects if the code is running in a regular DOM or Web Worker context. + * @returns true if running in a DOM or Web Worker context, false if running in Node.js + */ +export function envIsBrowser() { + return g.window !== undefined; +} + +/** + * a convenience method that returns whether or not + * this code is executing in some type of browser-centric environment + * + * @returns true if in the browser's main UI thread or in a worker, false if otherwise + */ +export function envIsBrowserOrWorker() { + return envIsServiceOrWebWorker() || envIsBrowser(); +} diff --git a/lazer/sdk/js/src/util/index.ts b/lazer/sdk/js/src/util/index.ts new file mode 100644 index 0000000000..d21025f710 --- /dev/null +++ b/lazer/sdk/js/src/util/index.ts @@ -0,0 +1,2 @@ +export * from "./buffer-util.js"; +export * from "./env-util.js"; diff --git a/lazer/sdk/js/tsconfig.json b/lazer/sdk/js/tsconfig.json index 32a3705250..e4af8b1248 100644 --- a/lazer/sdk/js/tsconfig.json +++ b/lazer/sdk/js/tsconfig.json @@ -1,4 +1,7 @@ { "extends": "@cprussin/tsconfig/base.json", - "exclude": ["node_modules", "dist"] + "exclude": ["node_modules", "dist"], + "compilerOptions": { + "lib": ["DOM", "DOM.Iterable", "WebWorker"] + } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2990378003..8ecf2e92cf 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2077,9 +2077,9 @@ importers: '@isaacs/ttlcache': specifier: ^1.4.1 version: 1.4.1 - cross-fetch: - specifier: ^4.0.0 - version: 4.1.0(encoding@0.1.13) + buffer: + specifier: ^6.0.3 + version: 6.0.3 isomorphic-ws: specifier: ^5.0.0 version: 5.0.0(ws@8.18.1(bufferutil@4.0.9)(utf-8-validate@6.0.3)) From 5bb7fd58592b52312bfdd5de6afa166d2ac98400 Mon Sep 17 00:00:00 2001 From: benduran Date: Thu, 16 Oct 2025 11:15:01 +0100 Subject: [PATCH 02/13] fix(lazer-sdk-js): inverted the logic to ensure additional options are being sent on node WebSocket connections --- lazer/sdk/js/src/socket/resilient-websocket.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index 7b90396e47..2b275158ee 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -111,7 +111,7 @@ export class ResilientWebSocket { // browser constructor supports a different 2nd argument for the constructor, // so we need to ensure it's not included if we're running in that environment: // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols - this.wsClient = new WebSocket(this.endpoint, envIsServiceOrWebWorker() ? this.wsOptions : undefined); + this.wsClient = new WebSocket(this.endpoint, envIsServiceOrWebWorker() ? undefined : this.wsOptions); this.wsClient.addEventListener("open", () => { this.logger.info("WebSocket connection established"); From 84190a481818d5d82ebf62a4aa856d3bfbdf963a Mon Sep 17 00:00:00 2001 From: benduran Date: Thu, 16 Oct 2025 11:19:56 +0100 Subject: [PATCH 03/13] docs(lazer-sdk-js): updated the readme with an up-to-date sample --- lazer/sdk/js/README.md | 57 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/lazer/sdk/js/README.md b/lazer/sdk/js/README.md index 65e587c6c9..c88660a840 100644 --- a/lazer/sdk/js/README.md +++ b/lazer/sdk/js/README.md @@ -3,3 +3,60 @@ ## Contributing & Development See [contributing.md](docs/contributing/contributing.md) for information on how to develop or contribute to this project! + +## How to use + +```javascript +import { PythLazerClient } from '@pythnetwork/pyth-lazer-sdk'; + +const c = await PythLazerClient.create({ + token: 'YOUR-AUTH-TOKEN-HERE', + logger: console, // Optionally log operations (to the console in this case.) + webSocketPoolConfig: { + numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4. + onError: error => { + console.error('⛔️ WebSocket error:', error.message); + }, + // Optional configuration for resilient WebSocket connections + rwsConfig: { + heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds + maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds + logAfterRetryCount: 10, // Optional log after how many retries + }, + }, + }); + + c.addMessageListener(message => { + console.info('received the following from the Lazer stream:', message); + }); + + // Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) + // The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). + c.addAllConnectionsDownListener(() => { + console.error('All connections are down!'); + }); + + // Create and remove one or more subscriptions on the fly + c.subscribe({ + type: 'subscribe', + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ['price'], + formats: ['solana'], + deliveryFormat: 'binary', + channel: 'fixed_rate@200ms', + parsed: false, + jsonBinaryEncoding: 'base64', + }); + c.subscribe({ + type: 'subscribe', + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ['price', 'exponent', 'publisherCount', 'confidence'], + formats: ['evm'], + deliveryFormat: 'json', + channel: 'fixed_rate@200ms', + parsed: true, + jsonBinaryEncoding: 'hex', + }); +``` From b0d6e55e1cb0a4ffc0f450500dc5a39d282c8718 Mon Sep 17 00:00:00 2001 From: benduran Date: Thu, 16 Oct 2025 12:31:04 +0100 Subject: [PATCH 04/13] chore(lazer-sdk-js): fixed formatting --- lazer/sdk/js/README.md | 98 +++++++++---------- lazer/sdk/js/src/client.ts | 16 +-- .../sdk/js/src/socket/resilient-websocket.ts | 24 ++++- lazer/sdk/js/src/socket/websocket-pool.ts | 19 ++-- lazer/sdk/js/src/util/buffer-util.ts | 22 +++-- lazer/sdk/js/src/util/env-util.ts | 10 +- 6 files changed, 108 insertions(+), 81 deletions(-) diff --git a/lazer/sdk/js/README.md b/lazer/sdk/js/README.md index c88660a840..356cc8a8c3 100644 --- a/lazer/sdk/js/README.md +++ b/lazer/sdk/js/README.md @@ -7,56 +7,56 @@ See [contributing.md](docs/contributing/contributing.md) for information on how ## How to use ```javascript -import { PythLazerClient } from '@pythnetwork/pyth-lazer-sdk'; +import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk"; const c = await PythLazerClient.create({ - token: 'YOUR-AUTH-TOKEN-HERE', - logger: console, // Optionally log operations (to the console in this case.) - webSocketPoolConfig: { - numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4. - onError: error => { - console.error('⛔️ WebSocket error:', error.message); - }, - // Optional configuration for resilient WebSocket connections - rwsConfig: { - heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds - maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds - logAfterRetryCount: 10, // Optional log after how many retries - }, + token: "YOUR-AUTH-TOKEN-HERE", + logger: console, // Optionally log operations (to the console in this case.) + webSocketPoolConfig: { + numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4. + onError: (error) => { + console.error("⛔️ WebSocket error:", error.message); }, - }); - - c.addMessageListener(message => { - console.info('received the following from the Lazer stream:', message); - }); - - // Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) - // The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). - c.addAllConnectionsDownListener(() => { - console.error('All connections are down!'); - }); - - // Create and remove one or more subscriptions on the fly - c.subscribe({ - type: 'subscribe', - subscriptionId: 1, - priceFeedIds: [1, 2], - properties: ['price'], - formats: ['solana'], - deliveryFormat: 'binary', - channel: 'fixed_rate@200ms', - parsed: false, - jsonBinaryEncoding: 'base64', - }); - c.subscribe({ - type: 'subscribe', - subscriptionId: 2, - priceFeedIds: [1, 2, 3, 4, 5], - properties: ['price', 'exponent', 'publisherCount', 'confidence'], - formats: ['evm'], - deliveryFormat: 'json', - channel: 'fixed_rate@200ms', - parsed: true, - jsonBinaryEncoding: 'hex', - }); + // Optional configuration for resilient WebSocket connections + rwsConfig: { + heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds + maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds + logAfterRetryCount: 10, // Optional log after how many retries + }, + }, +}); + +c.addMessageListener((message) => { + console.info("received the following from the Lazer stream:", message); +}); + +// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) +// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). +c.addAllConnectionsDownListener(() => { + console.error("All connections are down!"); +}); + +// Create and remove one or more subscriptions on the fly +c.subscribe({ + type: "subscribe", + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ["price"], + formats: ["solana"], + deliveryFormat: "binary", + channel: "fixed_rate@200ms", + parsed: false, + jsonBinaryEncoding: "base64", +}); +c.subscribe({ + type: "subscribe", + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ["price", "exponent", "publisherCount", "confidence"], + formats: ["evm"], + deliveryFormat: "json", + channel: "fixed_rate@200ms", + parsed: true, + jsonBinaryEncoding: "hex", +}); ``` diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index e186320034..50ac75d407 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -31,9 +31,9 @@ export type BinaryResponse = { }; export type JsonOrBinaryResponse = | { - type: "json"; - value: Response; - } + type: "json"; + value: Response; + } | { type: "binary"; value: BinaryResponse }; const UINT16_NUM_BYTES = 2; @@ -55,7 +55,7 @@ export class PythLazerClient { private readonly priceServiceUrl: string, private readonly logger: Logger, private readonly wsp?: WebSocketPool, - ) { } + ) {} /** * Gets the WebSocket pool. If the WebSocket pool is not configured, an error is thrown. @@ -123,7 +123,9 @@ export class PythLazerClient { } const buffData = await IsomorphicBuffer.fromWebsocketData(data); let pos = 0; - const magic = buffData.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE(); + const magic = buffData + .subarray(pos, pos + UINT32_NUM_BYTES) + .readUint32LE(); pos += UINT32_NUM_BYTES; if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) { throw new Error("binary update format magic mismatch"); @@ -136,7 +138,9 @@ export class PythLazerClient { const value: BinaryResponse = { subscriptionId }; while (pos < buffData.length) { - const len = buffData.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE(); + const len = buffData + .subarray(pos, pos + UINT16_NUM_BYTES) + .readUint16BE(); pos += UINT16_NUM_BYTES; const magic = buffData .subarray(pos, pos + UINT32_NUM_BYTES) diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index 2b275158ee..212653db2c 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -28,7 +28,9 @@ export class ResilientWebSocket { private maxRetryDelayMs: number; private logAfterRetryCount: number; - wsClient: undefined | WebSocket; + wsClient: + | undefined + | (Omit & Partial>); wsUserClosed = false; private wsFailedAttempts: number; private heartbeatTimeout?: NodeJS.Timeout | undefined; @@ -111,7 +113,10 @@ export class ResilientWebSocket { // browser constructor supports a different 2nd argument for the constructor, // so we need to ensure it's not included if we're running in that environment: // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols - this.wsClient = new WebSocket(this.endpoint, envIsServiceOrWebWorker() ? undefined : this.wsOptions); + this.wsClient = new WebSocket( + this.endpoint, + envIsServiceOrWebWorker() ? undefined : this.wsOptions, + ); this.wsClient.addEventListener("open", () => { this.logger.info("WebSocket connection established"); @@ -160,7 +165,16 @@ export class ResilientWebSocket { this.heartbeatTimeout = setTimeout(() => { this.logger.warn("Connection timed out. Reconnecting..."); - this.wsClient?.terminate(); + if (this.wsClient) { + if (typeof this.wsClient.terminate === "function") { + this.wsClient.terminate(); + } else { + // terminate is an implementation detail of the node-friendly + // https://www.npmjs.com/package/ws package, but is not a native WebSocket API, + // so we have to use the close method + this.wsClient.close(); + } + } this.handleReconnect(); }, this.heartbeatTimeoutDurationMs); } @@ -189,8 +203,8 @@ export class ResilientWebSocket { if (this.shouldLogRetry()) { this.logger.error( "Connection closed unexpectedly or because of timeout. Reconnecting after " + - String(this.retryDelayMs()) + - "ms.", + String(this.retryDelayMs()) + + "ms.", ); } diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index 6285996f05..153a7a5711 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -15,7 +15,9 @@ import { envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js"; const DEFAULT_NUM_CONNECTIONS = 4; -type WebSocketOnMessageCallback = (data: WebSocket.Data) => void | Promise; +type WebSocketOnMessageCallback = ( + data: WebSocket.Data, +) => void | Promise; export type WebSocketPoolConfig = { urls?: string[]; @@ -72,7 +74,7 @@ export class WebSocketPool { if (!url) { throw new Error(`URLs must not be null or empty`); } - const wsOptions: ResilientWebSocketConfig['wsOptions'] = { + const wsOptions: ResilientWebSocketConfig["wsOptions"] = { ...config.rwsConfig?.wsOptions, }; @@ -81,7 +83,7 @@ export class WebSocketPool { // doesn't support sending headers in the initial upgrade request, // so we add the token as a query param, which the server already supports const parsedUrl = new URL(url); - parsedUrl.searchParams.set('ACCESS_TOKEN', token); + parsedUrl.searchParams.set("ACCESS_TOKEN", token); url = parsedUrl.toString(); } else { // we are in a server-side javascript runtime context @@ -119,7 +121,7 @@ export class WebSocketPool { rws.onError = config.onError; } // Handle all client messages ourselves. Dedupe before sending to registered message handlers. - rws.onMessage = data => { + rws.onMessage = (data) => { void pool.dedupeHandler(data); }; pool.rwsPool.push(rws); @@ -162,12 +164,12 @@ export class WebSocketPool { * multiple connections before forwarding to registered handlers */ dedupeHandler = async (data: WebSocket.Data): Promise => { - let cacheKey = ''; - if (typeof data === 'string') { + let cacheKey = ""; + if (typeof data === "string") { cacheKey = data; } else { const buff = await IsomorphicBuffer.fromWebsocketData(data); - cacheKey = buff.toString('hex'); + cacheKey = buff.toString("hex"); } if (this.cache.has(cacheKey)) { @@ -181,8 +183,7 @@ export class WebSocketPool { this.handleErrorMessages(data); } - await Promise.all(this.messageListeners.map(handler => handler(data))); - + await Promise.all(this.messageListeners.map((handler) => handler(data))); }; sendRequest(request: Request) { diff --git a/lazer/sdk/js/src/util/buffer-util.ts b/lazer/sdk/js/src/util/buffer-util.ts index 8eb45d8a7d..69c2e2a641 100644 --- a/lazer/sdk/js/src/util/buffer-util.ts +++ b/lazer/sdk/js/src/util/buffer-util.ts @@ -2,22 +2,24 @@ // a global, top-level import. we disable this rule because we need this // imported from our installed dependency // eslint-disable-next-line unicorn/prefer-node-protocol -import { Buffer as BrowserBuffer } from 'buffer'; +import { Buffer as BrowserBuffer } from "buffer"; -import type { Data } from 'isomorphic-ws'; +import type { Data } from "isomorphic-ws"; -const { Buffer: PossibleBuiltInBuffer } = globalThis as Partial<{ Buffer: typeof Buffer }>; +const { Buffer: PossibleBuiltInBuffer } = globalThis as Partial<{ + Buffer: typeof Buffer; +}>; const BufferClassToUse = PossibleBuiltInBuffer ?? BrowserBuffer; export class IsomorphicBuffer extends BufferClassToUse { /** - * given a relatively unknown websocket frame data object, - * returns a valid Buffer instance that is safe to use - * isomorphically in any JS runtime environment - */ + * given a relatively unknown websocket frame data object, + * returns a valid Buffer instance that is safe to use + * isomorphically in any JS runtime environment + */ static async fromWebsocketData(data: Data) { - if (typeof data === 'string') { + if (typeof data === "string") { return BufferClassToUse.from(new TextEncoder().encode(data).buffer); } if (data instanceof Blob) { @@ -33,6 +35,8 @@ export class IsomorphicBuffer extends BufferClassToUse { } return BufferClassToUse.from(arrBuffer); } - throw new TypeError("unexpected event data type found when IsomorphicBuffer.fromWebsocketData() called"); + throw new TypeError( + "unexpected event data type found when IsomorphicBuffer.fromWebsocketData() called", + ); } } diff --git a/lazer/sdk/js/src/util/env-util.ts b/lazer/sdk/js/src/util/env-util.ts index be792f3362..891d132512 100644 --- a/lazer/sdk/js/src/util/env-util.ts +++ b/lazer/sdk/js/src/util/env-util.ts @@ -1,13 +1,17 @@ // we create this local-only type, which has assertions made to indicate // that we do not know and cannot guarantee which JS environment we are in -const g = globalThis as Partial<{ self: typeof globalThis.self; window: typeof globalThis.window }> +const g = globalThis as Partial<{ + self: typeof globalThis.self; + window: typeof globalThis.window; +}>; /** * Detects if this code is running within any Service or WebWorker context. * @returns true if in a worker of some kind, false if otherwise */ export function envIsServiceOrWebWorker() { - const possiblyInAWorker = typeof WorkerGlobalScope !== 'undefined' && g.self !== undefined; + const possiblyInAWorker = + typeof WorkerGlobalScope !== "undefined" && g.self !== undefined; return possiblyInAWorker && g.self instanceof WorkerGlobalScope; } @@ -22,7 +26,7 @@ export function envIsBrowser() { /** * a convenience method that returns whether or not * this code is executing in some type of browser-centric environment - * + * * @returns true if in the browser's main UI thread or in a worker, false if otherwise */ export function envIsBrowserOrWorker() { From 2e3f822a35bfa9481e42eacc104815d15976c3d5 Mon Sep 17 00:00:00 2001 From: benduran Date: Thu, 16 Oct 2025 12:34:05 +0100 Subject: [PATCH 05/13] fix(lazer-sdk-js): use terminate, if it's available. otherwise, use the standard close method with a custom error code --- lazer/sdk/js/src/protocol.ts | 4 ++++ lazer/sdk/js/src/socket/resilient-websocket.ts | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/lazer/sdk/js/src/protocol.ts b/lazer/sdk/js/src/protocol.ts index 30a1bb83ab..2050b46e25 100644 --- a/lazer/sdk/js/src/protocol.ts +++ b/lazer/sdk/js/src/protocol.ts @@ -158,3 +158,7 @@ export type JsonUpdate = { leEcdsa?: JsonBinaryData; leUnsigned?: JsonBinaryData; }; + +export enum CustomSocketClosureCodes { + CLIENT_TIMEOUT_BUT_RECONNECTING = 4000, +} diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index 212653db2c..ba47b242e5 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -5,6 +5,7 @@ import WebSocket from "isomorphic-ws"; import type { Logger } from "ts-log"; import { dummyLogger } from "ts-log"; +import { CustomSocketClosureCodes } from "../protocol.js"; import { envIsServiceOrWebWorker } from "../util/env-util.js"; const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds @@ -164,7 +165,8 @@ export class ResilientWebSocket { } this.heartbeatTimeout = setTimeout(() => { - this.logger.warn("Connection timed out. Reconnecting..."); + const warnMsg = "Connection timed out. Reconnecting..."; + this.logger.warn(warnMsg); if (this.wsClient) { if (typeof this.wsClient.terminate === "function") { this.wsClient.terminate(); @@ -172,7 +174,10 @@ export class ResilientWebSocket { // terminate is an implementation detail of the node-friendly // https://www.npmjs.com/package/ws package, but is not a native WebSocket API, // so we have to use the close method - this.wsClient.close(); + this.wsClient.close( + CustomSocketClosureCodes.CLIENT_TIMEOUT_BUT_RECONNECTING, + warnMsg, + ); } } this.handleReconnect(); From 7efe095f1050c60023b25acfa082a96cf58d96f2 Mon Sep 17 00:00:00 2001 From: benduran Date: Thu, 16 Oct 2025 16:07:33 +0100 Subject: [PATCH 06/13] chore(lazer-sdk-js): bumped engine range --- lazer/sdk/js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index 21f061c753..d75f89737a 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -3,7 +3,7 @@ "version": "4.0.0", "description": "Pyth Lazer SDK", "engines": { - "node": ">=20" + "node": ">=22" }, "publishConfig": { "access": "public" From d4cf06be69d3e5609a1c312cb621e99dd2e3c046 Mon Sep 17 00:00:00 2001 From: benduran Date: Fri, 17 Oct 2025 10:04:48 +0100 Subject: [PATCH 07/13] chore(lazer-sdk-js): responded to PR feedback about the typescript typing --- .../sdk/js/src/socket/resilient-websocket.ts | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index ba47b242e5..ddbebf6aad 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -21,6 +21,20 @@ export type ResilientWebSocketConfig = { logAfterRetryCount?: number; }; +/** + * the isomorphic-ws package ships with some slightly-erroneous typings. + * namely, it returns a WebSocket with typings that indicate the "terminate()" function + * is available on all platforms. + * Given that, under the hood, it is using the globalThis.WebSocket class, if it's available, + * and falling back to using the https://www.npmjs.com/package/ws package, this + * means there are API differences between the native WebSocket (the one in a web browser) + * and the server-side version from the "ws" package. + * + * This type creates a WebSocket type reference we use to indicate the unknown + * nature of the env in which is code is run. + */ +type UnsafeWebSocket = (Omit & Partial>); + export class ResilientWebSocket { private endpoint: string; private wsOptions?: ClientOptions | ClientRequestArgs | undefined; @@ -29,9 +43,7 @@ export class ResilientWebSocket { private maxRetryDelayMs: number; private logAfterRetryCount: number; - wsClient: - | undefined - | (Omit & Partial>); + wsClient: UnsafeWebSocket | undefined; wsUserClosed = false; private wsFailedAttempts: number; private heartbeatTimeout?: NodeJS.Timeout | undefined; @@ -208,8 +220,8 @@ export class ResilientWebSocket { if (this.shouldLogRetry()) { this.logger.error( "Connection closed unexpectedly or because of timeout. Reconnecting after " + - String(this.retryDelayMs()) + - "ms.", + String(this.retryDelayMs()) + + "ms.", ); } From 20e664aab4b618d602b2f550f4ac38715e3322d1 Mon Sep 17 00:00:00 2001 From: benduran Date: Fri, 17 Oct 2025 11:21:38 +0100 Subject: [PATCH 08/13] chore(lazer-sdk-js): responded to PR feedback about how the URL for the connection is constructed and fixed a websocket constructor bug --- .../sdk/js/src/socket/resilient-websocket.ts | 4 ++-- lazer/sdk/js/src/socket/websocket-pool.ts | 21 +++++-------------- lazer/sdk/js/src/util/index.ts | 1 + lazer/sdk/js/src/util/url-util.ts | 16 ++++++++++++++ 4 files changed, 24 insertions(+), 18 deletions(-) create mode 100644 lazer/sdk/js/src/util/url-util.ts diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index ddbebf6aad..b5a3868c83 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -6,7 +6,7 @@ import type { Logger } from "ts-log"; import { dummyLogger } from "ts-log"; import { CustomSocketClosureCodes } from "../protocol.js"; -import { envIsServiceOrWebWorker } from "../util/env-util.js"; +import { envIsBrowserOrWorker } from "../util/env-util.js"; const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second' @@ -128,7 +128,7 @@ export class ResilientWebSocket { // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols this.wsClient = new WebSocket( this.endpoint, - envIsServiceOrWebWorker() ? undefined : this.wsOptions, + envIsBrowserOrWorker() ? undefined : this.wsOptions, ); this.wsClient.addEventListener("open", () => { diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index 153a7a5711..cbb434f2c3 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -11,7 +11,7 @@ import { DEFAULT_STREAM_SERVICE_0_URL, DEFAULT_STREAM_SERVICE_1_URL, } from "../constants.js"; -import { envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js"; +import { addAuthTokenToWebSocketUrl, envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js"; const DEFAULT_NUM_CONNECTIONS = 4; @@ -70,28 +70,17 @@ export class WebSocketPool { const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS; for (let i = 0; i < numConnections; i++) { - let url = urls[i % urls.length]; + const baseUrl = urls[i % urls.length]; + const isBrowser = envIsBrowserOrWorker(); + const url = isBrowser ? addAuthTokenToWebSocketUrl(baseUrl, token) : baseUrl; if (!url) { throw new Error(`URLs must not be null or empty`); } const wsOptions: ResilientWebSocketConfig["wsOptions"] = { ...config.rwsConfig?.wsOptions, + headers: isBrowser ? undefined : { Authorization: `Bearer ${token}` }, }; - if (envIsBrowserOrWorker()) { - // we are in a browser environment where the websocket protocol - // doesn't support sending headers in the initial upgrade request, - // so we add the token as a query param, which the server already supports - const parsedUrl = new URL(url); - parsedUrl.searchParams.set("ACCESS_TOKEN", token); - url = parsedUrl.toString(); - } else { - // we are in a server-side javascript runtime context - wsOptions.headers = { - Authorization: `Bearer ${token}`, - }; - } - const rws = new ResilientWebSocket({ ...config.rwsConfig, endpoint: url, diff --git a/lazer/sdk/js/src/util/index.ts b/lazer/sdk/js/src/util/index.ts index d21025f710..fb145f4520 100644 --- a/lazer/sdk/js/src/util/index.ts +++ b/lazer/sdk/js/src/util/index.ts @@ -1,2 +1,3 @@ export * from "./buffer-util.js"; export * from "./env-util.js"; +export * from "./url-util.js"; \ No newline at end of file diff --git a/lazer/sdk/js/src/util/url-util.ts b/lazer/sdk/js/src/util/url-util.ts new file mode 100644 index 0000000000..adc6b40c5d --- /dev/null +++ b/lazer/sdk/js/src/util/url-util.ts @@ -0,0 +1,16 @@ +const ACCESS_TOKEN_QUERY_PARAM_KEY = 'ACCESS_TOKEN'; + +/** + * Given a URL to a hosted lazer stream service and a possible auth token, + * appends the auth token as a query parameter and returns the URL with the token + * contained within. + * If the URL provided is nullish, it is returned as-is (in the same nullish format). + * If the token is nullish, the baseUrl given is returned, instead. + */ +export function addAuthTokenToWebSocketUrl(baseUrl: string | null | undefined, authToken: string | null | undefined) { + if (!baseUrl || !authToken) return baseUrl; + const parsedUrl = new URL(baseUrl); + parsedUrl.searchParams.set(ACCESS_TOKEN_QUERY_PARAM_KEY, authToken); + + return parsedUrl.toString(); +} \ No newline at end of file From 8ec22ae61f70992c1846f511473ade1248e01623 Mon Sep 17 00:00:00 2001 From: benduran Date: Fri, 17 Oct 2025 11:25:43 +0100 Subject: [PATCH 09/13] chore(lazer-sdk-js): made a slightly more specific error message and throw it to let the browser bubble it up --- lazer/sdk/js/src/socket/websocket-pool.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index cbb434f2c3..2c2fb0d7d9 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -111,7 +111,10 @@ export class WebSocketPool { } // Handle all client messages ourselves. Dedupe before sending to registered message handlers. rws.onMessage = (data) => { - void pool.dedupeHandler(data); + pool.dedupeHandler(data).catch((error: unknown) => { + const errMsg = `An error occurred in the WebSocket pool's dedupeHandler: ${error instanceof Error ? error.message : String(error)}`; + throw new Error(errMsg); + }); }; pool.rwsPool.push(rws); rws.startWebSocket(); From 7aebe1f1b2c2709ca9de50a2e0b09efc8496d4b5 Mon Sep 17 00:00:00 2001 From: benduran Date: Fri, 17 Oct 2025 11:42:48 +0100 Subject: [PATCH 10/13] chore(lazer-sdk-js): responded to further PR feedback about typing exhaustivity, using a utility function for buffer creation, as well as one for adding the token to the URL --- lazer/sdk/js/src/client.ts | 4 +- .../sdk/js/src/socket/resilient-websocket.ts | 9 +-- lazer/sdk/js/src/socket/websocket-pool.ts | 24 +++++--- lazer/sdk/js/src/util/buffer-util.ts | 60 +++++++++---------- lazer/sdk/js/src/util/index.ts | 2 +- lazer/sdk/js/src/util/url-util.ts | 9 ++- 6 files changed, 57 insertions(+), 51 deletions(-) diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index 50ac75d407..ec87689efb 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -19,7 +19,7 @@ import type { import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js"; import type { WebSocketPoolConfig } from "./socket/websocket-pool.js"; import { WebSocketPool } from "./socket/websocket-pool.js"; -import { IsomorphicBuffer } from "./util/index.js"; +import { bufferFromWebsocketData } from "./util/buffer-util.js"; export type BinaryResponse = { subscriptionId: number; @@ -121,7 +121,7 @@ export class PythLazerClient { }); return; } - const buffData = await IsomorphicBuffer.fromWebsocketData(data); + const buffData = await bufferFromWebsocketData(data); let pos = 0; const magic = buffData .subarray(pos, pos + UINT32_NUM_BYTES) diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index b5a3868c83..b23af3775e 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -29,11 +29,12 @@ export type ResilientWebSocketConfig = { * and falling back to using the https://www.npmjs.com/package/ws package, this * means there are API differences between the native WebSocket (the one in a web browser) * and the server-side version from the "ws" package. - * + * * This type creates a WebSocket type reference we use to indicate the unknown * nature of the env in which is code is run. */ -type UnsafeWebSocket = (Omit & Partial>); +type UnsafeWebSocket = Omit & + Partial>; export class ResilientWebSocket { private endpoint: string; @@ -220,8 +221,8 @@ export class ResilientWebSocket { if (this.shouldLogRetry()) { this.logger.error( "Connection closed unexpectedly or because of timeout. Reconnecting after " + - String(this.retryDelayMs()) + - "ms.", + String(this.retryDelayMs()) + + "ms.", ); } diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index 2c2fb0d7d9..eee5ddb3bc 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -11,7 +11,11 @@ import { DEFAULT_STREAM_SERVICE_0_URL, DEFAULT_STREAM_SERVICE_1_URL, } from "../constants.js"; -import { addAuthTokenToWebSocketUrl, envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js"; +import { + addAuthTokenToWebSocketUrl, + bufferFromWebsocketData, + envIsBrowserOrWorker, +} from "../util/index.js"; const DEFAULT_NUM_CONNECTIONS = 4; @@ -72,7 +76,9 @@ export class WebSocketPool { for (let i = 0; i < numConnections; i++) { const baseUrl = urls[i % urls.length]; const isBrowser = envIsBrowserOrWorker(); - const url = isBrowser ? addAuthTokenToWebSocketUrl(baseUrl, token) : baseUrl; + const url = isBrowser + ? addAuthTokenToWebSocketUrl(baseUrl, token) + : baseUrl; if (!url) { throw new Error(`URLs must not be null or empty`); } @@ -151,18 +157,18 @@ export class WebSocketPool { } } + private async constructCacheKeyFromWebsocketData(data: WebSocket.Data) { + if (typeof data === "string") return data; + const buff = await bufferFromWebsocketData(data); + return buff.toString("hex"); + } + /** * Handles incoming websocket messages by deduplicating identical messages received across * multiple connections before forwarding to registered handlers */ dedupeHandler = async (data: WebSocket.Data): Promise => { - let cacheKey = ""; - if (typeof data === "string") { - cacheKey = data; - } else { - const buff = await IsomorphicBuffer.fromWebsocketData(data); - cacheKey = buff.toString("hex"); - } + const cacheKey = await this.constructCacheKeyFromWebsocketData(data); if (this.cache.has(cacheKey)) { this.logger.debug("Dropping duplicate message"); diff --git a/lazer/sdk/js/src/util/buffer-util.ts b/lazer/sdk/js/src/util/buffer-util.ts index 69c2e2a641..8f9df33a6d 100644 --- a/lazer/sdk/js/src/util/buffer-util.ts +++ b/lazer/sdk/js/src/util/buffer-util.ts @@ -6,37 +6,33 @@ import { Buffer as BrowserBuffer } from "buffer"; import type { Data } from "isomorphic-ws"; -const { Buffer: PossibleBuiltInBuffer } = globalThis as Partial<{ - Buffer: typeof Buffer; -}>; - -const BufferClassToUse = PossibleBuiltInBuffer ?? BrowserBuffer; - -export class IsomorphicBuffer extends BufferClassToUse { - /** - * given a relatively unknown websocket frame data object, - * returns a valid Buffer instance that is safe to use - * isomorphically in any JS runtime environment - */ - static async fromWebsocketData(data: Data) { - if (typeof data === "string") { - return BufferClassToUse.from(new TextEncoder().encode(data).buffer); - } - if (data instanceof Blob) { - // let the uncaught promise exception bubble up if there's an issue - return BufferClassToUse.from(await data.arrayBuffer()); - } - if (data instanceof ArrayBuffer) return BufferClassToUse.from(data); - if (Buffer.isBuffer(data)) { - const arrBuffer = new ArrayBuffer(data.length); - const v = new Uint8Array(arrBuffer); - for (const [i, item] of data.entries()) { - v[i] = item; - } - return BufferClassToUse.from(arrBuffer); - } - throw new TypeError( - "unexpected event data type found when IsomorphicBuffer.fromWebsocketData() called", - ); +const BufferClassToUse = + "Buffer" in globalThis ? globalThis.Buffer : BrowserBuffer; + +/** + * given a relatively unknown websocket frame data object, + * returns a valid Buffer instance that is safe to use + * isomorphically in any JS runtime environment + */ +export async function bufferFromWebsocketData(data: Data): Promise { + if (typeof data === "string") { + return BufferClassToUse.from(new TextEncoder().encode(data).buffer); + } + + if (data instanceof BufferClassToUse) return data; + + if (data instanceof Blob) { + // let the uncaught promise exception bubble up if there's an issue + return BufferClassToUse.from(await data.arrayBuffer()); } + + if (data instanceof ArrayBuffer) return BufferClassToUse.from(data); + + if (Array.isArray(data)) { + // an array of buffers is highly unlikely, but it is a possibility + // indicated by the WebSocket Data interface + return BufferClassToUse.concat(data); + } + + return data; } diff --git a/lazer/sdk/js/src/util/index.ts b/lazer/sdk/js/src/util/index.ts index fb145f4520..e4155c22ab 100644 --- a/lazer/sdk/js/src/util/index.ts +++ b/lazer/sdk/js/src/util/index.ts @@ -1,3 +1,3 @@ export * from "./buffer-util.js"; export * from "./env-util.js"; -export * from "./url-util.js"; \ No newline at end of file +export * from "./url-util.js"; diff --git a/lazer/sdk/js/src/util/url-util.ts b/lazer/sdk/js/src/util/url-util.ts index adc6b40c5d..db835e5599 100644 --- a/lazer/sdk/js/src/util/url-util.ts +++ b/lazer/sdk/js/src/util/url-util.ts @@ -1,4 +1,4 @@ -const ACCESS_TOKEN_QUERY_PARAM_KEY = 'ACCESS_TOKEN'; +const ACCESS_TOKEN_QUERY_PARAM_KEY = "ACCESS_TOKEN"; /** * Given a URL to a hosted lazer stream service and a possible auth token, @@ -7,10 +7,13 @@ const ACCESS_TOKEN_QUERY_PARAM_KEY = 'ACCESS_TOKEN'; * If the URL provided is nullish, it is returned as-is (in the same nullish format). * If the token is nullish, the baseUrl given is returned, instead. */ -export function addAuthTokenToWebSocketUrl(baseUrl: string | null | undefined, authToken: string | null | undefined) { +export function addAuthTokenToWebSocketUrl( + baseUrl: string | null | undefined, + authToken: string | null | undefined, +) { if (!baseUrl || !authToken) return baseUrl; const parsedUrl = new URL(baseUrl); parsedUrl.searchParams.set(ACCESS_TOKEN_QUERY_PARAM_KEY, authToken); return parsedUrl.toString(); -} \ No newline at end of file +} From 8baec64433940479ac9079f537be0c782207f205 Mon Sep 17 00:00:00 2001 From: benduran Date: Fri, 17 Oct 2025 11:45:03 +0100 Subject: [PATCH 11/13] chore(lazer-sdk-js): more PR feedback changes about brevity --- lazer/sdk/js/src/util/env-util.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lazer/sdk/js/src/util/env-util.ts b/lazer/sdk/js/src/util/env-util.ts index 891d132512..86137bda3d 100644 --- a/lazer/sdk/js/src/util/env-util.ts +++ b/lazer/sdk/js/src/util/env-util.ts @@ -10,9 +10,7 @@ const g = globalThis as Partial<{ * @returns true if in a worker of some kind, false if otherwise */ export function envIsServiceOrWebWorker() { - const possiblyInAWorker = - typeof WorkerGlobalScope !== "undefined" && g.self !== undefined; - return possiblyInAWorker && g.self instanceof WorkerGlobalScope; + return typeof WorkerGlobalScope !== "undefined" && g.self instanceof WorkerGlobalScope; } /** From 27d0b5d6cd38467830e075b9185b228122b60d13 Mon Sep 17 00:00:00 2001 From: benduran Date: Fri, 17 Oct 2025 11:46:46 +0100 Subject: [PATCH 12/13] feat(lazer-sdk-js)!: major bump, considering the changes --- lazer/sdk/js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index d75f89737a..bc18b8d0c5 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -1,6 +1,6 @@ { "name": "@pythnetwork/pyth-lazer-sdk", - "version": "4.0.0", + "version": "5.0.0", "description": "Pyth Lazer SDK", "engines": { "node": ">=22" From f7277e9df320edd9b7d7e044a54ef6d6b67d9b50 Mon Sep 17 00:00:00 2001 From: benduran Date: Fri, 17 Oct 2025 11:53:58 +0100 Subject: [PATCH 13/13] chore(lazer-sdk-js): linting issue --- lazer/sdk/js/src/util/env-util.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lazer/sdk/js/src/util/env-util.ts b/lazer/sdk/js/src/util/env-util.ts index 86137bda3d..78e763ce02 100644 --- a/lazer/sdk/js/src/util/env-util.ts +++ b/lazer/sdk/js/src/util/env-util.ts @@ -10,7 +10,10 @@ const g = globalThis as Partial<{ * @returns true if in a worker of some kind, false if otherwise */ export function envIsServiceOrWebWorker() { - return typeof WorkerGlobalScope !== "undefined" && g.self instanceof WorkerGlobalScope; + return ( + typeof WorkerGlobalScope !== "undefined" && + g.self instanceof WorkerGlobalScope + ); } /**