Skip to content

Commit

Permalink
feat(projection-typeorm): add stake pools metrics computation job sch…
Browse files Browse the repository at this point in the history
…edule
  • Loading branch information
iccicci committed May 29, 2023
1 parent 46edd0b commit a5f56e9
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 7 deletions.
@@ -0,0 +1,46 @@
import { BigIntColumnOptions, DeleteCascadeRelationOptions } from './util';
import { Cardano } from '@cardano-sdk/core';
import { Column, Entity, JoinColumn, OneToOne, PrimaryColumn } from 'typeorm';
import { StakePoolEntity } from './StakePool.entity';
import { float } from './transformers';

@Entity()
export class CurrentPoolMetricsEntity {
// Using the same column for both primary and foreign key
@PrimaryColumn({ length: 56, type: 'char' })
stakePoolId?: Cardano.PoolId;

@Column()
slot: Cardano.Slot;

@OneToOne(() => StakePoolEntity, DeleteCascadeRelationOptions)
@JoinColumn()
stakePool?: StakePoolEntity;

@Column({ type: 'integer' })
mintedBlocks?: number;

@Column({ type: 'integer' })
liveDelegators?: number;

@Column(BigIntColumnOptions)
activeStake?: Cardano.Lovelace;

@Column(BigIntColumnOptions)
liveStake?: Cardano.Lovelace;

@Column(BigIntColumnOptions)
livePledge?: Cardano.Lovelace;

@Column({ transformer: float, type: 'numeric' })
liveSaturation?: Cardano.Percent;

@Column({ transformer: float, type: 'numeric' })
activeSize?: Cardano.Percent;

@Column({ transformer: float, type: 'numeric' })
liveSize?: Cardano.Percent;

@Column({ transformer: float, type: 'numeric' })
apy?: Cardano.Percent;
}
4 changes: 4 additions & 0 deletions packages/projection-typeorm/src/entity/StakePool.entity.ts
@@ -1,5 +1,6 @@
import { Cardano } from '@cardano-sdk/core';
import { Column, Entity, Index, JoinColumn, OneToMany, OneToOne, PrimaryColumn, RelationOptions } from 'typeorm';
import { CurrentPoolMetricsEntity } from './CurrentPoolMetrics.entity';
import { PoolRegistrationEntity } from './PoolRegistration.entity';
import { PoolRetirementEntity } from './PoolRetirement.entity';

Expand Down Expand Up @@ -28,4 +29,7 @@ export class StakePoolEntity {
@JoinColumn()
@OneToOne(() => PoolRetirementEntity, OnDeleteSetNull)
lastRetirement?: PoolRetirementEntity | null;

@OneToOne(() => CurrentPoolMetricsEntity, (metric) => metric.stakePool)
metrics?: CurrentPoolMetricsEntity | null;
}
1 change: 1 addition & 0 deletions packages/projection-typeorm/src/entity/index.ts
Expand Up @@ -8,3 +8,4 @@ export * from './PoolMetadata.entity';
export * from './Asset.entity';
export * from './Tokens.entity';
export * from './Output.entity';
export * from './CurrentPoolMetrics.entity';
9 changes: 9 additions & 0 deletions packages/projection-typeorm/src/entity/transformers.ts
Expand Up @@ -2,6 +2,15 @@
import { ValueTransformer } from 'typeorm';
import { fromSerializableObject, toSerializableObject } from '@cardano-sdk/util';

export const float: ValueTransformer = {
from(data: string) {
return Number.parseFloat(data);
},
to(data: number) {
return data;
}
};

