Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions lazer/sdk/js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,60 @@
## Contributing & Development

See [contributing.md](docs/contributing/contributing.md) for information on how to develop or contribute to this project!

## How to use

```javascript
import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk";

const c = await PythLazerClient.create({
token: "YOUR-AUTH-TOKEN-HERE",
logger: console, // Optionally log operations (to the console in this case.)
webSocketPoolConfig: {
numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4.
onError: (error) => {
console.error("⛔️ WebSocket error:", error.message);
},
// Optional configuration for resilient WebSocket connections
rwsConfig: {
heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds
maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds
logAfterRetryCount: 10, // Optional log after how many retries
},
},
});

c.addMessageListener((message) => {
console.info("received the following from the Lazer stream:", message);
});

// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down)
// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown().
c.addAllConnectionsDownListener(() => {
console.error("All connections are down!");
});

// Create and remove one or more subscriptions on the fly
c.subscribe({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
formats: ["solana"],
deliveryFormat: "binary",
channel: "fixed_rate@200ms",
parsed: false,
jsonBinaryEncoding: "base64",
});
c.subscribe({
type: "subscribe",
subscriptionId: 2,
priceFeedIds: [1, 2, 3, 4, 5],
properties: ["price", "exponent", "publisherCount", "confidence"],
formats: ["evm"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
parsed: true,
jsonBinaryEncoding: "hex",
});
```
7 changes: 5 additions & 2 deletions lazer/sdk/js/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{
"name": "@pythnetwork/pyth-lazer-sdk",
"version": "4.0.0",
"version": "5.0.0",
"description": "Pyth Lazer SDK",
"engines": {
"node": ">=22"
},
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -61,7 +64,7 @@
"license": "Apache-2.0",
"dependencies": {
"@isaacs/ttlcache": "^1.4.1",
"cross-fetch": "^4.0.0",
"buffer": "^6.0.3",
"isomorphic-ws": "^5.0.0",
"ts-log": "^2.2.7",
"ws": "^8.18.0"
Expand Down
83 changes: 43 additions & 40 deletions lazer/sdk/js/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import fetch from "cross-fetch";
import WebSocket from "isomorphic-ws";
import type { Logger } from "ts-log";
import { dummyLogger } from "ts-log";
Expand All @@ -20,6 +19,7 @@ import type {
import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js";
import type { WebSocketPoolConfig } from "./socket/websocket-pool.js";
import { WebSocketPool } from "./socket/websocket-pool.js";
import { bufferFromWebsocketData } from "./util/buffer-util.js";

export type BinaryResponse = {
subscriptionId: number;
Expand Down Expand Up @@ -113,53 +113,56 @@ export class PythLazerClient {
*/
addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
const wsp = this.getWebSocketPool();
wsp.addMessageListener((data: WebSocket.Data) => {
wsp.addMessageListener(async (data: WebSocket.Data) => {
if (typeof data == "string") {
handler({
type: "json",
value: JSON.parse(data) as Response,
});
} else if (Buffer.isBuffer(data)) {
let pos = 0;
const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE();
pos += UINT32_NUM_BYTES;
if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
throw new Error("binary update format magic mismatch");
}
// TODO: some uint64 values may not be representable as Number.
const subscriptionId = Number(
data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
);
pos += UINT64_NUM_BYTES;
return;
}
const buffData = await bufferFromWebsocketData(data);
let pos = 0;
const magic = buffData
.subarray(pos, pos + UINT32_NUM_BYTES)
.readUint32LE();
pos += UINT32_NUM_BYTES;
if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
throw new Error("binary update format magic mismatch");
}
// TODO: some uint64 values may not be representable as Number.
const subscriptionId = Number(
buffData.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
);
pos += UINT64_NUM_BYTES;

const value: BinaryResponse = { subscriptionId };
while (pos < data.length) {
const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
pos += UINT16_NUM_BYTES;
const magic = data
.subarray(pos, pos + UINT32_NUM_BYTES)
.readUint32LE();
if (magic == FORMAT_MAGICS_LE.EVM) {
value.evm = data.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.SOLANA) {
value.solana = data.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
value.leEcdsa = data.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
value.leUnsigned = data.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.JSON) {
value.parsed = JSON.parse(
data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
) as ParsedPayload;
} else {
throw new Error("unknown magic: " + magic.toString());
}
pos += len;
const value: BinaryResponse = { subscriptionId };
while (pos < buffData.length) {
const len = buffData
.subarray(pos, pos + UINT16_NUM_BYTES)
.readUint16BE();
pos += UINT16_NUM_BYTES;
const magic = buffData
.subarray(pos, pos + UINT32_NUM_BYTES)
.readUint32LE();
if (magic == FORMAT_MAGICS_LE.EVM) {
value.evm = buffData.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.SOLANA) {
value.solana = buffData.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
value.leEcdsa = buffData.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
value.leUnsigned = buffData.subarray(pos, pos + len);
} else if (magic == FORMAT_MAGICS_LE.JSON) {
value.parsed = JSON.parse(
buffData.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
) as ParsedPayload;
} else {
throw new Error(`unknown magic: ${magic.toString()}`);
}
handler({ type: "binary", value });
} else {
throw new TypeError("unexpected event data type");
pos += len;
}
handler({ type: "binary", value });
});
}

Expand Down
4 changes: 4 additions & 0 deletions lazer/sdk/js/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,7 @@ export type JsonUpdate = {
leEcdsa?: JsonBinaryData;
leUnsigned?: JsonBinaryData;
};

export enum CustomSocketClosureCodes {
CLIENT_TIMEOUT_BUT_RECONNECTING = 4000,
}
45 changes: 41 additions & 4 deletions lazer/sdk/js/src/socket/resilient-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import WebSocket from "isomorphic-ws";
import type { Logger } from "ts-log";
import { dummyLogger } from "ts-log";

import { CustomSocketClosureCodes } from "../protocol.js";
import { envIsBrowserOrWorker } from "../util/env-util.js";

const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds
const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second'
const DEFAULT_LOG_AFTER_RETRY_COUNT = 10;
Expand All @@ -18,6 +21,21 @@ export type ResilientWebSocketConfig = {
logAfterRetryCount?: number;
};

/**
* the isomorphic-ws package ships with some slightly-erroneous typings.
* namely, it returns a WebSocket with typings that indicate the "terminate()" function
* is available on all platforms.
* Given that, under the hood, it is using the globalThis.WebSocket class, if it's available,
* and falling back to using the https://www.npmjs.com/package/ws package, this
* means there are API differences between the native WebSocket (the one in a web browser)
* and the server-side version from the "ws" package.
*
* This type creates a WebSocket type reference we use to indicate the unknown
* nature of the env in which is code is run.
*/
type UnsafeWebSocket = Omit<WebSocket, "terminate"> &
Partial<Pick<WebSocket, "terminate">>;

export class ResilientWebSocket {
private endpoint: string;
private wsOptions?: ClientOptions | ClientRequestArgs | undefined;
Expand All @@ -26,7 +44,7 @@ export class ResilientWebSocket {
private maxRetryDelayMs: number;
private logAfterRetryCount: number;

wsClient: undefined | WebSocket;
wsClient: UnsafeWebSocket | undefined;
wsUserClosed = false;
private wsFailedAttempts: number;
private heartbeatTimeout?: NodeJS.Timeout | undefined;
Expand Down Expand Up @@ -106,7 +124,13 @@ export class ResilientWebSocket {
this.retryTimeout = undefined;
}

this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
// browser constructor supports a different 2nd argument for the constructor,
// so we need to ensure it's not included if we're running in that environment:
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols
this.wsClient = new WebSocket(
this.endpoint,
envIsBrowserOrWorker() ? undefined : this.wsOptions,
);

this.wsClient.addEventListener("open", () => {
this.logger.info("WebSocket connection established");
Expand Down Expand Up @@ -154,8 +178,21 @@ export class ResilientWebSocket {
}

this.heartbeatTimeout = setTimeout(() => {
this.logger.warn("Connection timed out. Reconnecting...");
this.wsClient?.terminate();
const warnMsg = "Connection timed out. Reconnecting...";
this.logger.warn(warnMsg);
if (this.wsClient) {
if (typeof this.wsClient.terminate === "function") {
this.wsClient.terminate();
} else {
// terminate is an implementation detail of the node-friendly
// https://www.npmjs.com/package/ws package, but is not a native WebSocket API,
// so we have to use the close method
this.wsClient.close(
CustomSocketClosureCodes.CLIENT_TIMEOUT_BUT_RECONNECTING,
warnMsg,
);
}
}
this.handleReconnect();
}, this.heartbeatTimeoutDurationMs);
}
Expand Down
50 changes: 34 additions & 16 deletions lazer/sdk/js/src/socket/websocket-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@ import {
DEFAULT_STREAM_SERVICE_0_URL,
DEFAULT_STREAM_SERVICE_1_URL,
} from "../constants.js";
import {
addAuthTokenToWebSocketUrl,
bufferFromWebsocketData,
envIsBrowserOrWorker,
} from "../util/index.js";

const DEFAULT_NUM_CONNECTIONS = 4;

type WebSocketOnMessageCallback = (
data: WebSocket.Data,
) => void | Promise<void>;

export type WebSocketPoolConfig = {
urls?: string[];
numConnections?: number;
Expand All @@ -25,7 +34,7 @@ export class WebSocketPool {
rwsPool: ResilientWebSocket[];
private cache: TTLCache<string, boolean>;
private subscriptions: Map<number, Request>; // id -> subscription Request
private messageListeners: ((event: WebSocket.Data) => void)[];
private messageListeners: WebSocketOnMessageCallback[];
private allConnectionsDownListeners: (() => void)[];
private wasAllDown = true;
private checkConnectionStatesInterval: NodeJS.Timeout;
Expand Down Expand Up @@ -65,16 +74,19 @@ export class WebSocketPool {
const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS;

for (let i = 0; i < numConnections; i++) {
const url = urls[i % urls.length];
const baseUrl = urls[i % urls.length];
const isBrowser = envIsBrowserOrWorker();
const url = isBrowser
? addAuthTokenToWebSocketUrl(baseUrl, token)
: baseUrl;
if (!url) {
throw new Error(`URLs must not be null or empty`);
}
const wsOptions = {
const wsOptions: ResilientWebSocketConfig["wsOptions"] = {
...config.rwsConfig?.wsOptions,
headers: {
Authorization: `Bearer ${token}`,
},
headers: isBrowser ? undefined : { Authorization: `Bearer ${token}` },
};

const rws = new ResilientWebSocket({
...config.rwsConfig,
endpoint: url,
Expand Down Expand Up @@ -104,7 +116,12 @@ export class WebSocketPool {
rws.onError = config.onError;
}
// Handle all client messages ourselves. Dedupe before sending to registered message handlers.
rws.onMessage = pool.dedupeHandler;
rws.onMessage = (data) => {
pool.dedupeHandler(data).catch((error: unknown) => {
const errMsg = `An error occurred in the WebSocket pool's dedupeHandler: ${error instanceof Error ? error.message : String(error)}`;
throw new Error(errMsg);
});
};
pool.rwsPool.push(rws);
rws.startWebSocket();
}
Expand Down Expand Up @@ -140,15 +157,18 @@ export class WebSocketPool {
}
}

private async constructCacheKeyFromWebsocketData(data: WebSocket.Data) {
if (typeof data === "string") return data;
const buff = await bufferFromWebsocketData(data);
return buff.toString("hex");
}

/**
* Handles incoming websocket messages by deduplicating identical messages received across
* multiple connections before forwarding to registered handlers
*/
dedupeHandler = (data: WebSocket.Data): void => {
const cacheKey =
typeof data === "string"
? data
: Buffer.from(data as Buffer).toString("hex");
dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
const cacheKey = await this.constructCacheKeyFromWebsocketData(data);

if (this.cache.has(cacheKey)) {
this.logger.debug("Dropping duplicate message");
Expand All @@ -161,9 +181,7 @@ export class WebSocketPool {
this.handleErrorMessages(data);
}

for (const handler of this.messageListeners) {
handler(data);
}
await Promise.all(this.messageListeners.map((handler) => handler(data)));
};

sendRequest(request: Request) {
Expand All @@ -189,7 +207,7 @@ export class WebSocketPool {
this.sendRequest(request);
}

addMessageListener(handler: (data: WebSocket.Data) => void): void {
addMessageListener(handler: WebSocketOnMessageCallback): void {
this.messageListeners.push(handler);
}

Expand Down
Loading
Loading