Skip to content

Commit

Permalink
feat(cardano-services): cache stake pool queries
Browse files Browse the repository at this point in the history
- extend StakePoolProvider with cache and epochPollService
- refactor sub queries and optimize caching of pools
  • Loading branch information
Ivaylo Andonov committed Aug 8, 2022
1 parent 49eb011 commit 419252a
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 117 deletions.
Expand Up @@ -51,8 +51,8 @@ export class DbSyncNetworkInfoProvider extends DbSyncProvider implements Network
this.#logger = logger;
this.#cache = cache;
this.#cardanoNode = cardanoNode;
this.#genesisDataReady = loadGenesisData(cardanoNodeConfigPath);
this.#builder = new NetworkInfoBuilder(db, logger);
this.#genesisDataReady = loadGenesisData(cardanoNodeConfigPath);
this.#epochPollService = EpochPollService.create(cache, () => this.#builder.queryLatestEpoch(), epochPollInterval);
}

Expand Down Expand Up @@ -119,7 +119,7 @@ export class DbSyncNetworkInfoProvider extends DbSyncProvider implements Network

async close(): Promise<void> {
this.#cache.shutdown();
this.#epochPollService.stop();
this.#epochPollService.shutdown();
await this.#cardanoNode.shutdown();
}
}
10 changes: 7 additions & 3 deletions packages/cardano-services/src/Program/loadHttpServer.ts
Expand Up @@ -84,9 +84,13 @@ const serviceMapFactory = (

return new AssetHttpService({ assetProvider, logger });
}),
[ServiceNames.StakePool]: withDb(
(db) => new StakePoolHttpService({ logger, stakePoolProvider: new DbSyncStakePoolProvider(db, logger) })
),
[ServiceNames.StakePool]: withDb((db) => {
const stakePoolProvider = new DbSyncStakePoolProvider(
{ epochPollInterval: args.options!.epochPollInterval! },
{ cache, db, logger }
);
return new StakePoolHttpService({ logger, stakePoolProvider });
}),
[ServiceNames.Utxo]: withDb(
(db) => new UtxoHttpService({ logger, utxoProvider: new DbSyncUtxoProvider(db, logger) })
),
Expand Down
@@ -1,21 +1,45 @@
import { CommonPoolInfo, PoolAPY, PoolData, PoolMetrics, PoolSortType } from './types';
/* eslint-disable max-len */
/* eslint-disable max-params */

import {
Cardano,
StakePoolProvider,
StakePoolQueryOptions,
StakePoolSearchResults,
StakePoolStats
} from '@cardano-sdk/core';
import { CommonPoolInfo, OrderedResult, PoolAPY, PoolData, PoolMetrics, PoolSortType, PoolUpdate } from './types';
import { DbSyncProvider } from '../../DbSyncProvider';
import { EpochPollService } from '../../util';
import { IDS_NAMESPACE, StakePoolsSubQuery, getStakePoolSortType, queryCacheKey } from './util';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../InMemoryCache';
import { Logger } from 'ts-log';
import { Pool } from 'pg';
import { StakePoolBuilder } from './StakePoolBuilder';
import { StakePoolProvider, StakePoolQueryOptions, StakePoolSearchResults, StakePoolStats } from '@cardano-sdk/core';
import { getStakePoolSortType } from './util';
import { isNotNil } from '@cardano-sdk/util';
import { toCoreStakePool } from './mappers';
import { toStakePoolResults } from './mappers';

export interface StakePoolProviderProps {
epochPollInterval: number;
}
export interface StakePoolProviderDependencies {
db: Pool;
cache: InMemoryCache;
logger: Logger;
}

