Skip to content

Commit

Permalink
feat(web3): add ability to pass different websocket endpoint #17387 (…
Browse files Browse the repository at this point in the history
…#17556)
  • Loading branch information
beautyfree committed May 27, 2021
1 parent bbc0de6 commit aa5e806
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 24 deletions.
39 changes: 16 additions & 23 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import assert from 'assert';
import bs58 from 'bs58';
import {Buffer} from 'buffer';
import {parse as urlParse, format as urlFormat} from 'url';
import {parse as urlParse} from 'url';
import fetch, {Response} from 'node-fetch';
import {
type as pick,
Expand Down Expand Up @@ -36,6 +36,7 @@ import {Message} from './message';
import {sleep} from './util/sleep';
import {promiseTimeout} from './util/promise-timeout';
import {toBuffer} from './util/to-buffer';
import {makeWebsocketUrl} from './util/url';
import type {Blockhash} from './blockhash';
import type {FeeCalculator} from './fee-calculator';
import type {TransactionSignature} from './transaction';
Expand Down Expand Up @@ -1917,6 +1918,8 @@ export type FetchMiddleware = (
export type ConnectionConfig = {
/** Optional commitment level */
commitment?: Commitment;
/** Optional endpoint URL to the fullnode JSON RPC PubSub WebSocket Endpoint */
wsEndpoint?: string;
/** Optional HTTP headers object */
httpHeaders?: HttpHeaders;
/** Optional fetch middleware callback */
Expand All @@ -1929,6 +1932,7 @@ export type ConnectionConfig = {
export class Connection {
/** @internal */ _commitment?: Commitment;
/** @internal */ _rpcEndpoint: string;
/** @internal */ _rpcWsEndpoint: string;
/** @internal */ _rpcClient: RpcClient;
/** @internal */ _rpcRequest: RpcRequest;
/** @internal */ _rpcBatchRequest: RpcBatchRequest;
Expand All @@ -1948,6 +1952,11 @@ export class Connection {
lastFetch: number;
simulatedSignatures: Array<string>;
transactionSignatures: Array<string>;
} = {
recentBlockhash: null,
lastFetch: 0,
transactionSignatures: [],
simulatedSignatures: [],
};

/** @internal */ _accountChangeSubscriptionCounter: number = 0;
Expand Down Expand Up @@ -1995,21 +2004,24 @@ export class Connection {
endpoint: string,
commitmentOrConfig?: Commitment | ConnectionConfig,
) {
this._rpcEndpoint = endpoint;

let url = urlParse(endpoint);
const useHttps = url.protocol === 'https:';

let wsEndpoint;
let httpHeaders;
let fetchMiddleware;
if (commitmentOrConfig && typeof commitmentOrConfig === 'string') {
this._commitment = commitmentOrConfig;
} else if (commitmentOrConfig) {
this._commitment = commitmentOrConfig.commitment;
wsEndpoint = commitmentOrConfig.wsEndpoint;
httpHeaders = commitmentOrConfig.httpHeaders;
fetchMiddleware = commitmentOrConfig.fetchMiddleware;
}

this._rpcEndpoint = endpoint;
this._rpcWsEndpoint = wsEndpoint || makeWebsocketUrl(endpoint);

this._rpcClient = createRpcClient(
url.href,
useHttps,
Expand All @@ -2019,26 +2031,7 @@ export class Connection {
this._rpcRequest = createRpcRequest(this._rpcClient);
this._rpcBatchRequest = createRpcBatchRequest(this._rpcClient);

this._blockhashInfo = {
recentBlockhash: null,
lastFetch: 0,
transactionSignatures: [],
simulatedSignatures: [],
};

url.protocol = useHttps ? 'wss:' : 'ws:';
url.host = '';
// Only shift the port by +1 as a convention for ws(s) only if given endpoint
// is explictly specifying the endpoint port (HTTP-based RPC), assuming
// we're directly trying to connect to solana-validator's ws listening port.
// When the endpoint omits the port, we're connecting to the protocol
// default ports: http(80) or https(443) and it's assumed we're behind a reverse
// proxy which manages WebSocket upgrade and backend port redirection.
if (url.port !== null) {
url.port = String(Number(url.port) + 1);
}

this._rpcWebSocket = new RpcWebSocketClient(urlFormat(url), {
this._rpcWebSocket = new RpcWebSocketClient(this._rpcWsEndpoint, {
autoconnect: false,
max_reconnects: Infinity,
});
Expand Down
20 changes: 20 additions & 0 deletions src/util/url.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {format as urlFormat, parse as urlParse} from 'url';

export function makeWebsocketUrl(endpoint: string) {
let url = urlParse(endpoint);
const useHttps = url.protocol === 'https:';

url.protocol = useHttps ? 'wss:' : 'ws:';
url.host = '';

// Only shift the port by +1 as a convention for ws(s) only if given endpoint
// is explictly specifying the endpoint port (HTTP-based RPC), assuming
// we're directly trying to connect to solana-validator's ws listening port.
// When the endpoint omits the port, we're connecting to the protocol
// default ports: http(80) or https(443) and it's assumed we're behind a reverse
// proxy which manages WebSocket upgrade and backend port redirection.
if (url.port !== null) {
url.port = String(Number(url.port) + 1);
}
return urlFormat(url);
}
4 changes: 4 additions & 0 deletions test/url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,9 @@ export const url = process.env.TEST_LIVE
? 'http://localhost:8899/'
: 'http://localhost:9999/';

export const wsUrl = process.env.TEST_LIVE
? 'ws://localhost:8900/'
: 'ws://localhost:9999/';

//export const url = 'https://devnet.solana.com/';
//export const url = 'http://devnet.solana.com/';
18 changes: 17 additions & 1 deletion test/websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {expect, use} from 'chai';
import chaiAsPromised from 'chai-as-promised';

import {Connection} from '../src';
import {url} from './url';
import {url, wsUrl} from './url';
import {sleep} from '../src/util/sleep';

use(chaiAsPromised);
Expand Down Expand Up @@ -63,5 +63,21 @@ if (process.env.TEST_LIVE) {
await sleep(1100);
expect(connection._rpcWebSocketIdleTimeout).to.eq(null);
});

it('connect by websocket endpoint from options', async () => {
let connection = new Connection('', {
wsEndpoint: wsUrl,
});

const testSignature = bs58.encode(Buffer.alloc(64));
const id = connection.onSignature(testSignature, () => {});

// wait for websocket to connect
await sleep(100);
expect(connection._rpcWebSocketConnected).to.be.true;
expect(connection._rpcWebSocketHeartbeat).not.to.eq(null);

await connection.removeSignatureListener(id);
});
});
}

0 comments on commit aa5e806

Please sign in to comment.