Skip to content

Commit

Permalink
Merge pull request #247 from input-output-hk/feat/rabbitmq-tx-submit-…
Browse files Browse the repository at this point in the history
…prvider

feat(rabbitmq): TxSubmit via persistent queue
  • Loading branch information
rhyslbw committed May 17, 2022
2 parents 550a29b + ed915c3 commit 0d23d4c
Show file tree
Hide file tree
Showing 33 changed files with 729 additions and 82 deletions.
2 changes: 0 additions & 2 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
.eslintrc.js
*.d.ts
*jest.config.js
node_modules
dist
8 changes: 8 additions & 0 deletions .versionrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ const packageMap = [
filename: 'packages/golden-test-generator/package.json',
type: 'json'
},
{
filename: 'packages/ogmios/package.json',
type: 'json'
},
{
filename: 'packages/rabbitmq/package.json',
type: 'json'
},
{
filename: 'packages/util-dev/package.json',
type: 'json'
Expand Down
4 changes: 3 additions & 1 deletion packages/cardano-services/src/Http/HttpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ export class HttpServer extends RunnableModule {
this.server = await listenPromise(this.app, this.#config.listen);
}

shutdownImpl(): Promise<void> {
async shutdownImpl(): Promise<void> {
for (const service of this.#dependencies.services) await service.close();

return serverClosePromise(this.server);
}
}
4 changes: 4 additions & 0 deletions packages/cardano-services/src/Http/HttpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@ export abstract class HttpService {
});
}

async close(): Promise<void> {
return Promise.resolve();
}

protected abstract healthCheck(): Promise<HealthCheckResponse>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ export enum ProgramOptionDescriptions {
DbConnection = 'DB Connection',
LoggerMinSeverity = 'Log level',
MetricsEnabled = 'Enable Prometheus Metrics',
OgmiosUrl = 'Ogmios URL'
OgmiosUrl = 'Ogmios URL',
RabbitMQUrl = 'RabbitMQ URL',
UseQueue = 'Enables RabbitMQ'
}
2 changes: 2 additions & 0 deletions packages/cardano-services/src/Program/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ export const OGMIOS_URL_DEFAULT = (() => {
const connection = createConnectionObject();
return connection.address.webSocket;
})();

export const RABBITMQ_URL_DEFAULT = 'amqp://localhost:5672';
16 changes: 11 additions & 5 deletions packages/cardano-services/src/Program/loadHttpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { LogLevel, createLogger } from 'bunyan';
import { MissingProgramOption, UnknownServiceName } from './errors';
import { Pool } from 'pg';
import { ProgramOptionDescriptions } from './ProgramOptionDescriptions';
import { RabbitMqTxSubmitProvider } from '@cardano-sdk/rabbitmq';
import { ServiceNames } from './ServiceNames';
import { TxSubmitHttpService } from '../TxSubmit';
import { ogmiosTxSubmitProvider } from '@cardano-sdk/ogmios';
Expand All @@ -16,6 +17,8 @@ export interface ProgramArgs {
loggerMinSeverity?: LogLevel;
metricsEnabled?: boolean;
ogmiosUrl?: URL;
rabbitmqUrl?: URL;
useQueue?: boolean;
};
}

Expand Down Expand Up @@ -45,11 +48,14 @@ export const loadHttpServer = async (args: ProgramArgs): Promise<HttpServer> =>
services.push(
await TxSubmitHttpService.create({
logger,
txSubmitProvider: ogmiosTxSubmitProvider({
host: args.options?.ogmiosUrl?.hostname,
port: args.options?.ogmiosUrl ? Number.parseInt(args.options.ogmiosUrl.port) : undefined,
tls: args.options?.ogmiosUrl?.protocol === 'wss'
})
txSubmitProvider:
args.options?.useQueue && args.options?.rabbitmqUrl
? new RabbitMqTxSubmitProvider(args.options.rabbitmqUrl)
: ogmiosTxSubmitProvider({
host: args.options?.ogmiosUrl?.hostname,
port: args.options?.ogmiosUrl ? Number.parseInt(args.options.ogmiosUrl.port) : undefined,
tls: args.options?.ogmiosUrl?.protocol === 'wss'
})
})
);
break;
Expand Down
4 changes: 4 additions & 0 deletions packages/cardano-services/src/TxSubmit/TxSubmitHttpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export class TxSubmitHttpService extends HttpService {
this.#txSubmitProvider = txSubmitProvider;
}

async close(): Promise<void> {
await this.#txSubmitProvider.close?.();
}