export class DbSyncStakePoolProvider extends DbSyncProvider implements StakePoolProvider {
#builder: StakePoolBuilder;
#logger: Logger;
#cache: InMemoryCache;
#epochPollService: EpochPollService;

constructor(db: Pool, logger: Logger) {
constructor({ epochPollInterval }: StakePoolProviderProps, { db, cache, logger }: StakePoolProviderDependencies) {
super(db);
this.#builder = new StakePoolBuilder(db, logger);
this.#logger = logger;
this.#cache = cache;
this.#builder = new StakePoolBuilder(db, logger);
this.#epochPollService = EpochPollService.create(cache, () => this.#builder.getLastEpoch(), epochPollInterval);
}

private getQueryBySortType(
Expand All @@ -37,19 +61,16 @@ export class DbSyncStakePoolProvider extends DbSyncProvider implements StakePool
}
}

public async queryStakePools(options?: StakePoolQueryOptions): Promise<StakePoolSearchResults> {
const { params, query } =
options?.filters?._condition === 'or'
? this.#builder.buildOrQuery(options?.filters)
: this.#builder.buildAndQuery(options?.filters);
this.#logger.debug('About to query pool hashes');
const poolUpdates = await this.#builder.queryPoolHashes(query, params);
private async getPoolsDataOrdered(
poolUpdates: PoolUpdate[],
totalAdaAmount: string,
options?: StakePoolQueryOptions
) {
const hashesIds = poolUpdates.map(({ id }) => id);
const updatesIds = poolUpdates.map(({ updateId }) => updateId);

this.#logger.debug(`${hashesIds.length} pools found`);
const totalAdaAmount = await this.#builder.getTotalAmountOfAda();

this.#logger.debug('About to query stake pools by sort options');
const sortType = options?.sort?.field ? getStakePoolSortType(options.sort.field) : 'data';
const orderedResult = await this.getQueryBySortType(sortType, { hashesIds, totalAdaAmount, updatesIds })(options);
const orderedResultHashIds = (orderedResult as CommonPoolInfo[]).map(({ hashId }) => hashId);
Expand Down Expand Up @@ -78,19 +99,27 @@ export class DbSyncStakePoolProvider extends DbSyncProvider implements StakePool
} else {
poolDatas = orderedResult as PoolData[];
}
return { hashesIds, orderedResult, orderedResultHashIds, orderedResultUpdateIds, poolDatas, sortType };
}

private cacheStakePools(itemsToCache: { [hashId: number]: Cardano.StakePool }) {
for (const [hashId, pool] of Object.entries(itemsToCache))
this.#cache.set(`${IDS_NAMESPACE}/${hashId}`, pool, UNLIMITED_CACHE_TTL);
}

private async queryExtraPoolsData(
idsToFetch: PoolUpdate[],
sortType: PoolSortType,
totalAdaAmount: string,
orderedResult: OrderedResult,
hashesIds: number[],
options?: StakePoolQueryOptions
) {
this.#logger.debug('About to query stake pool extra information');
const [
poolRelays,
poolOwners,
poolRegistrations,
poolRetirements,
poolRewards,
poolMetrics,
poolAPYs,
totalCount,
lastEpoch
] = await Promise.all([
const orderedResultHashIds = idsToFetch.map(({ id }) => id);
const orderedResultUpdateIds = idsToFetch.map(({ updateId }) => updateId);

return await Promise.all([
// TODO: it would be easier and make the code cleaner if all queries had the same id as argument
// (either hash or update id)
this.#builder.queryPoolRelays(orderedResultUpdateIds),
Expand All @@ -103,12 +132,57 @@ export class DbSyncStakePoolProvider extends DbSyncProvider implements StakePool
: this.#builder.queryPoolMetrics(orderedResultHashIds, totalAdaAmount),
sortType === 'apy'
? (orderedResult as PoolAPY[])
: this.#builder.queryPoolAPY(hashesIds, { rewardsHistoryLimit: options?.rewardsHistoryLimit }),
this.#builder.queryTotalCount(query, params),
this.#builder.getLastEpoch()
: this.#builder.queryPoolAPY(hashesIds, { rewardsHistoryLimit: options?.rewardsHistoryLimit })
]);
}

public async queryStakePools(options?: StakePoolQueryOptions): Promise<StakePoolSearchResults> {
const { params, query } =
options?.filters?._condition === 'or'
? this.#builder.buildOrQuery(options?.filters)
: this.#builder.buildAndQuery(options?.filters);

return toCoreStakePool(orderedResultHashIds, {
// Get pool updates/hashes
const poolUpdates = await this.#cache.get(queryCacheKey(StakePoolsSubQuery.POOL_HASHES, options), () =>
this.#builder.queryPoolHashes(query, params)
);
// Get pool total amount of ada
const totalAdaAmount = await this.#cache.get(queryCacheKey(StakePoolsSubQuery.TOTAL_ADA_AMOUNT), () =>
this.#builder.getTotalAmountOfAda()
);
// Get pool total stake pools count
const totalCount = await this.#builder.queryTotalCount(query, params);
// Get last epoch no
const lastEpoch = await this.#builder.getLastEpoch();
// Get stake pools data
const { orderedResultHashIds, orderedResultUpdateIds, orderedResult, poolDatas, hashesIds, sortType } =
await this.#cache.get(queryCacheKey(StakePoolsSubQuery.POOLS_DATA_ORDERED, options), () =>
this.getPoolsDataOrdered(poolUpdates, totalAdaAmount, options)
);

// Create lookup table with pool ids: (hashId:updateId)
const hashIdsMap = Object.fromEntries(
orderedResultHashIds.map((hashId, idx) => [hashId, orderedResultUpdateIds[idx]])
);

// Create a lookup table with cached pools: (hashId:Cardano.StakePool)
const fromCache = Object.fromEntries(
orderedResultHashIds.map((hashId) => [
hashId,
this.#cache.getVal<Cardano.StakePool>(`${IDS_NAMESPACE}/${hashId}`)
])
);

// Compute ids to fetch from db
const idsToFetch = Object.entries(fromCache)
.filter(([_, pool]) => pool === undefined)
.map(([hashId, _]) => ({ id: Number(hashId), updateId: hashIdsMap[hashId] }));

// Get stake pools extra information
const [poolRelays, poolOwners, poolRegistrations, poolRetirements, poolRewards, poolMetrics, poolAPYs] =
await this.queryExtraPoolsData(idsToFetch, sortType, totalAdaAmount, orderedResult, hashesIds, options);

const { results, poolsToCache } = toStakePoolResults(orderedResultHashIds, fromCache, {
lastEpoch,
poolAPYs,
poolDatas,
Expand All @@ -120,10 +194,23 @@ export class DbSyncStakePoolProvider extends DbSyncProvider implements StakePool
poolRewards: poolRewards.filter(isNotNil),
totalCount
});

// Cache stake pools core objects
this.cacheStakePools(poolsToCache);
return results;
}

public async stakePoolStats(): Promise<StakePoolStats> {
this.#logger.debug('About to query pool stats');
return this.#builder.queryPoolStats();
return await this.#cache.get(queryCacheKey(StakePoolsSubQuery.STATS), () => this.#builder.queryPoolStats());
}

async start(): Promise<void> {
this.#epochPollService.start();
}

async close(): Promise<void> {
this.#cache.shutdown();
this.#epochPollService.shutdown();
}
}
Expand Up @@ -60,28 +60,33 @@ export class StakePoolBuilder {
this.#db = db;
this.#logger = logger;
}

public async queryRetirements(hashesIds: number[]) {
this.#logger.debug('About to query pool retirements');
const result: QueryResult<PoolRetirementModel> = await this.#db.query(Queries.findPoolsRetirements, [hashesIds]);
return result.rows.length > 0 ? result.rows.map(mapPoolRetirement) : [];
}

public async queryRegistrations(hashesIds: number[]) {
this.#logger.debug('About to query pool registrations');
const result: QueryResult<PoolRegistrationModel> = await this.#db.query(Queries.findPoolsRegistrations, [
hashesIds
]);
return result.rows.length > 0 ? result.rows.map(mapPoolRegistration) : [];
}

