Skip to content

Commit

Permalink
refactor(cardano-services): rework epoch poll
Browse files Browse the repository at this point in the history
- rework EpochPollService as singleton class, used across services
  • Loading branch information
Ivaylo Andonov committed Aug 8, 2022
1 parent 9b3e26c commit 49eb011
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 76 deletions.
Expand Up @@ -9,14 +9,13 @@ import {
TimeSettings
} from '@cardano-sdk/core';
import { DbSyncProvider } from '../../DbSyncProvider';
import { EpochPollService } from '../../util/polling/EpochPollService';
import { GenesisData } from './types';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../InMemoryCache';
import { Logger } from 'ts-log';
import { NetworkInfoBuilder } from './NetworkInfoBuilder';
import { NetworkInfoCacheKey } from '.';
import { Pool } from 'pg';
import { Shutdown } from '@cardano-sdk/util';
import { epochPollService } from './utils';
import {
loadGenesisData,
toGenesisParams,
Expand All @@ -41,8 +40,7 @@ export class DbSyncNetworkInfoProvider extends DbSyncProvider implements Network
#cache: InMemoryCache;
#builder: NetworkInfoBuilder;
#genesisDataReady: Promise<GenesisData>;
#epochPollInterval: number;
#epochPollService: Shutdown | null;
#epochPollService: EpochPollService;
#cardanoNode: CardanoNode;

constructor(
Expand All @@ -52,10 +50,10 @@ export class DbSyncNetworkInfoProvider extends DbSyncProvider implements Network
super(db);
this.#logger = logger;
this.#cache = cache;
this.#builder = new NetworkInfoBuilder(db, logger);
this.#genesisDataReady = loadGenesisData(cardanoNodeConfigPath);
this.#epochPollInterval = epochPollInterval;
this.#cardanoNode = cardanoNode;
this.#genesisDataReady = loadGenesisData(cardanoNodeConfigPath);
this.#builder = new NetworkInfoBuilder(db, logger);
this.#epochPollService = EpochPollService.create(cache, () => this.#builder.queryLatestEpoch(), epochPollInterval);
}

public async ledgerTip(): Promise<Cardano.Tip> {
Expand Down Expand Up @@ -115,19 +113,13 @@ export class DbSyncNetworkInfoProvider extends DbSyncProvider implements Network
}

async start(): Promise<void> {
this.#epochPollService.start();
await this.#cardanoNode.initialize();
if (!this.#epochPollService)
this.#epochPollService = epochPollService(
this.#cache,
() => this.#builder.queryLatestEpoch(),
this.#epochPollInterval
);
}

async close(): Promise<void> {
this.#epochPollService?.shutdown();
this.#epochPollService = null;
this.#cache.shutdown();
this.#epochPollService.stop();
await this.#cardanoNode.shutdown();
}
}
@@ -1,3 +1,2 @@
export * from './DbSyncNetworkInfoProvider';
export * from './utils';
export * as NetworkInfoCacheKey from './keys';
Expand Up @@ -2,5 +2,4 @@ export const TOTAL_SUPPLY = 'NetworkInfo_total_supply';
export const CIRCULATING_SUPPLY = 'NetworkInfo_circulating_supply';
export const ACTIVE_STAKE = 'NetworkInfo_active_stake';
export const LIVE_STAKE = 'NetworkInfo_live_stake';
export const CURRENT_EPOCH = 'NetworkInfo_current_epoch';
export const ERA_SUMMARIES = 'NetworkInfo_era_summaries';

This file was deleted.

