Skip to content

Commit

Permalink
feat: add web socket based network info provider
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci committed Jul 15, 2024
1 parent 297e034 commit 6616043
Show file tree
Hide file tree
Showing 26 changed files with 955 additions and 27 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/continuous-integration-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ env:
HANDLE_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4011/"}'
STAKE_POOL_CONNECTION_STRING: 'postgresql://postgres:doNoUseThisSecret!@localhost:5435/stake_pool'
STAKE_POOL_TEST_CONNECTION_STRING: 'postgresql://postgres:doNoUseThisSecret!@localhost:5435/stake_pool_test'
NETWORK_INFO_PROVIDER: 'http'
NETWORK_INFO_PROVIDER: 'ws'
NETWORK_INFO_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}'
OGMIOS_URL: 'ws://localhost:1340/'
REWARDS_PROVIDER: 'http'
Expand All @@ -26,6 +26,7 @@ env:
UTXO_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}'
STAKE_POOL_PROVIDER: 'http'
STAKE_POOL_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}'
WS_PROVIDER_URL: 'http://localhost:4100/ws'

on:
pull_request:
Expand Down
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,8 @@ COPY compose/projector/init.* ./
RUN chmod 755 init.sh
HEALTHCHECK CMD test `curl --fail --silent http://localhost:3000/v1.0.0/health | jq -r ".services[0].projectedTip.blockNo"` -gt 1
CMD ["./init.sh"]

FROM cardano-services as ws-server
WORKDIR /app/packages/cardano-services
HEALTHCHECK CMD curl --fail --silent http://localhost:3000/health
CMD ["bash", "-c", "../../node_modules/.bin/tsx watch --clear-screen=false --conditions=development src/cli start-ws-server"]
23 changes: 23 additions & 0 deletions compose/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ x-sdk-environment: &sdk-environment
POSTGRES_USER_FILE_HANDLE: /run/secrets/postgres_user
POSTGRES_USER_FILE_STAKE_POOL: /run/secrets/postgres_user
TOKEN_METADATA_SERVER_URL: https://metadata.world.dev.cardano.org
USE_WEB_SOCKET_API: true
WEB_SOCKET_API_URL: ws://ws-server:3000/ws

services:
cardano-db-sync:
Expand Down Expand Up @@ -357,6 +359,27 @@ services:
ports:
- ${HANDLE_API_PORT:-4011}:3000

ws-server:
<<:
- *from-sdk
- *logging
- *provider-server
- *with-postgres
build:
args:
- NETWORK=${NETWORK:-mainnet}
context: ../../
target: ws-server
environment:
<<:
- *sdk-environment
- *provider-server-environment
ports:
- ${WS_SERVER_PORT:-4100}:3000
restart: always
volumes:
- ../..:/app