async healthCheck() {
return this.#txSubmitProvider.healthCheck();
}
Expand Down
55 changes: 19 additions & 36 deletions packages/cardano-services/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ require('../scripts/patchRequire');
import {
API_URL_DEFAULT,
OGMIOS_URL_DEFAULT,
ProgramArgs,
ProgramOptionDescriptions,
RABBITMQ_URL_DEFAULT,
ServiceNames,
loadHttpServer
} from './Program';
import { Command } from 'commander';
import { InvalidLoggerLevel } from './errors';
import { LogLevel } from 'bunyan';
import { URL } from 'url';
import { loggerMethodNames } from './util';
import onDeath from 'death';
Expand Down Expand Up @@ -40,6 +41,13 @@ program
(url) => new URL(url),
new URL(OGMIOS_URL_DEFAULT)
)
.option(
'--rabbitmq-url <rabbitMQUrl>',
ProgramOptionDescriptions.RabbitMQUrl,
(url) => new URL(url),
new URL(RABBITMQ_URL_DEFAULT)
)
.option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, false)
.option(
'--logger-min-severity <level>',
ProgramOptionDescriptions.LoggerMinSeverity,
Expand All @@ -51,41 +59,16 @@ program
},
'info'
)
.action(
async (
serviceNames: ServiceNames[],
{
apiUrl,
dbConnectionString,
loggerMinSeverity,
metricsEnabled,
ogmiosUrl
}: {
apiUrl: URL;
dbConnectionString?: string;
loggerMinSeverity: LogLevel;
metricsEnabled: boolean;
ogmiosUrl?: URL;
}
) => {
const server = await loadHttpServer({
apiUrl: apiUrl || API_URL_DEFAULT,
options: {
dbConnectionString,
loggerMinSeverity,
metricsEnabled,
ogmiosUrl
},
serviceNames
});
await server.initialize();
await server.start();
onDeath(async () => {
await server.shutdown();
process.exit(1);
});
}
);
.action(async (serviceNames: ServiceNames[], options: { apiUrl: URL } & NonNullable<ProgramArgs['options']>) => {
const { apiUrl, ...rest } = options;
const server = await loadHttpServer({ apiUrl: apiUrl || API_URL_DEFAULT, options: rest, serviceNames });
await server.initialize();
await server.start();
onDeath(async () => {
await server.shutdown();
process.exit(1);
});
});

if (process.argv.slice(2).length === 0) {
program.outputHelp();
Expand Down
11 changes: 8 additions & 3 deletions packages/cardano-services/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable import/imports-first */
require('../scripts/patchRequire');
import * as envalid from 'envalid';
import { API_URL_DEFAULT, OGMIOS_URL_DEFAULT, ServiceNames, loadHttpServer } from './Program';
import { API_URL_DEFAULT, OGMIOS_URL_DEFAULT, RABBITMQ_URL_DEFAULT, ServiceNames, loadHttpServer } from './Program';
import { LogLevel } from 'bunyan';
import { URL } from 'url';
import { config } from 'dotenv';
Expand All @@ -14,14 +14,17 @@ const envSpecs = {
DB_CONNECTION_STRING: envalid.str({ default: undefined }),
LOGGER_MIN_SEVERITY: envalid.str({ choices: loggerMethodNames as string[], default: 'info' }),
OGMIOS_URL: envalid.url({ default: OGMIOS_URL_DEFAULT }),
SERVICE_NAMES: envalid.str({ example: Object.values(ServiceNames).toString() })
RABBITMQ_URL: envalid.url({ default: RABBITMQ_URL_DEFAULT }),
SERVICE_NAMES: envalid.str({ example: Object.values(ServiceNames).toString() }),
USE_QUEUE: envalid.bool({ default: false })
};

void (async () => {
config();
const env = envalid.cleanEnv(process.env, envSpecs);
const apiUrl = new URL(env.API_URL);
const ogmiosUrl = new URL(env.OGMIOS_URL);
const rabbitmqUrl = new URL(env.RABBITMQ_URL);
const dbConnectionString = env.DB_CONNECTION_STRING ? new URL(env.DB_CONNECTION_STRING).toString() : undefined;
const serviceNames = env.SERVICE_NAMES.split(',') as ServiceNames[];

Expand All @@ -31,7 +34,9 @@ void (async () => {
options: {
dbConnectionString,
loggerMinSeverity: env.LOGGER_MIN_SEVERITY as LogLevel,
ogmiosUrl
ogmiosUrl,
rabbitmqUrl,
useQueue: env.USE_QUEUE
},
serviceNames
});
Expand Down
3 changes: 2 additions & 1 deletion packages/cardano-services/src/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
},
"references": [
{ "path": "../../core/src" },
{ "path": "../../ogmios/src" }
{ "path": "../../ogmios/src" },
{ "path": "../../rabbitmq/src" }
]
}
67 changes: 66 additions & 1 deletion packages/cardano-services/test/entrypoints.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { ChildProcess, fork } from 'child_process';
import { Connection, ConnectionConfig, createConnectionObject } from '@cardano-ogmios/client';
import { ServiceNames } from '../src';
import { RABBITMQ_URL_DEFAULT, ServiceNames } from '../src';
import { createHealthyMockOgmiosServer, createUnhealthyMockOgmiosServer, ogmiosServerReady, serverReady } from './util';
import { getRandomPort } from 'get-port-please';
import { listenPromise, serverClosePromise } from '../src/util';
Expand Down Expand Up @@ -141,6 +141,7 @@ describe('entrypoints', () => {
});
});
});