2 changes: 1 addition & 1 deletion packages/cardano-services/src/cli.ts
Expand Up @@ -20,7 +20,7 @@ import {
} from './ProgramsCommon';
import { DB_CACHE_TTL_DEFAULT } from './InMemoryCache';
import { DEFAULT_TOKEN_METADATA_CACHE_TTL, DEFAULT_TOKEN_METADATA_SERVER_URL } from './Asset';
import { EPOCH_POLL_INTERVAL_DEFAULT } from './NetworkInfo';
import { EPOCH_POLL_INTERVAL_DEFAULT } from './util';
import { InvalidLoggerLevel } from './errors';
import {
PARALLEL_MODE_DEFAULT,
Expand Down
2 changes: 1 addition & 1 deletion packages/cardano-services/src/run.ts
Expand Up @@ -15,7 +15,7 @@ import {
import { DB_CACHE_TTL_DEFAULT } from './InMemoryCache';
import { DEFAULT_TOKEN_METADATA_CACHE_TTL, DEFAULT_TOKEN_METADATA_SERVER_URL } from './Asset';
import { ENABLE_METRICS_DEFAULT, USE_QUEUE_DEFAULT } from './ProgramsCommon';
import { EPOCH_POLL_INTERVAL_DEFAULT } from './NetworkInfo';
import { EPOCH_POLL_INTERVAL_DEFAULT } from './util';
import { LogLevel } from 'bunyan';
import { URL } from 'url';
import { cacheTtlValidator } from './util/validators';
Expand Down
1 change: 1 addition & 0 deletions packages/cardano-services/src/util/index.ts
@@ -1,3 +1,4 @@
export * from './http';
export * from './provider';
export * from './hexString';
export * from './polling';
64 changes: 64 additions & 0 deletions packages/cardano-services/src/util/polling/EpochPollService.ts
@@ -0,0 +1,64 @@
import { AsyncAction, InMemoryCache, UNLIMITED_CACHE_TTL } from '../../InMemoryCache';

export const EPOCH_POLL_INTERVAL_DEFAULT = 10_000;
export const CURRENT_EPOCH_CACHE_KEY = 'current_epoch';

/**
* Class to handle epoch rollover through db polling
*/
export class EpochPollService {
static #instance: EpochPollService;
#cache: InMemoryCache;
#timeoutId: number;
#interval: number;
#asyncAction: AsyncAction<number>;

private constructor(cache: InMemoryCache, asyncAction: AsyncAction<number>, interval: number) {
this.#cache = cache;
this.#interval = interval;
this.#asyncAction = asyncAction;
this.executePoll = this.executePoll.bind(this);
}

/**
* Poll execution to determine new epoch rollover
*/
private async executePoll() {
const lastEpoch = await this.#asyncAction();
const currentEpoch = this.#cache.getVal<number>(CURRENT_EPOCH_CACHE_KEY);
const shouldClearCache = !!(currentEpoch && lastEpoch > currentEpoch);

if (!currentEpoch || shouldClearCache) {
shouldClearCache ? this.#cache.clear() : void 0;
this.#cache.set<number>(CURRENT_EPOCH_CACHE_KEY, lastEpoch, UNLIMITED_CACHE_TTL);
}
}

/**
* Creates a single instance of EpochPollService
*
* @returns {EpochPollService} EpochPollService instance
*/
static create(cache: InMemoryCache, asyncAction: AsyncAction<number>, interval: number): EpochPollService {
if (!this.#instance) {
this.#instance = new EpochPollService(cache, asyncAction, interval);
}
return this.#instance;
}

/**
* Starts the poll execution, should be executed only once across all HTTP services
*/
start() {
if (this.#timeoutId) return;

this.#timeoutId = setInterval(this.executePoll, this.#interval);
}

/**
* Stops the poll execution
*/
stop() {
clearInterval(this.#timeoutId);
}
}
1 change: 1 addition & 0 deletions packages/cardano-services/src/util/polling/index.ts
@@ -0,0 +1 @@
export * from './EpochPollService';
Expand Up @@ -2,9 +2,10 @@
/* eslint-disable sonarjs/no-duplicate-string */
/* eslint-disable sonarjs/cognitive-complexity */
/* eslint-disable sonarjs/no-identical-functions */
import { CURRENT_EPOCH_CACHE_KEY } from '../../src/util';
import { CardanoNode, NetworkInfoProvider, ProviderError, ProviderFailure } from '@cardano-sdk/core';
import { CreateHttpProviderConfig, networkInfoHttpProvider } from '@cardano-sdk/cardano-services-client';
import { DbSyncNetworkInfoProvider, NetworkInfoCacheKey, NetworkInfoHttpService } from '../../src/NetworkInfo';
import { DbSyncNetworkInfoProvider, NetworkInfoHttpService } from '../../src/NetworkInfo';
import { HttpServer, HttpServerConfig } from '../../src';
import { INFO, createLogger } from 'bunyan';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../src/InMemoryCache';
Expand Down Expand Up @@ -71,7 +72,7 @@ describe('NetworkInfoHttpService', () => {

describe('healthy state', () => {
const dbConnectionQuerySpy = jest.spyOn(db, 'query');
const invalidateCacheSpy = jest.spyOn(cache, 'invalidate');
const clearCacheSpy = jest.spyOn(cache, 'clear');

beforeAll(async () => {
port = await getPort();
Expand Down Expand Up @@ -101,18 +102,20 @@ describe('NetworkInfoHttpService', () => {
beforeEach(async () => {
await cache.clear();
jest.clearAllMocks();
dbConnectionQuerySpy.mockClear();
clearCacheSpy.mockClear();
});

describe('start', () => {
it('should start epoch polling once the db provider is initialized and started', async () => {
expect(cache.getVal(NetworkInfoCacheKey.CURRENT_EPOCH)).toBeUndefined();
expect(cache.getVal(CURRENT_EPOCH_CACHE_KEY)).toBeUndefined();
expect(cache.keys().length).toEqual(0);

await sleep(epochPollInterval * 2);

expect(cache.keys().length).toEqual(1);
expect(cache.getVal(NetworkInfoCacheKey.CURRENT_EPOCH)).toBeDefined();
expect(invalidateCacheSpy).not.toHaveBeenCalled();
expect(cache.getVal(CURRENT_EPOCH_CACHE_KEY)).toBeDefined();
expect(clearCacheSpy).not.toHaveBeenCalled();
});
});

Expand Down Expand Up @@ -206,16 +209,17 @@ describe('NetworkInfoHttpService', () => {

await provider.stake();

expect(cache.getVal(NetworkInfoCacheKey.CURRENT_EPOCH)).toBeUndefined();
expect(cache.getVal(CURRENT_EPOCH_CACHE_KEY)).toBeUndefined();
expect(cache.keys().length).toEqual(stakeTotalQueriesCount);

await sleep(epochPollInterval);

expect(cache.getVal(NetworkInfoCacheKey.CURRENT_EPOCH)).toEqual(currentEpochNo);
expect(cache.getVal(CURRENT_EPOCH_CACHE_KEY)).toEqual(currentEpochNo);
expect(cache.keys().length).toEqual(totalQueriesCount);
expect(dbConnectionQuerySpy).toBeCalledTimes(stakeDbQueriesCount + DB_POLL_QUERIES_COUNT);
expect(cardanoNodeStakeSpy).toHaveBeenCalledTimes(stakeNodeQueriesCount);
expect(invalidateCacheSpy).not.toHaveBeenCalled();
expect(clearCacheSpy).not.toHaveBeenCalled();
expect(dbConnectionQuerySpy).toBeCalledTimes(totalQueriesCount);
});

it(
Expand All @@ -235,16 +239,10 @@ describe('NetworkInfoHttpService', () => {
);

await sleep(epochPollInterval);
expect(invalidateCacheSpy).toHaveBeenCalledWith([
NetworkInfoCacheKey.TOTAL_SUPPLY,
NetworkInfoCacheKey.ACTIVE_STAKE,
NetworkInfoCacheKey.ERA_SUMMARIES
]);
expect(clearCacheSpy).toHaveBeenCalled();

expect(cache.getVal(NetworkInfoCacheKey.CURRENT_EPOCH)).toEqual(greaterEpoch);
expect(cache.keys().length).toEqual(2);

await sleep(epochPollInterval);
expect(cache.getVal(CURRENT_EPOCH_CACHE_KEY)).toEqual(greaterEpoch);
expect(cache.keys().length).toEqual(1);
}, db)
);
});
Expand Down Expand Up @@ -306,17 +304,17 @@ describe('NetworkInfoHttpService', () => {
expect(dbConnectionQuerySpy).toBeCalledTimes(dbSyncQueriesCount * 2);
});

it('should not invalidate the epoch values from the cache if there is no epoch rollover', async () => {
it('should not invalidate the epoch values from the cache if there is no epoch rollover 2', async () => {
const currentEpochNo = 205;
const totalDbQueriesCount = dbSyncQueriesCount + DB_POLL_QUERIES_COUNT;
await provider.lovelaceSupply();
expect(cache.getVal(NetworkInfoCacheKey.CURRENT_EPOCH)).toBeUndefined();
expect(cache.getVal(CURRENT_EPOCH_CACHE_KEY)).toBeUndefined();
expect(cache.keys().length).toEqual(2);
await sleep(epochPollInterval);
expect(cache.getVal(NetworkInfoCacheKey.CURRENT_EPOCH)).toEqual(currentEpochNo);
expect(cache.getVal(CURRENT_EPOCH_CACHE_KEY)).toEqual(currentEpochNo);
expect(cache.keys().length).toEqual(3);
expect(dbConnectionQuerySpy).toBeCalledTimes(totalDbQueriesCount);
expect(invalidateCacheSpy).not.toHaveBeenCalled();
expect(clearCacheSpy).not.toHaveBeenCalled();
});
});

Expand Down
@@ -1,7 +1,7 @@
/* eslint-disable max-len */
/* eslint-disable sonarjs/no-duplicate-string */
import { DB_CACHE_TTL_DEFAULT } from '../../src/InMemoryCache';
import { EPOCH_POLL_INTERVAL_DEFAULT } from '../../src/NetworkInfo';
import { EPOCH_POLL_INTERVAL_DEFAULT, listenPromise, serverClosePromise } from '../../src/util';
import {
HttpServer,
InvalidArgsCombination,
Expand All @@ -23,7 +23,6 @@ import {
ogmiosServerReady
} from '../util';
import { getRandomPort } from 'get-port-please';
import { listenPromise, serverClosePromise } from '../../src/util';
import http from 'http';

jest.mock('dns', () => ({
Expand Down

0 comments on commit 49eb011

Please sign in to comment.