Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion server.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ const {
createMasternodeCountRepo,
} = require('./lib/masternodeCountRepo');
const { seedMasternodeCount } = require('./lib/mnCountSeed');
const { createMnCountLogger } = require('./services/mnCountLogger');
const {
createMnCountLogger,
evaluateCoreSyncReadiness,
} = require('./services/mnCountLogger');

// Per-process cache for `gobject_getcurrentvotes`. Concurrent callers
// hitting GET /gov/receipts for the same proposal share one RPC; a
Expand Down Expand Up @@ -155,6 +158,12 @@ seedMasternodeCount({
// to exit on SIGINT even if a tick is scheduled.
const mnCountLogger = createMnCountLogger({
repo: mnCountRepo,
isReadyForSample: async () => {
const services = rpcServices(client.callRpc);
const blockchainInfo = await services.getBlockchainInfo().call();
const mnSyncStatus = await services.mnSync('status').call();
return evaluateCoreSyncReadiness({ blockchainInfo, mnSyncStatus });
},
fetchTotal: async () => {
const r = await rpcServices(client.callRpc).masternode_count().call();
const total = r && r.total;
Expand Down
47 changes: 47 additions & 0 deletions services/mnCountLogger.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ const BASE_RETRY_MS = 60 * 1000;
const MAX_RETRY_MS = 60 * 60 * 1000;
const POST_MIDNIGHT_SKEW_MS = 5 * 1000;

function evaluateCoreSyncReadiness({ blockchainInfo, mnSyncStatus } = {}) {
if (!blockchainInfo || blockchainInfo.initialblockdownload !== false) {
return {
ready: false,
reason: 'initial_block_download',
initialblockdownload: blockchainInfo && blockchainInfo.initialblockdownload,
};
}

if (
!mnSyncStatus ||
mnSyncStatus.IsBlockchainSynced !== true ||
mnSyncStatus.IsSynced !== true
) {
return {
ready: false,
reason: 'masternode_sync_incomplete',
isBlockchainSynced: mnSyncStatus && mnSyncStatus.IsBlockchainSynced,
isSynced: mnSyncStatus && mnSyncStatus.IsSynced,
assetName: mnSyncStatus && mnSyncStatus.AssetName,
};
}

return { ready: true };
}

function utcDateString(ms) {
return new Date(ms).toISOString().slice(0, 10);
}
Expand All @@ -69,6 +95,7 @@ function msUntilNextMidnightUtc(fromMs) {
function createMnCountLogger({
repo,
fetchTotal,
isReadyForSample = async () => ({ ready: true }),
now = () => Date.now(),
log = () => {},
setTimeoutImpl = setTimeout,
Expand All @@ -78,6 +105,9 @@ function createMnCountLogger({
if (typeof fetchTotal !== 'function') {
throw new Error('createMnCountLogger: fetchTotal must be a function');
}
if (typeof isReadyForSample !== 'function') {
throw new Error('createMnCountLogger: isReadyForSample must be a function');
}

let timer = null;
let stopped = false;
Expand All @@ -88,6 +118,22 @@ function createMnCountLogger({
let currentRetryMs = BASE_RETRY_MS;

async function sampleAndWrite(label) {
const readiness = await isReadyForSample();
const ready =
readiness === true ||
(readiness && typeof readiness === 'object' && readiness.ready === true);
if (!ready) {
const meta =
readiness && typeof readiness === 'object'
? { label, ...readiness }
: { label, ready: false };
log('warn', 'mncount_skip_not_synced', meta);
const err = new Error(`mncount source not synced: ${meta.reason || 'unknown'}`);
err.code = 'mncount_not_synced';
err.meta = meta;
throw err;
}

const total = await fetchTotal();
if (!Number.isInteger(total) || total < 0) {
throw new Error(
Expand Down Expand Up @@ -283,6 +329,7 @@ function createMnCountLogger({

module.exports = {
createMnCountLogger,
evaluateCoreSyncReadiness,
utcDateString,
msUntilNextMidnightUtc,
BASE_RETRY_MS,
Expand Down
134 changes: 133 additions & 1 deletion services/mnCountLogger.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { openDatabase } = require('../lib/db');
const { createMasternodeCountRepo } = require('../lib/masternodeCountRepo');
const {
createMnCountLogger,
evaluateCoreSyncReadiness,
utcDateString,
msUntilNextMidnightUtc,
BASE_RETRY_MS,
Expand Down Expand Up @@ -95,16 +96,65 @@ describe('msUntilNextMidnightUtc', () => {
});
});

describe('evaluateCoreSyncReadiness', () => {
test('requires Core to be out of initial block download', () => {
expect(
evaluateCoreSyncReadiness({
blockchainInfo: { initialblockdownload: true },
mnSyncStatus: { IsBlockchainSynced: true, IsSynced: true },
})
).toMatchObject({
ready: false,
reason: 'initial_block_download',
});
});

test('requires masternode sync to be complete', () => {
expect(
evaluateCoreSyncReadiness({
blockchainInfo: { initialblockdownload: false },
mnSyncStatus: {
IsBlockchainSynced: true,
IsSynced: false,
AssetName: 'MASTERNODE_SYNC_LIST',
},
})
).toMatchObject({
ready: false,
reason: 'masternode_sync_incomplete',
isSynced: false,
});
});

test('accepts a fully synced Core and masternode layer', () => {
expect(
evaluateCoreSyncReadiness({
blockchainInfo: { initialblockdownload: false },
mnSyncStatus: {
IsBlockchainSynced: true,
IsSynced: true,
AssetName: 'MASTERNODE_SYNC_FINISHED',
},
})
).toEqual({ ready: true });
});
});

describe('createMnCountLogger', () => {
let db;
let repo;
let clock;
let scheduler;
let fetchTotal;
let isReadyForSample;
let logs;
let logger;

function setup({ rpcSequence = [], startAtIso = '2024-03-15T12:00:00Z' } = {}) {
function setup({
rpcSequence = [],
readinessSequence = [],
startAtIso = '2024-03-15T12:00:00Z',
} = {}) {
db = openDatabase(':memory:');
repo = createMasternodeCountRepo(db);
clock = { nowMs: msAt(startAtIso) };
Expand All @@ -121,10 +171,20 @@ describe('createMnCountLogger', () => {
return item;
});

let readinessIdx = 0;
isReadyForSample = jest.fn(async () => {
if (readinessIdx >= readinessSequence.length) return { ready: true };
const item = readinessSequence[readinessIdx++];
if (typeof item === 'function') return item();
if (item instanceof Error) throw item;
return item;
});

logs = [];
logger = createMnCountLogger({
repo,
fetchTotal,
isReadyForSample,
now: () => clock.nowMs,
log: (level, event, meta) => logs.push({ level, event, meta }),
setTimeoutImpl: scheduler.setTimeoutImpl,
Expand Down Expand Up @@ -179,6 +239,47 @@ describe('createMnCountLogger', () => {
expect(logs.some((l) => l.event === 'mncount_catchup_failed')).toBe(true);
});

test('catchUpIfNeeded skips the write while Core or mnsync is not ready', async () => {
setup({
rpcSequence: [2239],
readinessSequence: [
{
ready: false,
reason: 'masternode_sync_incomplete',
isBlockchainSynced: true,
isSynced: false,
},
],
startAtIso: '2024-03-15T12:00:00Z',
});

const result = await logger.catchUpIfNeeded();

expect(result).toMatchObject({
skipped: true,
reason: 'error',
err: 'mncount source not synced: masternode_sync_incomplete',
});
expect(fetchTotal).not.toHaveBeenCalled();
expect(repo.isEmpty()).toBe(true);
expect(logs).toEqual(
expect.arrayContaining([
expect.objectContaining({
level: 'warn',
event: 'mncount_skip_not_synced',
meta: expect.objectContaining({
reason: 'masternode_sync_incomplete',
isSynced: false,
}),
}),
expect.objectContaining({
level: 'error',
event: 'mncount_catchup_failed',
}),
])
);
});

test('repeated writes for the same UTC date are idempotent (skip + PK on date)', async () => {
setup({ rpcSequence: [2200], startAtIso: '2024-03-15T12:00:00Z' });
await logger.catchUpIfNeeded();
Expand Down Expand Up @@ -268,6 +369,37 @@ describe('createMnCountLogger', () => {
expect(afterSuccess[0]).toBeGreaterThan(17 * 3600 * 1000);
});

test('start() retries inside the same UTC day until sync is ready', async () => {
setup({
rpcSequence: [2239],
readinessSequence: [
{
ready: false,
reason: 'initial_block_download',
initialblockdownload: true,
},
{ ready: true },
],
startAtIso: '2024-03-15T06:00:00Z',
});

logger.start();
for (let i = 0; i < 20; i++) await new Promise((r) => setImmediate(r));

expect(fetchTotal).not.toHaveBeenCalled();
expect(repo.isEmpty()).toBe(true);
expect(logs.some((l) => l.event === 'mncount_skip_not_synced')).toBe(true);
expect(scheduler.pending()).toHaveLength(1);
expect(scheduler.pending()[0].delay).toBeLessThan(
18 * 3600 * 1000 + POST_MIDNIGHT_SKEW_MS
);

await scheduler.fireNext();

expect(fetchTotal).toHaveBeenCalledTimes(1);
expect(repo.getAll()).toEqual([{ date: '2024-03-15', users: 2239 }]);
});

test('start() keeps backing off on repeated RPC failure, still inside today', async () => {
// Three sequential failures early in a UTC day must all
// schedule retries inside the same day rather than skipping
Expand Down
Loading