Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/flow test options #8

Merged
merged 4 commits into from Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 53 additions & 1 deletion src/flow/flow-test.test.ts
Expand Up @@ -81,7 +81,7 @@ describe('Flow test', () => {
await terminate(...terminables);
});

it('should start', async () => {
it('should test using amqp successfully', async () => {
await testFlow(conn, {
type: IOType.AMQP,
exchange: "test.ex",
Expand All @@ -95,6 +95,58 @@ describe('Flow test', () => {
} as AmqpOutput);
});

it('should test using amqp successfully when options are given', async () => {
await testFlow(conn, {
type: IOType.AMQP,
exchange: "test.ex",
routingKey: "queue.2.key",
payload: inputPayload,
options: {
replyTo: "test",
expiration: 5000
}
} as AmqpInput, {
expectedOutput,
type: IOType.AMQP,
queues: ["test.queue.1", "test.queue.2"],
expectedQueue: "test.queue.2",
expectedProperties: {
replyTo: "test",
expiration: "5000"
}
} as AmqpOutput);
});

it('should fail when expected options do not match those given', async () => {
try {
await testFlow(conn, {
type: IOType.AMQP,
exchange: "test.ex",
routingKey: "queue.2.key",
payload: inputPayload,
expectedProperties: {
replyTo: "test",
expiration: 5000
}
} as AmqpInput, {
expectedOutput,
type: IOType.AMQP,
queues: ["test.queue.1", "test.queue.2"],
expectedQueue: "test.queue.2",
expectedProperties: {
replyTo: "not test",
expiration: "5000"
}
} as AmqpOutput);
} catch (e) {
expect(e).to.exist;

return;
}

assert.fail("Did not throw error");
});

it('should fail when message is not routed', async () => {
try {
await testFlow(conn, {
Expand Down
24 changes: 19 additions & 5 deletions src/flow/flow-test.ts
@@ -1,4 +1,4 @@
import { Channel, Connection, ConsumeMessage } from "amqplib";
import { Channel, Connection, ConsumeMessage, Options, MessageProperties } from "amqplib";
import axios from 'axios';
import { assert, expect } from 'chai';

Expand All @@ -15,6 +15,7 @@ export interface Input {
export interface AmqpInput extends Input {
exchange: string;
routingKey: string;
options?: Options.Publish;
}

export interface HttpInput extends Input {
Expand All @@ -30,6 +31,7 @@ export interface AmqpOutput extends Output {
type: IOType;
queues: string[];
expectedQueue: string;
expectedProperties: MessageProperties;
}

interface CreateMessageOut {
Expand All @@ -46,7 +48,8 @@ interface CreateMessageOut {
function createReceiveMessageCallback(
expectedOutput: any,
outQueue: string,
shouldReceive: boolean
shouldReceive: boolean,
expectedProperties?: MessageProperties,
): CreateMessageOut {
const res: any = {
resolve: undefined,
Expand All @@ -66,7 +69,7 @@ function createReceiveMessageCallback(
} else {
res.resolve();
}
}, 6000);
}, 4000);

return {
consumerPromise,
Expand All @@ -85,6 +88,15 @@ function createReceiveMessageCallback(
const out = JSON.parse(msg.content.toString());

expect(out).to.deep.equal(expectedOutput, `Output of queue ${outQueue} does not match expected output`);
if (expectedProperties) {
Object.keys(expectedProperties).forEach(key => {
expect((msg.properties as any)[key]).to.deep.equal(
(expectedProperties as any)[key],
`Output property ${key} of queue ${outQueue} does not match expected property`
);
})
}

if (timeoutReached) {
throw new Error(`Queue ${outQueue} received a message after the timeout was reached`);
}
Expand Down Expand Up @@ -123,7 +135,8 @@ export async function testFlow(
const res = createReceiveMessageCallback(
out.expectedOutput,
outQueue,
(out as AmqpOutput).expectedQueue === outQueue
(out as AmqpOutput).expectedQueue === outQueue,
(out as AmqpOutput).expectedProperties
);
const consumerTag = (await outChan.consume(outQueue, res.messageCallback)).consumerTag;
consumerTags.push(consumerTag);
Expand All @@ -143,7 +156,8 @@ export async function testFlow(
await inputChan.publish(
(input as AmqpInput).exchange,
(input as AmqpInput).routingKey,
Buffer.from(JSON.stringify(input.payload))
Buffer.from(JSON.stringify(input.payload)),
(input as AmqpInput).options,
);
} else if (input.type === IOType.HTTP) {
await axios.get((input as HttpInput).url);
Expand Down
2 changes: 1 addition & 1 deletion src/flow/process.ts
Expand Up @@ -18,7 +18,7 @@ export interface Flow {
export async function start(f: Flow = {}): Promise<Termination> {
return new Promise<Termination>((resolve, reject) => {
const {
path = 'flows.json',
path = 'flow.json',
port = 1880,
userDir = '.',
env = {},
Expand Down
9 changes: 1 addition & 8 deletions src/index.ts
Expand Up @@ -10,6 +10,7 @@ export const flow = flw;
export const probes = prb.builtin;
export const resources = rsc.builtin;
export const makeGlobal = rsc.makeGlobal;
export const testFlow = flw.testFlow;

export interface Options {
containers?: dkr.Container[];
Expand Down Expand Up @@ -65,11 +66,3 @@ export async function teardown(ctx: Context): Promise<void> {

return ctx.destroy();
}

export async function test(
ctx: Context,
input: flw.Input,
output: flw.Output,
): Promise<void> {
await flw.testFlow(ctx.resources.rabbitmq, input, output);
}