Skip to content

Commit

Permalink
fixup! refactor(cardano-services): callbacks pattern - apply pattern …
Browse files Browse the repository at this point in the history
…with callbacks in `DbSyncEpochPollService` - expose interface `EpochMonitor` - update and add new tests
  • Loading branch information
Ivaylo Andonov committed Aug 10, 2022
1 parent 9a6e57e commit 9c2bc2f
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 33 deletions.
Expand Up @@ -11,8 +11,8 @@ export const EPOCH_POLL_INTERVAL_DEFAULT = 10_000;
*/
export class DbSyncEpochPollService implements EpochMonitor {
#timeoutId?: number;
#currentEpoch: Promise<Cardano.Epoch>;
#callbacks: Function[];
#currentEpoch: Promise<Cardano.Epoch | null>;

/**
* Db connection
Expand All @@ -32,14 +32,15 @@ export class DbSyncEpochPollService implements EpochMonitor {
this.#db = db;
this.#callbacks = [];
this.#interval = interval;
this.#currentEpoch = Promise.resolve(null);
}

/**
* Poll execution to detect a new epoch rollover
* Upon the occurrence of rollover event it executes all callbacks by registered dependand services
*/
async #executePoll() {
const lastEpoch = await this.#getLastEpoch();
const lastEpoch = await this.#queryLastEpoch();
const currentEpoch = await this.#currentEpoch;
const shouldClearCache = !!(currentEpoch && lastEpoch > currentEpoch);

Expand All @@ -52,11 +53,11 @@ export class DbSyncEpochPollService implements EpochMonitor {
}

/**
* Gets the last epoch number stored in db
* Query the last epoch number stored in db
*
* @returns {number} epoch number
*/
async #getLastEpoch() {
async #queryLastEpoch() {
const result: QueryResult<EpochModel> = await this.#db.query(findLastEpoch);
return result.rows[0].no;
}
Expand All @@ -67,7 +68,7 @@ export class DbSyncEpochPollService implements EpochMonitor {
#start() {
if (this.#timeoutId) return;

this.#currentEpoch = this.#getLastEpoch();
this.#currentEpoch = this.#queryLastEpoch();
this.#timeoutId = setInterval(() => this.#executePoll(), this.#interval);
}

Expand All @@ -91,9 +92,9 @@ export class DbSyncEpochPollService implements EpochMonitor {
}

/**
* Get current epoch
* Get last known epoch
*/
async getEpoch() {
async getLastKnownEpoch() {
return await this.#currentEpoch;
}
}
1 change: 0 additions & 1 deletion packages/cardano-services/src/util/polling/types.ts
Expand Up @@ -2,5 +2,4 @@ export type Disposer = () => void;

export interface EpochMonitor {
onEpochRollover(callback: () => void): Disposer;
getEpoch(): Promise<number>;
}
Expand Up @@ -111,7 +111,7 @@ describe('NetworkInfoHttpService', () => {
it('should start epoch monitor once the db provider is initialized and started', async () => {
await sleep(epochPollInterval * 2);

expect(await epochMonitor.getEpoch()).toBeDefined();
expect(await epochMonitor.getLastKnownEpoch()).toBeDefined();
expect(clearCacheSpy).not.toHaveBeenCalled();
});
});
Expand Down Expand Up @@ -206,7 +206,7 @@ describe('NetworkInfoHttpService', () => {
await provider.stake();
expect(cache.keys().length).toEqual(stakeTotalQueriesCount);
await sleep(epochPollInterval * 2);
expect(await epochMonitor.getEpoch()).toEqual(currentEpochNo);
expect(await epochMonitor.getLastKnownEpoch()).toEqual(currentEpochNo);
expect(cache.keys().length).toEqual(stakeTotalQueriesCount);
expect(dbConnectionQuerySpy).toBeCalledTimes(totalQueriesCount);
expect(cardanoNodeStakeSpy).toHaveBeenCalledTimes(stakeNodeQueriesCount);
Expand All @@ -232,7 +232,7 @@ describe('NetworkInfoHttpService', () => {
await sleep(epochPollInterval);
expect(clearCacheSpy).toHaveBeenCalled();

expect(await epochMonitor.getEpoch()).toEqual(greaterEpoch);
expect(await epochMonitor.getLastKnownEpoch()).toEqual(greaterEpoch);
expect(cache.keys().length).toEqual(0);
}, db)
);
Expand Down
Expand Up @@ -128,7 +128,7 @@ describe('StakePoolHttpService', () => {
it('should start epoch monitor once the db provider is initialized and started', async () => {
await sleep(epochPollInterval * 2);

expect(await epochMonitor.getEpoch()).toBeDefined();
expect(await epochMonitor.getLastKnownEpoch()).toBeDefined();
expect(clearCacheSpy).not.toHaveBeenCalled();
});
});
Expand Down Expand Up @@ -216,7 +216,7 @@ describe('StakePoolHttpService', () => {

await sleep(epochPollInterval);

expect(await epochMonitor.getEpoch()).toEqual(currentEpochNo);
expect(await epochMonitor.getLastKnownEpoch()).toEqual(currentEpochNo);
expect(cache.keys().length).toEqual(cacheKeysCount);
expect(dbConnectionQuerySpy).toBeCalledTimes(
cachedSubQueriesCount + nonCacheableSubQueriesCount + DB_POLL_QUERIES_COUNT
Expand Down Expand Up @@ -247,7 +247,7 @@ describe('StakePoolHttpService', () => {
await sleep(epochPollInterval);
expect(clearCacheSpy).toHaveBeenCalled();

expect(await epochMonitor.getEpoch()).toEqual(greaterEpoch);
expect(await epochMonitor.getLastKnownEpoch()).toEqual(greaterEpoch);
expect(cache.keys().length).toEqual(0);
}, db)
);
Expand All @@ -261,6 +261,9 @@ describe('StakePoolHttpService', () => {
expect(response.pageResults.length).toEqual(10);
expect(responseWithPagination.pageResults.length).toEqual(2);
expect(response.pageResults[0]).not.toEqual(responseWithPagination.pageResults[0]);

const responseWithPaginationCached = await provider.queryStakePools(reqWithPagination);
expect(responseWithPagination.pageResults).toEqual(responseWithPaginationCached.pageResults);
});
it('should paginate response with or condition', async () => {
const req: StakePoolQueryOptions = { filters: { _condition: 'or' } };
Expand All @@ -270,6 +273,9 @@ describe('StakePoolHttpService', () => {
expect(response.pageResults.length).toEqual(10);
expect(responseWithPagination.pageResults.length).toEqual(2);
expect(response.pageResults[0]).not.toEqual(responseWithPagination.pageResults[0]);

const responseWithPaginationCached = await provider.queryStakePools(reqWithPagination);
expect(responseWithPagination.pageResults).toEqual(responseWithPaginationCached.pageResults);
});
it('should paginate rewards response', async () => {
const req = { pagination: { limit: 1, startAt: 1 } };
Expand All @@ -279,6 +285,9 @@ describe('StakePoolHttpService', () => {
const response = await provider.queryStakePools(req);
expect(response.pageResults[0].epochRewards.length).toEqual(1);
expect(responseWithPagination.pageResults[0].epochRewards.length).toEqual(0);

const responseCached = await provider.queryStakePools(req);
expect(response.pageResults).toEqual(responseCached.pageResults);
});
it('should paginate rewards response with or condition', async () => {
const req: StakePoolQueryOptions = { filters: { _condition: 'or' }, pagination: { limit: 1, startAt: 1 } };
Expand All @@ -288,6 +297,9 @@ describe('StakePoolHttpService', () => {
const response = await provider.queryStakePools(req);
expect(response.pageResults[0].epochRewards.length).toEqual(1);
expect(responseWithPagination.pageResults[0].epochRewards.length).toEqual(0);

const responseCached = await provider.queryStakePools(req);
expect(response.pageResults).toEqual(responseCached.pageResults);
});
it('should cache paginated response', async () => {
const reqWithPagination: StakePoolQueryOptions = { pagination: { limit: 2, startAt: 1 } };
Expand Down Expand Up @@ -358,6 +370,10 @@ describe('StakePoolHttpService', () => {
const responseWithOrCondition = await provider.queryStakePools(setFilterCondition(req, 'or'));
expect(response).toMatchSnapshot();
expect(response).toEqual(responseWithOrCondition);

const responseCached = await provider.queryStakePools(req);
expect(response).toEqual(responseCached);
expect(responseWithOrCondition).toEqual(responseCached);
});
it('stake pools do not match identifier filter', async () => {
const req = {
Expand Down Expand Up @@ -726,11 +742,17 @@ describe('StakePoolHttpService', () => {
it('desc order', async () => {
const response = await provider.queryStakePools(setSortCondition({}, 'desc', 'name'));
expect(response).toMatchSnapshot();

const responseCached = await provider.queryStakePools(setSortCondition({}, 'desc', 'name'));
expect(response.pageResults).toEqual(responseCached.pageResults);
});

it('asc order', async () => {
const response = await provider.queryStakePools(setSortCondition({}, 'asc', 'name'));
expect(response).toMatchSnapshot();

const responseCached = await provider.queryStakePools(setSortCondition({}, 'asc', 'name'));
expect(response.pageResults).toEqual(responseCached.pageResults);
});

it('if sort not provided, defaults to order by name and then by poolId asc', async () => {
Expand Down Expand Up @@ -801,23 +823,30 @@ describe('StakePoolHttpService', () => {
});

it('with applied filters', async () => {
const response = await provider.queryStakePools(
setSortCondition(setFilterCondition(filterArgs, 'or'), 'desc', 'name')
);
const reqWithFilters = setSortCondition(setFilterCondition(filterArgs, 'or'), 'desc', 'name');
const response = await provider.queryStakePools(reqWithFilters);
expect(response).toMatchSnapshot();

const responseCached = await provider.queryStakePools(reqWithFilters);
expect(response.pageResults).toEqual(responseCached.pageResults);
});

it('asc order with applied pagination', async () => {
const firstPageResultSet = await provider.queryStakePools(
setSortCondition(setPagination({}, 0, 3), 'asc', 'name')
);
const firstPageReq = setSortCondition(setPagination({}, 0, 3), 'asc', 'name');
const secondPageReq = setSortCondition(setPagination({}, 3, 3), 'asc', 'name');

const secondPageResultSet = await provider.queryStakePools(
setSortCondition(setPagination({}, 3, 3), 'asc', 'name')
);
const firstPageResultSet = await provider.queryStakePools(firstPageReq);

const secondPageResultSet = await provider.queryStakePools(secondPageReq);

expect(firstPageResultSet).toMatchSnapshot();
expect(secondPageResultSet).toMatchSnapshot();

const firstResponseCached = await provider.queryStakePools(firstPageReq);
const secondResponseCached = await provider.queryStakePools(secondPageReq);

expect(firstPageResultSet).toEqual(firstResponseCached);
expect(secondPageResultSet).toEqual(secondResponseCached);
});

it('asc order with applied pagination, with change sort order on next page', async () => {
Expand Down
@@ -1,22 +1,15 @@
import { DbSyncEpochPollService } from '../../../src/util';
import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../../src/InMemoryCache';
import { Pool } from 'pg';
import { ingestDbData, sleep, wrapWithTransaction } from '../../util';

describe('DbSyncEpochPollService', () => {
const epochPollInterval = 2 * 1000;
const cache = new InMemoryCache(UNLIMITED_CACHE_TTL);
const db = new Pool({ connectionString: process.env.POSTGRES_CONNECTION_STRING, max: 1, min: 1 });
const epochMonitor = new DbSyncEpochPollService(db, epochPollInterval!);

describe('healthy state', () => {
afterAll(async () => {
await db.end();
cache.shutdown();
});

beforeEach(async () => {
cache.clear();
});

it(
Expand All @@ -27,12 +20,15 @@ describe('DbSyncEpochPollService', () => {

const firstRegisteredCallback = jest.fn();
const secondRegisteredCallback = jest.fn();

expect(await epochMonitor.getLastKnownEpoch()).toEqual(null);

const firstDisposer = epochMonitor.onEpochRollover(firstRegisteredCallback);
const secondDisposer = epochMonitor.onEpochRollover(secondRegisteredCallback);

await sleep(epochPollInterval * 2);

expect(await epochMonitor.getEpoch()).toEqual(currentEpoch);
expect(await epochMonitor.getLastKnownEpoch()).toEqual(currentEpoch);
expect(firstRegisteredCallback).not.toHaveBeenCalled();
expect(secondRegisteredCallback).not.toHaveBeenCalled();

Expand All @@ -47,8 +43,7 @@ describe('DbSyncEpochPollService', () => {

expect(firstRegisteredCallback).toHaveBeenCalled();
expect(secondRegisteredCallback).toHaveBeenCalled();
expect(await epochMonitor.getEpoch()).toEqual(greaterEpoch);
expect(cache.keys().length).toEqual(0);
expect(await epochMonitor.getLastKnownEpoch()).toEqual(greaterEpoch);

// Dispose the registered callbacks in epoch monitor
firstDisposer();
Expand Down

0 comments on commit 9c2bc2f

Please sign in to comment.