-
Notifications
You must be signed in to change notification settings - Fork 54
/
pgBossWorker.ts
68 lines (56 loc) · 2.62 KB
/
pgBossWorker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import { CommonProgramOptions, PosgresProgramOptions, PostgresOptionDescriptions } from '../options';
import { HttpServer } from '../../Http/HttpServer';
import { Logger } from 'ts-log';
import { MissingProgramOption } from '../errors';
import { PgBossHttpService, PgBossServiceConfig, PgBossServiceDependencies } from '../services/pgboss';
import { STAKE_POOL_METRICS_UPDATE } from '@cardano-sdk/projection-typeorm';
import { SrvRecord } from 'dns';
import { createDnsResolver } from '../utils';
import { createLogger } from 'bunyan';
import { getConnectionConfig, getPool } from '../services/postgres';
import { getListen } from '../../Http/util';
/**
 * Default value for the parallel jobs setting (see `PgBossWorkerOptionDescriptions.ParallelJobs`).
 * NOTE(review): not referenced in this module; presumably consumed where the program options are declared — confirm.
 */
export const PARALLEL_JOBS_DEFAULT = 10;
/**
 * Default API URL for the pg-boss worker: `http://localhost:3003`.
 * NOTE(review): not referenced in this module; presumably the fallback for `PgBossWorkerConfig.apiUrl` — confirm.
 */
export const PG_BOSS_WORKER_API_URL_DEFAULT = new URL('http://localhost:3003');
/**
 * Human-readable descriptions of the pg-boss worker specific program options.
 * `StakePoolProviderUrl` is also used by `loadPgBossWorker` to report a missing option.
 */
export enum PgBossWorkerOptionDescriptions {
/** Description of the option setting how many jobs to run in parallel. */
ParallelJobs = 'Parallel jobs to run',
/** Description of the option listing the queue names, comma separated. */
Queues = 'Comma separated queue names',
/** Description of the option holding the stake pool provider URL. */
StakePoolProviderUrl = 'Stake pool provider URL'
}
/**
 * Arguments accepted by `loadPgBossWorker`: the common program options, the Postgres
 * program options parameterized with both `'DbSync'` and `'StakePool'`, and the
 * pg-boss service configuration.
 */
export type PgBossWorkerArgs = CommonProgramOptions &
PosgresProgramOptions<'DbSync'> &
PosgresProgramOptions<'StakePool'> &
PgBossServiceConfig;
/**
 * Optional dependencies that can be injected into `loadPgBossWorker`.
 * Any dependency left out is created by `loadPgBossWorker` itself.
 */
export interface LoadPgBossWorkerDependencies {
/** Resolves a service name to a DNS SRV record; when omitted, one is built with `createDnsResolver`. */
dnsResolver?: (serviceName: string) => Promise<SrvRecord>;
/** Logger to use; when omitted, a bunyan logger is created with `createLogger`. */
logger?: Logger;
}
/**
 * Program identifier for the pg-boss worker. Used in this module as the HTTP server name,
 * the default logger name, the name passed to `getConnectionConfig`, and the subject of
 * the `MissingProgramOption` error raised when no database pool is available.
 */
const pgBossWorker = 'pg-boss-worker';
/** Configuration of `PgBossWorkerHttpServer`: the pg-boss service configuration plus the server's API URL. */
export interface PgBossWorkerConfig extends PgBossServiceConfig {
/** URL passed to `getListen` to derive the HTTP server's listen configuration. */
apiUrl: URL;
}
/**
 * HTTP server hosting a single `PgBossHttpService`.
 *
 * The server is named after the `pgBossWorker` constant and its listen
 * configuration is derived from `cfg.apiUrl` through `getListen`.
 */
export class PgBossWorkerHttpServer extends HttpServer {
  /**
   * @param cfg worker configuration; passed as-is to the service, and its `apiUrl` selects where the server listens
   * @param deps service dependencies; passed as-is to the service, and its `logger` is reused by the server
   */
  constructor(cfg: PgBossWorkerConfig, deps: PgBossServiceDependencies) {
    // Read the two values the server itself needs before building the service.
    const listenUrl = cfg.apiUrl;
    const serverLogger = deps.logger;
    const service = new PgBossHttpService(cfg, deps);

    super(
      { listen: getListen(listenUrl), name: pgBossWorker },
      { logger: serverLogger, runnableDependencies: [], services: [service] }
    );
  }
}
/**
 * Builds a `PgBossWorkerHttpServer` from the program arguments.
 *
 * @param args program arguments; also passed to the server as its configuration
 * @param deps optional logger and DNS resolver; whichever is missing is created here
 * @returns a promise of the configured (not yet started here) `PgBossWorkerHttpServer`
 * @throws MissingProgramOption when `getPool` yields no pool, or when `args.queues`
 * includes `STAKE_POOL_METRICS_UPDATE` but `args.stakePoolProviderUrl` is not set
 */
export const loadPgBossWorker = async (args: PgBossWorkerArgs, deps: LoadPgBossWorkerDependencies = {}) => {
  // Use the injected logger when there is one; otherwise create a bunyan logger named after the worker.
  let logger = deps?.logger;
  if (!logger) {
    logger = createLogger({ level: args.loggerMinSeverity, name: pgBossWorker });
  }

  // Use the injected DNS resolver when there is one; otherwise build it from the service discovery options.
  let dnsResolver = deps?.dnsResolver;
  if (!dnsResolver) {
    dnsResolver = createDnsResolver(
      { factor: args.serviceDiscoveryBackoffFactor, maxRetryTime: args.serviceDiscoveryTimeout },
      logger
    );
  }

  const connectionConfig$ = getConnectionConfig(dnsResolver, pgBossWorker, args);
  const pool = await getPool(dnsResolver, logger, args);

  if (!pool) {
    throw new MissingProgramOption(pgBossWorker, PostgresOptionDescriptions.ConnectionString);
  }

  // The stake pool metrics queue cannot be served without a stake pool provider URL.
  // NOTE(review): this check runs after the pool has been obtained; consider whether it should fail earlier.
  const handlesStakePoolMetrics = args.queues.includes(STAKE_POOL_METRICS_UPDATE);
  if (handlesStakePoolMetrics && !args.stakePoolProviderUrl) {
    throw new MissingProgramOption(STAKE_POOL_METRICS_UPDATE, PgBossWorkerOptionDescriptions.StakePoolProviderUrl);
  }

  return new PgBossWorkerHttpServer(args, { connectionConfig$, db: pool, logger });
};