Skip to content

Commit

Permalink
Add locator support for AMQP (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed Jul 18, 2023
1 parent 454ebc7 commit 56802b8
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 6 deletions.
2 changes: 1 addition & 1 deletion packages/amqp/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
],
"max-lines": ["error", { "max": 600 }],
"max-params": ["error", { "max": 4 }],
"max-statements": ["error", { "max": 15 }],
"max-statements": ["error", { "max": 20 }],
"complexity": ["error", { "max": 20 }]
},
"overrides": [
Expand Down
27 changes: 25 additions & 2 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@ export type AMQPDependencies = QueueDependencies & {
export type AMQPConsumerDependencies = AMQPDependencies & QueueConsumerDependencies
export type AMQPQueueConfig = Options.AssertQueue

export type AMQPQueueLocatorType = {
queueName: string
}

export class AbstractAmqpService<
MessagePayloadType extends object,
DependenciesType extends AMQPDependencies = AMQPDependencies,
> extends AbstractQueueService<MessagePayloadType, DependenciesType, AMQPQueueConfig> {
> extends AbstractQueueService<
MessagePayloadType,
DependenciesType,
AMQPQueueConfig,
AMQPQueueLocatorType
> {
protected readonly connection: Connection
// @ts-ignore
protected channel: Channel
Expand Down Expand Up @@ -70,7 +79,21 @@ export class AbstractAmqpService<
this.handleError(err)
})

await this.channel.assertQueue(this.queueName, this.queueConfiguration)
if (!this.queueLocator) {
await this.channel.assertQueue(this.queueName, this.queueConfiguration)
} else {
// queue check breaks channel if not successful
const checkChannel = await this.connection.createChannel()
checkChannel.on('error', () => {
// it's OK
})
try {
await checkChannel.checkQueue(this.queueLocator.queueName)
await checkChannel.close()
} catch (err) {
throw new Error(`Queue with queueName ${this.queueLocator.queueName} does not exist.`)
}
}
}

async close(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"zod": "^3.21.4"
},
"peerDependencies": {
"@message-queue-toolkit/core": "^1.1.0",
"@message-queue-toolkit/core": "^1.2.0",
"amqplib": "^0.10.3"
},
"devDependencies": {
Expand Down
48 changes: 48 additions & 0 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,54 @@ const perms: [string, ...string[]] = ['perm1', 'perm2']
const userIds = [100, 200, 300]

describe('PermissionPublisher', () => {
describe('init', () => {
let diContainer: AwilixContainer<Dependencies>
let channel: Channel
beforeAll(async () => {
diContainer = await registerDependencies(TEST_AMQP_CONFIG, {
consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG),
permissionConsumer: asClass(FakeConsumer, {
lifetime: Lifetime.SINGLETON,
asyncInit: 'start',
asyncDispose: 'close',
asyncDisposePriority: 10,
}),
})
})

beforeEach(async () => {
channel = await diContainer.cradle.amqpConnection.createChannel()
})

afterEach(async () => {
await channel.deleteQueue(AmqpPermissionConsumer.QUEUE_NAME)
await channel.close()
})

it('throws an error when invalid queue locator is passed', async () => {
await channel.deleteQueue(AmqpPermissionConsumer.QUEUE_NAME)
const newPublisher = new AmqpPermissionPublisher(diContainer.cradle, {
queueLocator: {
queueName: AmqpPermissionPublisher.QUEUE_NAME,
},
})

await expect(() => newPublisher.init()).rejects.toThrow(/does not exist/)
})

it('does not create a new queue when queue locator is passed', async () => {
await channel.assertQueue(AmqpPermissionPublisher.QUEUE_NAME)

const newPublisher = new AmqpPermissionPublisher(diContainer.cradle, {
queueLocator: {
queueName: AmqpPermissionPublisher.QUEUE_NAME,
},
})

await expect(newPublisher.init()).resolves.toBeUndefined()
})
})

describe('publish', () => {
let diContainer: AwilixContainer<Dependencies>
let channel: Channel
Expand Down
13 changes: 11 additions & 2 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import type { QueueOptions } from '@message-queue-toolkit/core'

import type { AMQPLocatorType } from '../../lib/AbstractAmqpConsumer'
import { AbstractAmqpPublisher } from '../../lib/AbstractAmqpPublisher'
import type { AMQPDependencies } from '../../lib/AbstractAmqpService'
import type { AMQPDependencies, AMQPQueueConfig } from '../../lib/AbstractAmqpService'
import type { PERMISSIONS_MESSAGE_TYPE } from '../consumers/userConsumerSchemas'
import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas'

export class AmqpPermissionPublisher extends AbstractAmqpPublisher<PERMISSIONS_MESSAGE_TYPE> {
public static QUEUE_NAME = 'user_permissions'

constructor(dependencies: AMQPDependencies) {
constructor(
dependencies: AMQPDependencies,
options: Partial<
Pick<QueueOptions<PERMISSIONS_MESSAGE_TYPE, AMQPQueueConfig, AMQPLocatorType>, 'queueLocator'>
>,
) {
super(dependencies, {
queueName: AmqpPermissionPublisher.QUEUE_NAME,
queueConfiguration: {
Expand All @@ -15,6 +23,7 @@ export class AmqpPermissionPublisher extends AbstractAmqpPublisher<PERMISSIONS_M
},
messageSchema: PERMISSIONS_MESSAGE_SCHEMA,
messageTypeField: 'messageType',
...options,
})
}
}

0 comments on commit 56802b8

Please sign in to comment.