Skip to content

Commit

Permalink
fixup! feat(cardano-services): cache stake pool queries - extend Stak…
Browse files Browse the repository at this point in the history
…ePoolProvider with cache and epochPollService - refactor sub queries and optimize caching of pools
  • Loading branch information
Ivaylo Andonov committed Aug 8, 2022
1 parent 419252a commit 7c8cd1f
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 57 deletions.
Expand Up @@ -8,8 +8,8 @@ import {
SupplySummary,
TimeSettings
} from '@cardano-sdk/core';
import { DbSyncEpochPollService } from '../../util/polling/DbSyncEpochPollService';
import { DbSyncProvider } from '../../DbSyncProvider';
import { EpochPollService } from '../../util/polling/EpochPollService';
import { GenesisData } from './types';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../InMemoryCache';
import { Logger } from 'ts-log';
Expand All @@ -27,33 +27,33 @@ import {

export interface NetworkInfoProviderProps {
cardanoNodeConfigPath: string;
epochPollInterval: number;
}
export interface NetworkInfoProviderDependencies {
db: Pool;
cache: InMemoryCache;
logger: Logger;
cardanoNode: CardanoNode;
epochPollService: DbSyncEpochPollService;
}
export class DbSyncNetworkInfoProvider extends DbSyncProvider implements NetworkInfoProvider {
#logger: Logger;
#cache: InMemoryCache;
#builder: NetworkInfoBuilder;
#genesisDataReady: Promise<GenesisData>;
#epochPollService: EpochPollService;
#epochPollService: DbSyncEpochPollService;
#cardanoNode: CardanoNode;

constructor(
{ cardanoNodeConfigPath, epochPollInterval }: NetworkInfoProviderProps,
{ db, cache, logger, cardanoNode }: NetworkInfoProviderDependencies
{ cardanoNodeConfigPath }: NetworkInfoProviderProps,
{ db, cache, logger, cardanoNode, epochPollService }: NetworkInfoProviderDependencies
) {
super(db);
this.#logger = logger;
this.#cache = cache;
this.#cardanoNode = cardanoNode;
this.#epochPollService = epochPollService;
this.#builder = new NetworkInfoBuilder(db, logger);
this.#genesisDataReady = loadGenesisData(cardanoNodeConfigPath);
this.#epochPollService = EpochPollService.create(cache, () => this.#builder.queryLatestEpoch(), epochPollInterval);
}

public async ledgerTip(): Promise<Cardano.Tip> {
Expand Down
24 changes: 16 additions & 8 deletions packages/cardano-services/src/Program/loadHttpServer.ts
Expand Up @@ -3,6 +3,7 @@
import { AssetHttpService, CardanoTokenRegistry, DbSyncAssetProvider, DbSyncNftMetadataService } from '../Asset';
import { ChainHistoryHttpService, DbSyncChainHistoryProvider } from '../ChainHistory';
import { CommonProgramOptions } from '../ProgramsCommon';
import { DbSyncEpochPollService } from '../util';
import { DbSyncNetworkInfoProvider, NetworkInfoHttpService } from '../NetworkInfo';
import { DbSyncRewardsProvider, RewardsHttpService } from '../Rewards';
import { DbSyncStakePoolProvider, StakePoolHttpService } from '../StakePool';
Expand All @@ -17,6 +18,7 @@ import { TxSubmitHttpService } from '../TxSubmit';
import { createDbSyncMetadataService } from '../Metadata';
import { getOgmiosCardanoNode, getOgmiosTxSubmitProvider, getPool, getRabbitMqTxSubmitProvider } from './services';
import Logger, { createLogger } from 'bunyan';
import memoize from 'lodash/memoize';
import pg from 'pg';

export interface HttpServerOptions extends CommonProgramOptions {
Expand Down Expand Up @@ -85,10 +87,16 @@ const serviceMapFactory = (
return new AssetHttpService({ assetProvider, logger });
}),
[ServiceNames.StakePool]: withDb((db) => {
const stakePoolProvider = new DbSyncStakePoolProvider(
{ epochPollInterval: args.options!.epochPollInterval! },
{ cache, db, logger }
const getEpochPollService = memoize(
(dbPool) => new DbSyncEpochPollService(cache, dbPool, args.options!.epochPollInterval!)
);
const stakePoolProvider = new DbSyncStakePoolProvider({
cache,
db,
epochPollService: getEpochPollService(db),
logger
});

return new StakePoolHttpService({ logger, stakePoolProvider });
}),
[ServiceNames.Utxo]: withDb(
Expand All @@ -110,13 +118,13 @@ const serviceMapFactory = (
if (args.options?.ogmiosUrl === undefined)
throw new MissingProgramOption(ServiceNames.NetworkInfo, ProgramOptionDescriptions.OgmiosUrl);

const getEpochPollService = memoize(
(dbPool) => new DbSyncEpochPollService(cache, dbPool, args.options!.epochPollInterval!)
);
const cardanoNode = await getOgmiosCardanoNode(dnsResolver, logger, args.options);
const networkInfoProvider = new DbSyncNetworkInfoProvider(
{
cardanoNodeConfigPath: args.options.cardanoNodeConfigPath,
epochPollInterval: args.options?.epochPollInterval
},
{ cache, cardanoNode, db, logger }
{ cardanoNodeConfigPath: args.options.cardanoNodeConfigPath },
{ cache, cardanoNode, db, epochPollService: getEpochPollService(db), logger }
);

return new NetworkInfoHttpService({ logger, networkInfoProvider });
Expand Down
Expand Up @@ -9,8 +9,8 @@ import {
StakePoolStats
} from '@cardano-sdk/core';
import { CommonPoolInfo, OrderedResult, PoolAPY, PoolData, PoolMetrics, PoolSortType, PoolUpdate } from './types';
import { DbSyncEpochPollService } from '../../util';
import { DbSyncProvider } from '../../DbSyncProvider';
import { EpochPollService } from '../../util';
import { IDS_NAMESPACE, StakePoolsSubQuery, getStakePoolSortType, queryCacheKey } from './util';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../InMemoryCache';
import { Logger } from 'ts-log';
Expand All @@ -19,27 +19,25 @@ import { StakePoolBuilder } from './StakePoolBuilder';
import { isNotNil } from '@cardano-sdk/util';
import { toStakePoolResults } from './mappers';

export interface StakePoolProviderProps {
epochPollInterval: number;
}
export interface StakePoolProviderDependencies {
db: Pool;
cache: InMemoryCache;
logger: Logger;
cache: InMemoryCache;
epochPollService: DbSyncEpochPollService;
}

export class DbSyncStakePoolProvider extends DbSyncProvider implements StakePoolProvider {
#builder: StakePoolBuilder;
#logger: Logger;
#cache: InMemoryCache;
#epochPollService: EpochPollService;
#epochPollService: DbSyncEpochPollService;

constructor({ epochPollInterval }: StakePoolProviderProps, { db, cache, logger }: StakePoolProviderDependencies) {
constructor({ db, cache, logger, epochPollService }: StakePoolProviderDependencies) {
super(db);
this.#logger = logger;
this.#cache = cache;
this.#epochPollService = epochPollService;
this.#builder = new StakePoolBuilder(db, logger);
this.#epochPollService = EpochPollService.create(cache, () => this.#builder.getLastEpoch(), epochPollInterval);
}

private getQueryBySortType(
Expand Down
@@ -1,30 +1,32 @@
import { AsyncAction, InMemoryCache, UNLIMITED_CACHE_TTL } from '../../InMemoryCache';
import { EpochModel } from '../../StakePool';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../InMemoryCache';
import { Pool, QueryResult } from 'pg';
import { findLastEpoch } from './queries';

export const EPOCH_POLL_INTERVAL_DEFAULT = 10_000;
export const CURRENT_EPOCH_CACHE_KEY = 'current_epoch';

/**
* Class to handle epoch rollover through db polling
*/
export class EpochPollService {
static #instance: EpochPollService;
export class DbSyncEpochPollService {
#cache: InMemoryCache;
#timeoutId: number;
#interval: number;
#asyncAction: AsyncAction<number>;
#db: Pool;

private constructor(cache: InMemoryCache, asyncAction: AsyncAction<number>, interval: number) {
constructor(cache: InMemoryCache, db: Pool, interval: number) {
this.#db = db;
this.#cache = cache;
this.#interval = interval;
this.#asyncAction = asyncAction;
this.executePoll = this.executePoll.bind(this);
}

/**
* Poll execution to determine new epoch rollover
*/
private async executePoll() {
const lastEpoch = await this.#asyncAction();
const lastEpoch = await this.getLastEpoch();
const currentEpoch = this.#cache.getVal<number>(CURRENT_EPOCH_CACHE_KEY);
const shouldClearCache = !!(currentEpoch && lastEpoch > currentEpoch);

Expand All @@ -35,15 +37,11 @@ export class EpochPollService {
}

/**
* Creates a single instance of EpochPollService
*
* @returns {EpochPollService} EpochPollService instance
* Fetch the last epoch number
*/
static create(cache: InMemoryCache, asyncAction: AsyncAction<number>, interval: number): EpochPollService {
if (!this.#instance) {
this.#instance = new EpochPollService(cache, asyncAction, interval);
}
return this.#instance;
private async getLastEpoch() {
const result: QueryResult<EpochModel> = await this.#db.query(findLastEpoch);
return result.rows[0].no;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/cardano-services/src/util/polling/index.ts
@@ -1 +1,2 @@
export * from './EpochPollService';
export * from './DbSyncEpochPollService';
export * from './queries';
6 changes: 6 additions & 0 deletions packages/cardano-services/src/util/polling/queries.ts
@@ -0,0 +1,6 @@
export const findLastEpoch = `
SELECT "no"
FROM epoch
ORDER BY no DESC
LIMIT 1
`;
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable sonarjs/no-duplicate-string */
/* eslint-disable sonarjs/cognitive-complexity */
/* eslint-disable sonarjs/no-identical-functions */
import { CURRENT_EPOCH_CACHE_KEY } from '../../src/util';
import { CURRENT_EPOCH_CACHE_KEY, DbSyncEpochPollService } from '../../src/util';
import { CardanoNode, NetworkInfoProvider, ProviderError, ProviderFailure } from '@cardano-sdk/core';
import { CreateHttpProviderConfig, networkInfoHttpProvider } from '@cardano-sdk/cardano-services-client';
import { DbSyncNetworkInfoProvider, NetworkInfoHttpService } from '../../src/NetworkInfo';
Expand Down Expand Up @@ -38,6 +38,7 @@ describe('NetworkInfoHttpService', () => {
const cache = new InMemoryCache(UNLIMITED_CACHE_TTL);
const cardanoNodeConfigPath = process.env.CARDANO_NODE_CONFIG_PATH!;
const db = new Pool({ connectionString: process.env.POSTGRES_CONNECTION_STRING, max: 1, min: 1 });
const epochPollService = new DbSyncEpochPollService(cache, db, epochPollInterval!);

describe('unhealthy NetworkInfoProvider', () => {
beforeEach(async () => {
Expand Down Expand Up @@ -80,8 +81,8 @@ describe('NetworkInfoHttpService', () => {
cardanoNode = mockCardanoNode();
config = { listen: { port } };
networkInfoProvider = new DbSyncNetworkInfoProvider(
{ cardanoNodeConfigPath, epochPollInterval },
{ cache, cardanoNode, db, logger }
{ cardanoNodeConfigPath },
{ cache, cardanoNode, db, epochPollService, logger }
);
service = new NetworkInfoHttpService({ logger, networkInfoProvider });
httpServer = new HttpServer(config, { logger, services: [service] });
Expand Down
18 changes: 7 additions & 11 deletions packages/cardano-services/test/Program/services/ogmios.test.ts
Expand Up @@ -3,6 +3,7 @@
/* eslint-disable max-len */
import { Cardano, TxSubmitProvider } from '@cardano-sdk/core';
import { Connection } from '@cardano-ogmios/client';
import { DbSyncEpochPollService, listenPromise, serverClosePromise } from '../../../src/util';
import { DbSyncNetworkInfoProvider, NetworkInfoHttpService } from '../../../src/NetworkInfo';
import {
HttpServer,
Expand All @@ -20,7 +21,6 @@ import { SrvRecord } from 'dns';
import { createHealthyMockOgmiosServer, ogmiosServerReady } from '../../util';
import { createMockOgmiosServer } from '../../../../ogmios/test/mocks/mockOgmiosServer';
import { getPort, getRandomPort } from 'get-port-please';
import { listenPromise, serverClosePromise } from '../../../src/util';
import { dummyLogger as logger } from 'ts-log';
import { mockCardanoNode } from '../../../../core/test/CardanoNode/mocks';
import { types } from 'util';
Expand Down Expand Up @@ -126,12 +126,10 @@ describe('Service dependency abstractions', () => {
serviceDiscoveryBackoffFactor: 1.1,
serviceDiscoveryTimeout: 1000
});
const epochPollService = new DbSyncEpochPollService(cache, db!, 10_000);
const networkInfoProvider = new DbSyncNetworkInfoProvider(
{
cardanoNodeConfigPath,
epochPollInterval: 10_000
},
{ cache, cardanoNode, db: db!, logger }
{ cardanoNodeConfigPath },
{ cache, cardanoNode, db: db!, epochPollService, logger }
);

httpServer = new HttpServer(config, {
Expand Down Expand Up @@ -237,12 +235,10 @@ describe('Service dependency abstractions', () => {
serviceDiscoveryBackoffFactor: 1.1,
serviceDiscoveryTimeout: 1000
});
const epochPollService = new DbSyncEpochPollService(cache, db!, 10_000);
const networkInfoProvider = new DbSyncNetworkInfoProvider(
{
cardanoNodeConfigPath,
epochPollInterval: 10_000
},
{ cache, cardanoNode, db: db!, logger }
{ cardanoNodeConfigPath },
{ cache, cardanoNode, db: db!, epochPollService, logger }
);

httpServer = new HttpServer(config, {
Expand Down
13 changes: 9 additions & 4 deletions packages/cardano-services/test/Program/services/postgres.test.ts
@@ -1,6 +1,7 @@
/* eslint-disable sonarjs/no-identical-functions */
/* eslint-disable sonarjs/no-duplicate-string */
/* eslint-disable max-len */
import { DbSyncEpochPollService } from '../../../src/util';
import { DbSyncNetworkInfoProvider, NetworkInfoHttpService } from '../../../src/NetworkInfo';
import { HttpServer, HttpServerConfig, createDnsResolver, getPool } from '../../../src';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../../src/InMemoryCache';
Expand Down Expand Up @@ -37,6 +38,7 @@ describe('Service dependency abstractions', () => {
let config: HttpServerConfig;
let service: NetworkInfoHttpService;
let networkInfoProvider: DbSyncNetworkInfoProvider;
let epochPollService: DbSyncEpochPollService;

beforeAll(async () => {
db = await getPool(dnsResolver, logger, {
Expand All @@ -56,9 +58,10 @@ describe('Service dependency abstractions', () => {
port = await getPort();
config = { listen: { port } };
apiUrlBase = `http://localhost:${port}/network-info`;
epochPollService = new DbSyncEpochPollService(cache, db!, 10_000);
networkInfoProvider = new DbSyncNetworkInfoProvider(
{ cardanoNodeConfigPath, epochPollInterval: 2000 },
{ cache, cardanoNode, db: db!, logger }
{ cardanoNodeConfigPath },
{ cache, cardanoNode, db: db!, epochPollService, logger }
);
service = new NetworkInfoHttpService({ logger, networkInfoProvider });
httpServer = new HttpServer(config, { logger, services: [service] });
Expand Down Expand Up @@ -96,6 +99,7 @@ describe('Service dependency abstractions', () => {
let config: HttpServerConfig;
let service: NetworkInfoHttpService;
let networkInfoProvider: DbSyncNetworkInfoProvider;
let epochPollService: DbSyncEpochPollService;

beforeAll(async () => {
db = await getPool(dnsResolver, logger, {
Expand All @@ -110,9 +114,10 @@ describe('Service dependency abstractions', () => {
port = await getPort();
config = { listen: { port } };
apiUrlBase = `http://localhost:${port}/network-info`;
epochPollService = new DbSyncEpochPollService(cache, db!, 1000);
networkInfoProvider = new DbSyncNetworkInfoProvider(
{ cardanoNodeConfigPath, epochPollInterval: 2000 },
{ cache, cardanoNode, db: db!, logger }
{ cardanoNodeConfigPath },
{ cache, cardanoNode, db: db!, epochPollService, logger }
);
service = new NetworkInfoHttpService({ logger, networkInfoProvider });
httpServer = new HttpServer(config, { logger, services: [service] });
Expand Down
Expand Up @@ -12,6 +12,7 @@ import {
StakePoolStats
} from '@cardano-sdk/core';
import { CreateHttpProviderConfig, stakePoolHttpProvider } from '../../../cardano-services-client';
import { DbSyncEpochPollService } from '../../src/util';
import { DbSyncStakePoolProvider, HttpServer, HttpServerConfig, StakePoolHttpService } from '../../src';
import { INFO, createLogger } from 'bunyan';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../src/InMemoryCache';
Expand Down Expand Up @@ -63,6 +64,7 @@ describe('StakePoolHttpService', () => {
let config: HttpServerConfig;
let doStakePoolRequest: ReturnType<typeof doServerRequest>;
let provider: StakePoolProvider;
let epochPollService: DbSyncEpochPollService;

const epochPollInterval = 2 * 1000;
const cache = new InMemoryCache(UNLIMITED_CACHE_TTL);
Expand All @@ -73,6 +75,8 @@ describe('StakePoolHttpService', () => {
config = { listen: { port } };
clientConfig = { baseUrl, logger: createLogger({ level: INFO, name: 'unit tests' }) };
db = new Pool({ connectionString: process.env.POSTGRES_CONNECTION_STRING, max: 1, min: 1 });
epochPollService = new DbSyncEpochPollService(cache, db, epochPollInterval!);

doStakePoolRequest = doServerRequest(baseUrl);
});

Expand Down Expand Up @@ -105,7 +109,7 @@ describe('StakePoolHttpService', () => {
// eslint-disable-next-line sonarjs/cognitive-complexity
describe('healthy state', () => {
beforeAll(async () => {
stakePoolProvider = new DbSyncStakePoolProvider({ epochPollInterval }, { cache, db, logger });
stakePoolProvider = new DbSyncStakePoolProvider({ cache, db, epochPollService, logger });
service = new StakePoolHttpService({ logger, stakePoolProvider });
httpServer = new HttpServer(config, { logger, services: [service] });
await httpServer.initialize();
Expand Down

0 comments on commit 7c8cd1f

Please sign in to comment.