export const stringBytea: ValueTransformer = {
from(bytea: Buffer) {
return bytea.toString('utf8');
Expand Down
4 changes: 3 additions & 1 deletion packages/projection-typeorm/src/index.ts
Expand Up @@ -7,7 +7,9 @@ export * from './TypeormStabilityWindowBuffer';
export * from './isRecoverableTypeormError';
export {
STAKE_POOL_METADATA_QUEUE,
StakePoolMetadataJob as StakePoolMetadataTask,
STAKE_POOL_METRICS_UPDATE,
StakePoolMetadataJob,
StakePoolMetricsUpdateJob,
createPgBoss,
createPgBossExtension
} from './pgBoss';
1 change: 1 addition & 0 deletions packages/projection-typeorm/src/operators/index.ts
Expand Up @@ -6,3 +6,4 @@ export * from './storeBlock';
export * from './storeAssets';
export * from './storeUtxo';
export * from './util';
export * from './storePoolMetricsUpdateJob';
@@ -0,0 +1,26 @@
import { Cardano, ChainSyncEventType } from '@cardano-sdk/core';
import { STAKE_POOL_METRICS_UPDATE, StakePoolMetricsUpdateJob } from '../pgBoss';
import { WithPgBoss } from './withTypeormTransaction';
import { typeormOperator } from './util';

export const createStorePoolMetricsUpdateJob = (jobFrequency = 1000) => {
// Remember the blockNo of last sent job in order to no resend another job in case of rollback
let lastSentBlock: Cardano.BlockNo | undefined;
let reachedTheTip = false;

return typeormOperator<WithPgBoss>(async ({ eventType, pgBoss, block: { header }, tip }) => {
let insertFirstJob = false;

if (eventType === ChainSyncEventType.RollBackward) return;
if (!reachedTheTip) insertFirstJob = reachedTheTip = tip.slot === header.slot;

const { blockNo, slot } = header;

if (insertFirstJob || (blockNo % jobFrequency === 0 && blockNo !== lastSentBlock && reachedTheTip)) {
const task: StakePoolMetricsUpdateJob = { slot };

lastSentBlock = blockNo;
await pgBoss.send(STAKE_POOL_METRICS_UPDATE, task, { slot });
}
});
};
5 changes: 5 additions & 0 deletions packages/projection-typeorm/src/pgBoss.ts
Expand Up @@ -12,6 +12,7 @@ import Attorney from 'pg-boss/src/attorney';
import PgBoss, { SendOptions } from 'pg-boss';

export const STAKE_POOL_METADATA_QUEUE = 'pool-metadata';
export const STAKE_POOL_METRICS_UPDATE = 'pool-metrics';

export interface PgBossExtension {
send: <T extends object>(
Expand All @@ -21,6 +22,10 @@ export interface PgBossExtension {
) => Promise<string | null>;
}

export interface StakePoolMetricsUpdateJob {
slot: Cardano.Slot;
}

export interface StakePoolMetadataJob {
/**
* bigint
Expand Down
@@ -0,0 +1,57 @@
import { OperatorFunction, of } from 'rxjs';
import { STAKE_POOL_METRICS_UPDATE, createStorePoolMetricsUpdateJob } from '../../src';

const testPromise = () => {
let resolvePromise: Function;
const promise = new Promise<void>((resolve) => (resolvePromise = resolve));
return [promise, resolvePromise!] as const;
};

describe('createStorePoolMetricsUpdateJob', () => {
it('sends jobs only at expected blocks', async () => {
let counter = 0;
const [promise, resolver] = testPromise();
const send = jest.fn(() => {
if (++counter === 3) resolver();
return Promise.resolve();
});
const createEvent = (blockNo: number, tipSlot?: number) => ({
block: { header: { blockNo, slot: blockNo * 10 } },
eventType: 0,
pgBoss: { send },
tip: { slot: tipSlot ?? blockNo * 10 }
});

of(
createEvent(2, 80),
createEvent(3, 80),
createEvent(4, 80),
createEvent(5, 80), // doesn't generate event since tip is not reached
createEvent(6, 80),
createEvent(7, 80),
createEvent(8), // generates first event once tip is reached
createEvent(9),
createEvent(10), // generates a std event
createEvent(11),
createEvent(10), // doesn't generate event due to rollback
createEvent(11),
createEvent(12),
createEvent(13),
createEvent(14),
createEvent(15), // generates a std event
createEvent(16)
)
.pipe(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
createStorePoolMetricsUpdateJob(5)() as OperatorFunction<any, any>
)
.subscribe();

await promise;

expect(send).toBeCalledTimes(3);
expect(send).toBeCalledWith(STAKE_POOL_METRICS_UPDATE, { slot: 80 }, { slot: 80 });
expect(send).toBeCalledWith(STAKE_POOL_METRICS_UPDATE, { slot: 100 }, { slot: 100 });
expect(send).toBeCalledWith(STAKE_POOL_METRICS_UPDATE, { slot: 150 }, { slot: 150 });
});
});
Expand Up @@ -89,11 +89,15 @@ describe('storeStakePoolMetadataJob', () => {
const boss = createPgBoss(queryRunner, logger);
await boss.start();
const [jobComplete, resolveJobComplete] = testPromise();
void boss.work<StakePoolMetadataJob, boolean>(STAKE_POOL_METADATA_QUEUE, async ({ data }) => {
expect(typeof data.metadataJson).toBe('object');
resolveJobComplete();
return true;
});
void boss.work<StakePoolMetadataJob, boolean>(
STAKE_POOL_METADATA_QUEUE,
{ newJobCheckInterval: 100 },
async ({ data }) => {
expect(typeof data.metadataJson).toBe('object');
resolveJobComplete();
return true;
}
);
await jobComplete;
await boss.stop();
});
Expand Down
@@ -1,6 +1,7 @@
import {
BlockDataEntity,
BlockEntity,
CurrentPoolMetricsEntity,
PoolRegistrationEntity,
PoolRetirementEntity,
StakePoolEntity,
Expand Down Expand Up @@ -56,7 +57,14 @@ describe('storeStakePools', () => {

beforeEach(async () => {
dataSource = await initializeDataSource({
entities: [BlockDataEntity, BlockEntity, StakePoolEntity, PoolRegistrationEntity, PoolRetirementEntity]
entities: [
BlockDataEntity,
BlockEntity,
CurrentPoolMetricsEntity,
PoolRegistrationEntity,
PoolRetirementEntity,
StakePoolEntity
]
});
queryRunner = dataSource.createQueryRunner();
poolsRepo = queryRunner.manager.getRepository(StakePoolEntity);
Expand Down

0 comments on commit a5f56e9

Please sign in to comment.