Skip to content

Commit

Permalink
feat: integrated TxSubmitWorker in cli
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci committed May 23, 2022
1 parent ced7b77 commit 6da3db8
Show file tree
Hide file tree
Showing 20 changed files with 317 additions and 80 deletions.
@@ -1,9 +1,11 @@
export enum ProgramOptionDescriptions {
import { CommonOptionDescriptions } from '../ProgramsCommon';

enum HttpServerOptionDescriptions {
ApiUrl = 'API URL',
DbConnection = 'DB Connection',
LoggerMinSeverity = 'Log level',
MetricsEnabled = 'Enable Prometheus Metrics',
OgmiosUrl = 'Ogmios URL',
RabbitMQUrl = 'RabbitMQ URL',
UseQueue = 'Enables RabbitMQ'
}

export type ProgramOptionDescriptions = CommonOptionDescriptions | HttpServerOptionDescriptions;
export const ProgramOptionDescriptions = { ...CommonOptionDescriptions, ...HttpServerOptionDescriptions };
9 changes: 1 addition & 8 deletions packages/cardano-services/src/Program/defaults.ts
@@ -1,10 +1,3 @@
import { createConnectionObject } from '@cardano-sdk/ogmios';
export { OGMIOS_URL_DEFAULT, RABBITMQ_URL_DEFAULT } from '../ProgramsCommon';

export const API_URL_DEFAULT = 'http://localhost:3000';

export const OGMIOS_URL_DEFAULT = (() => {
const connection = createConnectionObject();
return connection.address.webSocket;
})();

export const RABBITMQ_URL_DEFAULT = 'amqp://localhost:5672';
18 changes: 9 additions & 9 deletions packages/cardano-services/src/Program/loadHttpServer.ts
@@ -1,25 +1,25 @@
import { CommonProgramOptions } from '../ProgramsCommon';
import { DbSyncStakePoolSearchProvider, StakePoolSearchHttpService } from '../StakePoolSearch';
import { HttpServer, HttpServerConfig, HttpService } from '../Http';
import { LogLevel, createLogger } from 'bunyan';
import { MissingProgramOption, UnknownServiceName } from './errors';
import { Pool } from 'pg';
import { ProgramOptionDescriptions } from './ProgramOptionDescriptions';
import { RabbitMqTxSubmitProvider } from '@cardano-sdk/rabbitmq';
import { ServiceNames } from './ServiceNames';
import { TxSubmitHttpService } from '../TxSubmit';
import { createLogger } from 'bunyan';
import { ogmiosTxSubmitProvider, urlToConnectionConfig } from '@cardano-sdk/ogmios';

export interface HttpServerOptions extends CommonProgramOptions {
dbConnectionString?: string;
metricsEnabled?: boolean;
useQueue?: boolean;
}

export interface ProgramArgs {
apiUrl: URL;
serviceNames: (ServiceNames.StakePoolSearch | ServiceNames.TxSubmit)[];
options?: {
dbConnectionString?: string;
loggerMinSeverity?: LogLevel;
metricsEnabled?: boolean;
ogmiosUrl?: URL;
rabbitmqUrl?: URL;
useQueue?: boolean;
};
options?: HttpServerOptions;
}

export const loadHttpServer = async (args: ProgramArgs): Promise<HttpServer> => {
Expand Down
@@ -0,0 +1,5 @@
export enum CommonOptionDescriptions {
LoggerMinSeverity = 'Log level',
OgmiosUrl = 'Ogmios URL',
RabbitMQUrl = 'RabbitMQ URL'
}
8 changes: 8 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/defaults.ts
@@ -0,0 +1,8 @@
import { createConnectionObject } from '@cardano-sdk/ogmios';

export const OGMIOS_URL_DEFAULT = (() => {
const connection = createConnectionObject();
return connection.address.webSocket;
})();

export const RABBITMQ_URL_DEFAULT = 'amqp://localhost:5672';
3 changes: 3 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/index.ts
@@ -0,0 +1,3 @@
export * from './CommonOptionDescriptions';
export * from './defaults';
export * from './options';
12 changes: 12 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/options.ts
@@ -0,0 +1,12 @@
import { LogLevel } from 'bunyan';

/**
* Common options for programs:
* - HTTP server
* - RabbitMQ worker
*/
export interface CommonProgramOptions {
loggerMinSeverity?: LogLevel;
ogmiosUrl?: URL;
rabbitmqUrl?: URL;
}
@@ -0,0 +1,4 @@
export enum TxWorkerOptionDescriptions {
Parallel = 'Parallel mode',
PollingCycle = 'Polling cycle'
}
5 changes: 5 additions & 0 deletions packages/cardano-services/src/TxWorker/defaults.ts
@@ -0,0 +1,5 @@
export { OGMIOS_URL_DEFAULT, RABBITMQ_URL_DEFAULT } from '../ProgramsCommon';

export const PARALLEL_MODE_DEFAULT = false;

export const POLLING_CYCLE_DEFAULT = 500;
@@ -0,0 +1,10 @@
import { CustomError } from 'ts-custom-error';
import { ServiceNames } from '../../Program';
import { TxWorkerOptionDescriptions } from '../TxWorkerOptionDescriptions';

export class WrongProgramOption extends CustomError {
public constructor(service: ServiceNames, option: TxWorkerOptionDescriptions, expected: string[]) {
super();
this.message = `${service} requires a valid ${option} program option. Expected: ${expected.join(', ')}`;
}
}
1 change: 1 addition & 0 deletions packages/cardano-services/src/TxWorker/errors/index.ts
@@ -0,0 +1 @@
export * from './WrongTxWorkerOption';
4 changes: 4 additions & 0 deletions packages/cardano-services/src/TxWorker/index.ts
@@ -0,0 +1,4 @@
export * from './defaults';
export * from './errors';
export * from './loadTxWorker';
export * from './TxWorkerOptionDescriptions';
22 changes: 22 additions & 0 deletions packages/cardano-services/src/TxWorker/loadTxWorker.ts
@@ -0,0 +1,22 @@
import { CommonOptionDescriptions, CommonProgramOptions } from '../ProgramsCommon';
import { MissingProgramOption, ServiceNames } from '../Program';
import { TxSubmitWorker, TxSubmitWorkerOptions } from '@cardano-sdk/rabbitmq';
import { createLogger } from 'bunyan';
import { ogmiosTxSubmitProvider, urlToConnectionConfig } from '@cardano-sdk/ogmios';

export type TxWorkerOptions = CommonProgramOptions & Pick<TxSubmitWorkerOptions, 'parallel' | 'pollingCycle'>;

export interface TxWorkerArgs {
options: TxWorkerOptions;
}

export const loadTxWorker = async (args: TxWorkerArgs) => {
const { loggerMinSeverity, ogmiosUrl, rabbitmqUrl } = args.options;
const txSubmitProvider = ogmiosTxSubmitProvider(urlToConnectionConfig(ogmiosUrl));
const logger = createLogger({ level: loggerMinSeverity, name: 'tx-worker' });

// Ensure rabbitmqUrl is not undefined
if (!rabbitmqUrl) throw new MissingProgramOption(ServiceNames.TxSubmit, CommonOptionDescriptions.RabbitMQUrl);

return new TxSubmitWorker({ ...args.options, logger, rabbitmqUrl, txSubmitProvider });
};
100 changes: 71 additions & 29 deletions packages/cardano-services/src/cli.ts
Expand Up @@ -3,15 +3,24 @@
require('../scripts/patchRequire');
import {
API_URL_DEFAULT,
HttpServerOptions,
OGMIOS_URL_DEFAULT,
ProgramArgs,
ProgramOptionDescriptions,
RABBITMQ_URL_DEFAULT,
ServiceNames,
loadHttpServer
} from './Program';
import { Command } from 'commander';
import { CommonOptionDescriptions } from './ProgramsCommon';
import { InvalidLoggerLevel } from './errors';
import {
PARALLEL_MODE_DEFAULT,
POLLING_CYCLE_DEFAULT,
TxWorkerOptionDescriptions,
TxWorkerOptions,
WrongProgramOption,
loadTxWorker
} from './TxWorker';
import { URL } from 'url';
import { loggerMethodNames } from './util';
import onDeath from 'death';
Expand All @@ -22,44 +31,49 @@ clear();
// eslint-disable-next-line no-console
console.log('Cardano Services CLI');

const commonOptions = (command: Command) =>
command
.option(
'--logger-min-severity <level>',
CommonOptionDescriptions.LoggerMinSeverity,
(level) => {
if (!loggerMethodNames.includes(level)) {
throw new InvalidLoggerLevel(level);
}
return level;
},
'info'
)
.option(
'--ogmios-url <ogmiosUrl>',
CommonOptionDescriptions.OgmiosUrl,
(url) => new URL(url),
new URL(OGMIOS_URL_DEFAULT)
)
.option(
'--rabbitmq-url <rabbitMQUrl>',
CommonOptionDescriptions.RabbitMQUrl,
(url) => new URL(url),
new URL(RABBITMQ_URL_DEFAULT)
);

const program = new Command('cardano-services');

program.version(packageJson.version);

program
.command('start-server')
.description('Start the HTTP server')
.argument('<serviceNames...>', `List of services to attach: ${Object.values(ServiceNames).toString()}`)
commonOptions(
program
.command('start-server')
.description('Start the HTTP server')
.argument('<serviceNames...>', `List of services to attach: ${Object.values(ServiceNames).toString()}`)
)
.option('--api-url <apiUrl>', ProgramOptionDescriptions.ApiUrl, (url) => new URL(url), new URL(API_URL_DEFAULT))
.option('--enable-metrics <metricsEnabled>', ProgramOptionDescriptions.MetricsEnabled, false)
.option('--db-connection-string <dbConnectionString>', ProgramOptionDescriptions.DbConnection, (url) =>
new URL(url).toString()
)
.option(
'--ogmios-url <ogmiosUrl>',
ProgramOptionDescriptions.OgmiosUrl,
(url) => new URL(url),
new URL(OGMIOS_URL_DEFAULT)
)
.option(
'--rabbitmq-url <rabbitMQUrl>',
ProgramOptionDescriptions.RabbitMQUrl,
(url) => new URL(url),
new URL(RABBITMQ_URL_DEFAULT)
)
.option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, false)
.option(
'--logger-min-severity <level>',
ProgramOptionDescriptions.LoggerMinSeverity,
(level) => {
if (!loggerMethodNames.includes(level)) {
throw new InvalidLoggerLevel(level);
}
return level;
},
'info'
)
.action(async (serviceNames: ServiceNames[], options: { apiUrl: URL } & NonNullable<ProgramArgs['options']>) => {
.action(async (serviceNames: ServiceNames[], options: { apiUrl: URL } & HttpServerOptions) => {
const { apiUrl, ...rest } = options;
const server = await loadHttpServer({ apiUrl: apiUrl || API_URL_DEFAULT, options: rest, serviceNames });
await server.initialize();
Expand All @@ -70,6 +84,34 @@ program
});
});

commonOptions(program.command('start-worker').description('Start RabbitMQ worker'))
.option(
'--parallel [parallel]',
TxWorkerOptionDescriptions.Parallel,
(parallel) => {
if (parallel === 'false') return false;
if (parallel === 'true') return true;
throw new WrongProgramOption(ServiceNames.TxSubmit, TxWorkerOptionDescriptions.Parallel, ['false', 'true']);
},
PARALLEL_MODE_DEFAULT
)
.option(
'--polling-cycle <pollingCycle>',
TxWorkerOptionDescriptions.PollingCycle,
(pollingCycle) => Number.parseInt(pollingCycle, 10),
POLLING_CYCLE_DEFAULT
)
.action(async (options: TxWorkerOptions) => {
// eslint-disable-next-line no-console
console.log(`RabbitMQ transactions worker: ${options.parallel ? 'parallel' : 'serial'} mode`);
const txWorker = await loadTxWorker({ options });
await txWorker.start();
onDeath(async () => {
await txWorker.stop();
process.exit(1);
});
});

if (process.argv.slice(2).length === 0) {
program.outputHelp();
process.exit(1);
Expand Down
1 change: 1 addition & 0 deletions packages/cardano-services/src/index.ts
Expand Up @@ -3,4 +3,5 @@ export * from './Program';
export * from './RunnableModule';
export * from './StakePoolSearch';
export * from './TxSubmit';
export * from './TxWorker';
export * as errors from './errors';

0 comments on commit 6da3db8

Please sign in to comment.