public async queryPoolRelays(updatesIds: number[]) {
this.#logger.debug('About to query pool relays');
const result: QueryResult<RelayModel> = await this.#db.query(Queries.findPoolsRelays, [updatesIds]);
return result.rows.length > 0 ? result.rows.map(mapRelay) : [];
}

public async queryPoolOwners(updatesIds: number[]) {
this.#logger.debug('About to query pool owners');
const result: QueryResult<OwnerAddressModel> = await this.#db.query(Queries.findPoolsOwners, [updatesIds]);
return result.rows.length > 0 ? result.rows.map(mapAddressOwner) : [];
}

public async queryPoolRewards(hashesIds: number[], limit?: number) {
this.#logger.debug('About to query pool rewards');
return Promise.all(
Expand All @@ -93,6 +98,7 @@ export class StakePoolBuilder {
})
);
}

public async queryPoolAPY(hashesIds: number[], options?: StakePoolQueryOptions): Promise<PoolAPY[]> {
this.#logger.debug('About to query pools APY');
const defaultSort: OrderByOptions[] = [{ field: 'apy', order: 'desc' }];
Expand All @@ -103,6 +109,7 @@ export class StakePoolBuilder {
const result: QueryResult<PoolAPYModel> = await this.#db.query(queryWithSortAndPagination, [hashesIds]);
return result.rows.map(mapPoolAPY);
}

public async queryPoolData(updatesIds: number[], options?: StakePoolQueryOptions) {
this.#logger.debug('About to query pool data');
const defaultSort: OrderByOptions[] = [
Expand All @@ -116,11 +123,14 @@ export class StakePoolBuilder {
const result: QueryResult<PoolDataModel> = await this.#db.query(queryWithSortAndPagination, [updatesIds]);
return result.rows.length > 0 ? result.rows.map(mapPoolData) : [];
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
public async queryPoolHashes(query: string, params: any[] = []) {
this.#logger.debug('About to query pool hashes');
const result: QueryResult<PoolUpdateModel> = await this.#db.query(query, params);
return result.rows.length > 0 ? result.rows.map(mapPoolUpdate) : [];
}

public async queryPoolMetrics(hashesIds: number[], totalAdaAmount: string, options?: StakePoolQueryOptions) {
this.#logger.debug('About to query pool metrics');
const queryWithSortAndPagination = withPagination(
Expand All @@ -133,6 +143,7 @@ export class StakePoolBuilder {
]);
return result.rows.length > 0 ? result.rows.map(mapPoolMetrics) : [];
}

public buildPoolsByIdentifierQuery(
identifier: MultipleChoiceSearchFilter<
Partial<Pick<Cardano.PoolParameters, 'id'> & Pick<Cardano.StakePoolMetadata, 'name' | 'ticker'>>
Expand All @@ -148,6 +159,7 @@ export class StakePoolBuilder {
`;
return { id: { isPrimary: true, name: 'pools_by_identifier' }, params, query };
}

public buildPoolsByStatusQuery(status: Cardano.StakePoolStatus[]) {
const whereClause = getStatusWhereClause(status);
const query = `
Expand All @@ -156,6 +168,7 @@ export class StakePoolBuilder {
`;
return { id: { isPrimary: true, name: 'pools_by_status' }, query };
}

public buildPoolsByPledgeMetQuery(pledgeMet: boolean) {
const subQueries = [...poolsByPledgeMetSubqueries];
subQueries.push({
Expand All @@ -167,16 +180,19 @@ export class StakePoolBuilder {
});
return subQueries;
}

public async getLastEpoch() {
this.#logger.debug('About to query last epoch');
const result: QueryResult<EpochModel> = await this.#db.query(Queries.findLastEpoch);
return result.rows[0].no;
}

public async getTotalAmountOfAda() {
this.#logger.debug('About to get total amount of ada');
this.#logger.debug('About to query total ada amount');
const result: QueryResult<TotalAdaModel> = await this.#db.query(Queries.findTotalAda);
return result.rows[0].total_ada;
}

public buildOrQuery(filters: StakePoolQueryOptions['filters']) {
const subQueries: SubQuery[] = [];
const params = [];
Expand All @@ -201,6 +217,7 @@ export class StakePoolBuilder {
}
return { params, query };
}

// eslint-disable-next-line max-statements
public buildAndQuery(filters: StakePoolQueryOptions['filters']) {
let query = Queries.findPools;
Expand Down Expand Up @@ -257,6 +274,7 @@ export class StakePoolBuilder {
query = addSentenceToQuery(query, groupByClause);
return { params, query };
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
public async queryTotalCount(query: string, _params: any[]) {
this.#logger.debug('About to get total count of pools');
Expand Down

0 comments on commit 419252a

Please sign in to comment.