secrets:
# Replicates the db-sync secret for historical reasons.
# When the SDK was using only one database (the db-sync one) the only secret for database name used was this one
Expand Down
4 changes: 3 additions & 1 deletion packages/cardano-services-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@
"@cardano-sdk/util": "workspace:~",
"axios": "^0.28.0",
"class-validator": "^0.14.0",
"isomorphic-ws": "^5.0.0",
"json-bigint": "~1.0.0",
"ts-log": "^2.2.4"
"ts-log": "^2.2.4",
"ws": "^8.17.1"
},
"files": [
"dist/*",
Expand Down
228 changes: 228 additions & 0 deletions packages/cardano-services-client/src/WebSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/* eslint-disable unicorn/prefer-add-event-listener */

import {
AsyncReturnType,
Cardano,
EpochInfo,
EraSummary,
HealthCheckResponse,
NetworkInfoMethods,
NetworkInfoProvider,
StakeSummary,
SupplySummary,
WSMessage,
createSlotEpochInfoCalc
} from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import { Observable, ReplaySubject, Subject, firstValueFrom } from 'rxjs';

import { fromSerializableObject } from '@cardano-sdk/util';
import WebSocket from 'isomorphic-ws';

const NOT_CONNECTED_ID = 'not-connected';

type WSStatus = 'connecting' | 'connected' | 'idle' | 'stop';

export type WSHandler = (message: WSMessage) => void;

export interface WsClientConfiguration {
/** The interval in seconds between two heartbeat messages. Default 55". */
heartbeatInterval?: number;

/** The interval in seconds after which a request must timeout. Default 60". */
requestTimeout?: number;

/** The WebSocket server URL. */
url: URL;
}

export interface WsClientDependencies {
/** The logger. */
logger: Logger;
}

interface EpochRollover {
epochInfo: EpochInfo;
eraSummaries: EraSummary[];
ledgerTip: Cardano.Tip;
lovelaceSupply: SupplySummary;
protocolParameters: Cardano.ProtocolParameters;
}

const isEventError = (error: unknown): error is { error: Error } =>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof error === 'object' && !!error && (error as any).error instanceof Error;

export class CardanoWsClient {
/** The client id, assigned by the server. */
clientId = NOT_CONNECTED_ID;

/** Emits on epoch rollover. */
epoch$: Observable<EpochRollover>;

/** Emits the health state. */
health$: Observable<HealthCheckResponse>;

/** The `Observable` form of `NetworkInfoProvider`. */
networkInfo: {
[m in `${NetworkInfoMethods}$`]: Observable<
AsyncReturnType<NetworkInfoProvider[m extends `${infer o}$` ? o : never]>
>;
};

/** WebSocket based `NetworkInfoProvider` implementation. */
networkInfoProvider: NetworkInfoProvider;

private epochSubject$: Subject<EpochRollover>;
private healthSubject$: ReplaySubject<HealthCheckResponse>;
private heartbeatInterval: number;
private heartbeatTimeout: NodeJS.Timeout | undefined;
private logger: Logger;
private status: WSStatus = 'idle';
private url: URL;
private ws: WebSocket;

private networkInfoSubjects = {} as {
[m in `${NetworkInfoMethods}$`]: ReplaySubject<
AsyncReturnType<NetworkInfoProvider[m extends `${infer o}$` ? o : never]>
>;
};

constructor(deps: WsClientDependencies, cfg: WsClientConfiguration) {
this.epoch$ = this.epochSubject$ = new Subject<EpochRollover>();
this.health$ = this.healthSubject$ = new ReplaySubject<HealthCheckResponse>(1);
this.heartbeatInterval = (cfg.heartbeatInterval || 55) * 1000;
this.logger = deps.logger;
this.url = cfg.url;

this.networkInfoSubjects = {
eraSummaries$: new ReplaySubject<EraSummary[]>(1),
genesisParameters$: new ReplaySubject<Cardano.CompactGenesis>(1),
ledgerTip$: new ReplaySubject<Cardano.Tip>(1),
lovelaceSupply$: new ReplaySubject<SupplySummary>(1),
protocolParameters$: new ReplaySubject<Cardano.ProtocolParameters>(1),
stake$: new ReplaySubject<StakeSummary>(1)
};
this.networkInfo = this.networkInfoSubjects;

this.networkInfoProvider = {
eraSummaries: () => firstValueFrom(this.networkInfo.eraSummaries$),
genesisParameters: () => firstValueFrom(this.networkInfo.genesisParameters$),
healthCheck: () => firstValueFrom(this.health$),
ledgerTip: () => firstValueFrom(this.networkInfo.ledgerTip$),
lovelaceSupply: () => firstValueFrom(this.networkInfo.lovelaceSupply$),
protocolParameters: () => firstValueFrom(this.networkInfo.protocolParameters$),
stake: () => firstValueFrom(this.networkInfo.stake$)
};

this.connect();
}

private connect() {
this.status = 'connecting';

const ws = new WebSocket(this.url);

// eslint-disable-next-line sonarjs/cognitive-complexity, complexity
ws.onmessage = (event) => {
try {
if (typeof event.data !== 'string') throw new Error('Unexpected data from WebSocket ');

const message = fromSerializableObject<WSMessage>(JSON.parse(event.data));
const { clientId, networkInfo } = message;

if (clientId) this.logger.info(`Connected with clientId ${(this.clientId = clientId)}`);

if (networkInfo) {
const { eraSummaries, genesisParameters, ledgerTip, lovelaceSupply, protocolParameters, stake } = networkInfo;

if (eraSummaries) this.networkInfoSubjects.eraSummaries$.next(eraSummaries);
if (genesisParameters) this.networkInfoSubjects.genesisParameters$.next(genesisParameters);
if (lovelaceSupply) this.networkInfoSubjects.lovelaceSupply$.next(lovelaceSupply);
if (protocolParameters) this.networkInfoSubjects.protocolParameters$.next(protocolParameters);
if (stake) this.networkInfoSubjects.stake$.next(stake);

// Emit ledgerTip as last one
if (ledgerTip) this.networkInfoSubjects.ledgerTip$.next(ledgerTip);

// If it is an epoch rollover, emit it
if (eraSummaries && ledgerTip && lovelaceSupply && protocolParameters && !clientId) {
const epochInfo = createSlotEpochInfoCalc(eraSummaries)(ledgerTip.slot);

this.epochSubject$.next({ epochInfo, eraSummaries, ledgerTip, lovelaceSupply, protocolParameters });
}
}
} catch (error) {
this.logger.error(error, 'While parsing message', event.data, this.clientId);
}
};

ws.onclose = () => {
this.logger.info('WebSocket client connection closed', this.clientId);

if (this.heartbeatTimeout) {
clearInterval(this.heartbeatTimeout);
this.heartbeatTimeout = undefined;
}

this.clientId = NOT_CONNECTED_ID;
this.retry();
};

ws.onerror = (error: unknown) => {
const err = isEventError(error) ? error.error : new Error(`Unknown error: ${JSON.stringify(error)}`);

this.logger.error(err, 'Async error from WebSocket client', this.clientId);
ws.close();
this.healthSubject$.next({ ok: false, reason: err.message });
};

ws.onopen = () => {
this.status = 'connected';
this.ws = ws;
this.heartbeat();
this.healthSubject$.next({ ok: true });
};
}

private heartbeat() {
if (this.heartbeatTimeout) clearInterval(this.heartbeatTimeout);

this.heartbeatTimeout = setTimeout(() => {
try {
this.request({});
} catch (error) {
this.logger.error(error, 'Error while refreshing heartbeat', this.clientId);
}
}, this.heartbeatInterval);
this.heartbeatTimeout.unref();
}

private retry() {
if (this.status !== 'stop') {
this.status = 'idle';
setTimeout(() => this.connect(), 10_000).unref();
}
}

/** Closes the WebSocket connection. */
close() {
this.status = 'stop';
this.ws.close();
}

/**
* Sends a request through WS to server.
*
* @param request the request.
* @returns `true` is sent, otherwise `false`.
*/
private request(request: WSMessage) {
if (this.status !== 'connected') return false;

this.ws!.send(JSON.stringify(request));
this.heartbeat();

return true;
}
}
1 change: 1 addition & 0 deletions packages/cardano-services-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ export * from './NetworkInfoProvider';
export * from './RewardsProvider';
export * from './HandleProvider';
export * from './version';
export * from './WebSocket';
6 changes: 5 additions & 1 deletion packages/cardano-services/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@
"@types/express-prometheus-middleware": "^1.2.1",
"@types/lodash": "^4.14.182",
"@types/pg": "^8.6.5",
"@types/uuid": "^8.3.4",
"@types/wait-on": "^5.3.1",
"@types/ws": "^8.5.10",
"axios-mock-adapter": "^1.20.0",
"cbor": "^8.1.0",
"delay": "^5.0.0",
Expand Down Expand Up @@ -122,7 +124,9 @@
"rxjs": "^7.4.0",
"ts-custom-error": "^3.2.0",
"ts-log": "^2.2.4",
"typeorm": "^0.3.15"
"typeorm": "^0.3.15",
"uuid": "^10.0.0",
"ws": "^8.17.1"
},
"files": [
"dist/*",
Expand Down
Loading

0 comments on commit 6616043

Please sign in to comment.