describe('specifying an Ogmios-dependent service without providing the Ogmios URL', () => {
beforeEach(async () => {
ogmiosServer = createHealthyMockOgmiosServer();
Expand Down Expand Up @@ -178,6 +179,70 @@ describe('entrypoints', () => {
});
});
});

describe('with RabbitMQ and explicit URL', () => {
it('cli:start-server', async () => {
proc = fork(
exePath('cli'),
[
'start-server',
'--api-url',
apiUrl,
'--logger-min-severity',
'error',
'--use-queue',
'--rabbitmq-url',
RABBITMQ_URL_DEFAULT,
ServiceNames.TxSubmit
],
{
stdio: 'pipe'
}
);
await assertServiceHealthy(apiUrl, ServiceNames.TxSubmit);
});

it('run', async () => {
proc = fork(exePath('run'), {
env: {
API_URL: apiUrl,
LOGGER_MIN_SEVERITY: 'error',
RABBITMQ_URL: RABBITMQ_URL_DEFAULT,
SERVICE_NAMES: ServiceNames.TxSubmit,
USE_QUEUE: 'true'
},
stdio: 'pipe'
});
await assertServiceHealthy(apiUrl, ServiceNames.TxSubmit);
});
});

describe('with RabbitMQ and default URL', () => {
it('cli:start-server', async () => {
proc = fork(
exePath('cli'),
['start-server', '--api-url', apiUrl, '--logger-min-severity', 'error', '--use-queue', ServiceNames.TxSubmit],
{
stdio: 'pipe'
}
);
await assertServiceHealthy(apiUrl, ServiceNames.TxSubmit);
});

it('run', async () => {
proc = fork(exePath('run'), {
env: {
API_URL: apiUrl,
LOGGER_MIN_SEVERITY: 'error',
SERVICE_NAMES: ServiceNames.TxSubmit,
USE_QUEUE: 'true'
},
stdio: 'pipe'
});
await assertServiceHealthy(apiUrl, ServiceNames.TxSubmit);
});
});

describe('with unhealthy internal providers', () => {
let spy: jest.Mock;
beforeEach(async () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/cardano-services/test/jest-setup/jest-setup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { parse } from 'pg-connection-string';
import { setupPostgresContainer } from './docker';
import { setupRabbitMQContainer } from '@cardano-sdk/rabbitmq/test/jest-setup/docker';
import dotenv from 'dotenv';
import path from 'path';

Expand All @@ -16,4 +17,5 @@ module.exports = async () => {
password ? password : 'mysecretpassword',
port ? port : '5432'
);
await setupRabbitMQContainer();
};
2 changes: 2 additions & 0 deletions packages/cardano-services/test/jest-setup/jest-teardown.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { removePostgresContainer } from './docker';
import { removeRabbitMQContainer } from '@cardano-sdk/rabbitmq/test/jest-setup/docker';

module.exports = async () => {
await removePostgresContainer();
await removeRabbitMQContainer();
};
2 changes: 1 addition & 1 deletion packages/cardano-services/test/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{ "path": "../src" },
{ "path": "../../core/src" },
{ "path": "../../ogmios/test" },
{ "path": "../../rabbitmq/test" },
{ "path": "../../util-dev/src" }
]
}

1 change: 1 addition & 0 deletions packages/core/src/Provider/Provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export type HealthCheckResponse = {
};

export interface Provider {
close?(): Promise<void>;
/**
* @throws ProviderError
*/
Expand Down
4 changes: 4 additions & 0 deletions packages/rabbitmq/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
node_modules
dist
secrets
coverage

0 comments on commit 0d23d4c

Please sign in to comment.