// storeStakePoolMetadataJob.test.ts
import {
BlockDataEntity,
BlockEntity,
STAKE_POOL_METADATA_QUEUE,
TypeormStabilityWindowBuffer,
storeBlock,
storeStakePoolMetadataJob,
typeormTransactionCommit,
withTypeormTransaction
} from '../../src';
import { Bootstrap, Mappers, requestNext } from '@cardano-sdk/projection';
import { ChainSyncDataSet, chainSyncData, logger } from '@cardano-sdk/util-dev';
import { ChainSyncEventType } from '@cardano-sdk/core';
import { QueryRunner } from 'typeorm';
import { StakePoolMetadataJob, createPgBoss } from '../../src/pgBoss';
import { createProjectorTilFirst } from './util';
import { defer, filter, from } from 'rxjs';
import { initializeDataSource } from '../util';
/**
 * Creates a promise that is resolved from the outside.
 *
 * @returns a readonly tuple `[promise, resolve]`; calling `resolve()` fulfils `promise`.
 */
const testPromise = () => {
  // The Promise executor runs synchronously, so this is always assigned before it is read below.
  let resolvePromise!: () => void;
  const promise = new Promise<void>((resolve) => {
    resolvePromise = resolve;
  });
  return [promise, resolvePromise] as const;
};
describe('storeStakePoolMetadataJob', () => {
// Chain sync fixture selected by `ChainSyncDataSet.WithPoolRetirement`;
// its `cardanoNode` is the event source used by `project$`.
const stubEvents = chainSyncData(ChainSyncDataSet.WithPoolRetirement);
// Assigned anew in `beforeEach` and released in `afterEach`.
let queryRunner: QueryRunner;
// Assigned anew in `beforeEach` (initialized with `queryRunner`) and shut down in `afterEach`.
let buffer: TypeormStabilityWindowBuffer;
/**
 * Builds the projection under test from the stubbed cardano node.
 *
 * Operator order: drop early blocks -> withCertificates -> withStakePools -> withTypeormTransaction
 * (pgBoss enabled) -> storeBlock -> storeStakePoolMetadataJob -> buffer.storeBlockData ->
 * typeormTransactionCommit -> requestNext.
 */
const project$ = () => {
  // Events for blocks up to and including this block number are acknowledged and dropped
  // (skipping 1st event because it's not rolled back).
  const lastSkippedBlockNo = 32_159;

  const chainSync$ = Bootstrap.fromCardanoNode({
    buffer,
    cardanoNode: stubEvents.cardanoNode,
    logger
  });

  // The data source is only initialized once the projection is subscribed to.
  const dataSource$ = defer(() =>
    from(
      initializeDataSource({
        entities: [BlockEntity, BlockDataEntity],
        extensions: { pgBoss: true }
      })
    )
  );

  return chainSync$.pipe(
    filter((evt) => {
      const { blockNo } = evt.block.header;
      if (blockNo <= lastSkippedBlockNo) {
        // Acknowledge the dropped event so the source keeps emitting.
        evt.requestNext();
      }
      return blockNo > lastSkippedBlockNo;
    }),
    Mappers.withCertificates(),
    Mappers.withStakePools(),
    withTypeormTransaction({ dataSource$, logger }, { pgBoss: true }),
    storeBlock(),
    storeStakePoolMetadataJob(),
    buffer.storeBlockData(),
    typeormTransactionCommit(),
    requestNext()
  );
};
// Runs `project$` until the first event matching a given predicate.
const projectTilFirst = createProjectorTilFirst(project$);

// Runs the projection until the first event that has a stake pool update with a truthy `metadataJson`.
const projectTilFirstPoolUpdateWithMetadata = () =>
  projectTilFirst(({ stakePools }) =>
    stakePools.updates.some(({ poolParameters }) => poolParameters.metadataJson)
  );
// Every test gets its own query runner and a stability window buffer initialized with it.
beforeEach(async () => {
  queryRunner = (
    await initializeDataSource({
      entities: [BlockEntity, BlockDataEntity],
      extensions: { pgBoss: true }
    })
  ).createQueryRunner();

  buffer = new TypeormStabilityWindowBuffer({ allowNonSequentialBlockHeights: true, logger });
  await buffer.initialize(queryRunner);
});
// Per-test teardown for the resources assigned in `beforeEach`.
afterEach(async () => {
// Release the query runner first...
await queryRunner.release();
// ...then shut the stability window buffer down.
buffer.shutdown();
});
/**
 * Projects until the first pool update with metadata, checks that exactly one pg-boss job row
 * references that block's slot, then verifies a worker on STAKE_POOL_METADATA_QUEUE receives
 * a job whose `metadataJson` is an object.
 */
it('creates jobs referencing Block table that can be picked up by a worker', async () => {
  const { block } = await projectTilFirstPoolUpdateWithMetadata();

  // Bound parameter instead of interpolating the slot into the SQL text.
  const jobQueryResult = await queryRunner.query('SELECT * FROM pgboss.job WHERE block_slot=$1', [
    block.header.slot
  ]);
  expect(jobQueryResult).toHaveLength(1);

  const boss = createPgBoss(queryRunner, logger);
  await boss.start();
  try {
    // Hand the job payload back to the test body instead of asserting inside the handler:
    // an `expect` that throws in the handler would only fail the job and leave the test
    // hanging until it times out.
    const jobData = await new Promise<StakePoolMetadataJob>((resolve, reject) => {
      boss
        .work<StakePoolMetadataJob, boolean>(
          STAKE_POOL_METADATA_QUEUE,
          { newJobCheckInterval: 100 },
          async ({ data }) => {
            resolve(data);
            return true;
          }
        )
        // Fail fast if the worker cannot be registered at all.
        .catch(reject);
    });

    expect(typeof jobData.metadataJson).toBe('object');
  } finally {
    // Always stop the boss, even when an assertion above fails.
    await boss.stop();
  }
});
/**
 * Scenario: a worker picks up the job for a block, the block is then rolled back and the first
 * handler invocation fails; after the pool update is projected again, a second handler invocation
 * must still run.
 */
it('rollbacks do not brick the worker', async () => {
  const { block } = await projectTilFirstPoolUpdateWithMetadata();
  const boss = createPgBoss(queryRunner, logger);
  await boss.start();

  const [rollbackComplete, resolveRollbackComplete] = testPromise();
  const [job1Complete, resolveJob1Complete] = testPromise();
  const [job2Complete, resolveJob2Complete] = testPromise();

  try {
    void boss.work<StakePoolMetadataJob, boolean>(
      STAKE_POOL_METADATA_QUEUE,
      jest
        .fn()
        // 1st invocation: wait for the rollback to be projected, then fail the job.
        .mockImplementationOnce(async () => {
          await rollbackComplete;
          resolveJob1Complete();
          throw new Error('Failed to write metadata because poolRegistration no longer exists');
        })
        // 2nd invocation: succeeds, proving the worker is still processing jobs.
        .mockImplementationOnce(async () => {
          resolveJob2Complete();
          return true;
        })
    );

    const { block: rollbackBlock } = await projectTilFirst(
      (evt) => evt.eventType === ChainSyncEventType.RollBackward && evt.block.header.blockNo === block.header.blockNo
    );
    // sanity check
    expect(rollbackBlock.header.hash).toEqual(block.header.hash);

    resolveRollbackComplete();
    await job1Complete;
    await projectTilFirstPoolUpdateWithMetadata();
    await job2Complete;
  } finally {
    // If the test failed before the rollback was signalled, unblock the first handler so it is
    // not left waiting forever; resolving an already-resolved promise is a no-op.
    resolveRollbackComplete();
    // Always stop the boss so a failure does not leave a polling worker behind.
    await boss.stop();
  }
});
});