Skip to content

Commit

Permalink
Improve signature of creation/locator configs (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed Jul 20, 2023
1 parent e36cd1e commit f2bd25f
Show file tree
Hide file tree
Showing 26 changed files with 308 additions and 153 deletions.
2 changes: 1 addition & 1 deletion packages/amqp/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"@typescript-eslint/no-empty-interface": "warn",
"@typescript-eslint/ban-ts-comment": "off",
"@typescript-eslint/no-use-before-define": "off",
"@typescript-eslint/no-non-null-assertion": "warn",
"@typescript-eslint/no-non-null-assertion": "off",
"@typescript-eslint/no-var-requires": "off",
"@typescript-eslint/indent": "off",
"@typescript-eslint/no-explicit-any": "warn",
Expand Down
23 changes: 18 additions & 5 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import type { Either, ErrorResolver } from '@lokalise/node-core'
import type {
QueueConsumer,
QueueOptions,
NewQueueOptions,
TransactionObservabilityManager,
Deserializer,
ExistingQueueOptions,
} from '@message-queue-toolkit/core'
import { isMessageError } from '@message-queue-toolkit/core'
import type { Message } from 'amqplib'

import type { AMQPConsumerDependencies, AMQPQueueConfig } from './AbstractAmqpService'
import type { AMQPConsumerDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService'
import { AbstractAmqpService } from './AbstractAmqpService'
import { deserializeAmqpMessage } from './amqpMessageDeserializer'

Expand All @@ -18,9 +19,15 @@ const ABORT_EARLY_EITHER: Either<'abort', never> = {

export type AMQPLocatorType = { queueName: string }

export type AMQPConsumerOptions<MessagePayloadType extends object> = QueueOptions<
export type NewAMQPConsumerOptions<MessagePayloadType extends object> = NewQueueOptions<
MessagePayloadType,
CreateAMQPQueueOptions
> & {
deserializer?: Deserializer<MessagePayloadType, Message>
}

export type ExistingAMQPConsumerOptions<MessagePayloadType extends object> = ExistingQueueOptions<
MessagePayloadType,
AMQPQueueConfig,
AMQPLocatorType
> & {
deserializer?: Deserializer<MessagePayloadType, Message>
Expand All @@ -36,13 +43,19 @@ export abstract class AbstractAmqpConsumer<MessagePayloadType extends object>

constructor(
dependencies: AMQPConsumerDependencies,
options: AMQPConsumerOptions<MessagePayloadType>,
options:
| NewAMQPConsumerOptions<MessagePayloadType>
| ExistingAMQPConsumerOptions<MessagePayloadType>,
) {
super(dependencies, options)
this.transactionObservabilityManager = dependencies.transactionObservabilityManager
this.errorResolver = dependencies.consumerErrorResolver

this.deserializer = options.deserializer ?? deserializeAmqpMessage

if (!options.locatorConfig?.queueName && !options.creationConfig?.queueName) {
throw new Error('queueName must be set in either locatorConfig or creationConfig')
}
}

abstract processMessage(
Expand Down
33 changes: 25 additions & 8 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type {
QueueConsumerDependencies,
QueueDependencies,
QueueOptions,
NewQueueOptions,
ExistingQueueOptions,
} from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'
import type { Channel, Connection } from 'amqplib'
Expand All @@ -16,6 +17,11 @@ export type AMQPDependencies = QueueDependencies & {
export type AMQPConsumerDependencies = AMQPDependencies & QueueConsumerDependencies
export type AMQPQueueConfig = Options.AssertQueue

export type CreateAMQPQueueOptions = {
queueOptions: AMQPQueueConfig
queueName: string
}

export type AMQPQueueLocatorType = {
queueName: string
}
Expand All @@ -26,20 +32,28 @@ export class AbstractAmqpService<
> extends AbstractQueueService<
MessagePayloadType,
DependenciesType,
AMQPQueueConfig,
AMQPQueueLocatorType
CreateAMQPQueueOptions,
AMQPQueueLocatorType,
| NewQueueOptions<MessagePayloadType, CreateAMQPQueueOptions>
| ExistingQueueOptions<MessagePayloadType, AMQPLocatorType>
> {
protected readonly connection: Connection
// @ts-ignore
protected channel: Channel
private isShuttingDown: boolean
protected readonly queueName: string

constructor(
dependencies: DependenciesType,
options: QueueOptions<MessagePayloadType, AMQPQueueConfig, AMQPLocatorType>,
options:
| NewQueueOptions<MessagePayloadType, CreateAMQPQueueOptions>
| ExistingQueueOptions<MessagePayloadType, AMQPLocatorType>,
) {
super(dependencies, options)

this.queueName = options.locatorConfig
? options.locatorConfig.queueName
: options.creationConfig?.queueName
this.connection = dependencies.amqpConnection
this.isShuttingDown = false
}
Expand Down Expand Up @@ -79,19 +93,22 @@ export class AbstractAmqpService<
this.handleError(err)
})

if (!this.queueLocator) {
await this.channel.assertQueue(this.queueName, this.queueConfiguration)
if (this.creationConfig) {
await this.channel.assertQueue(
this.creationConfig.queueName,
this.creationConfig.queueOptions,
)
} else {
// queue check breaks channel if not successful
const checkChannel = await this.connection.createChannel()
checkChannel.on('error', () => {
// it's OK
})
try {
await checkChannel.checkQueue(this.queueLocator.queueName)
await checkChannel.checkQueue(this.locatorConfig!.queueName)
await checkChannel.close()
} catch (err) {
throw new Error(`Queue with queueName ${this.queueLocator.queueName} does not exist.`)
throw new Error(`Queue with queueName ${this.locatorConfig!.queueName} does not exist.`)
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ export class AmqpPermissionConsumer extends AbstractAmqpConsumer<PERMISSIONS_MES

constructor(dependencies: AMQPConsumerDependencies) {
super(dependencies, {
queueName: AmqpPermissionConsumer.QUEUE_NAME,
queueConfiguration: {
durable: true,
autoDelete: false,
creationConfig: {
queueName: AmqpPermissionConsumer.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
},
},
messageSchema: PERMISSIONS_MESSAGE_SCHEMA,
messageTypeField: 'messageType',
Expand Down
10 changes: 6 additions & 4 deletions packages/amqp/test/fakes/FakeConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import type { CommonMessage } from '../../lib/types/MessageTypes'
export class FakeConsumer extends AbstractAmqpConsumer<CommonMessage> {
constructor(dependencies: AMQPConsumerDependencies, queueName = 'dummy', messageSchema: ZodType) {
super(dependencies, {
queueName: queueName,
queueConfiguration: {
durable: true,
autoDelete: false,
creationConfig: {
queueName: queueName,
queueOptions: {
durable: true,
autoDelete: false,
},
},
messageSchema,
messageTypeField: 'messageType',
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('PermissionPublisher', () => {
it('throws an error when invalid queue locator is passed', async () => {
await channel.deleteQueue(AmqpPermissionConsumer.QUEUE_NAME)
const newPublisher = new AmqpPermissionPublisher(diContainer.cradle, {
queueLocator: {
locatorConfig: {
queueName: AmqpPermissionPublisher.QUEUE_NAME,
},
})
Expand All @@ -59,7 +59,7 @@ describe('PermissionPublisher', () => {
await channel.assertQueue(AmqpPermissionPublisher.QUEUE_NAME)

const newPublisher = new AmqpPermissionPublisher(diContainer.cradle, {
queueLocator: {
locatorConfig: {
queueName: AmqpPermissionPublisher.QUEUE_NAME,
},
})
Expand Down
28 changes: 16 additions & 12 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { QueueOptions } from '@message-queue-toolkit/core'

import type { AMQPLocatorType } from '../../lib/AbstractAmqpConsumer'
import type {
NewAMQPConsumerOptions,
ExistingAMQPConsumerOptions,
} from '../../lib/AbstractAmqpConsumer'
import { AbstractAmqpPublisher } from '../../lib/AbstractAmqpPublisher'
import type { AMQPDependencies, AMQPQueueConfig } from '../../lib/AbstractAmqpService'
import type { AMQPDependencies } from '../../lib/AbstractAmqpService'
import type { PERMISSIONS_MESSAGE_TYPE } from '../consumers/userConsumerSchemas'
import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas'

Expand All @@ -11,16 +12,19 @@ export class AmqpPermissionPublisher extends AbstractAmqpPublisher<PERMISSIONS_M

constructor(
dependencies: AMQPDependencies,
options: Partial<
Pick<QueueOptions<PERMISSIONS_MESSAGE_TYPE, AMQPQueueConfig, AMQPLocatorType>, 'queueLocator'>
>,
options:
| Pick<NewAMQPConsumerOptions<PERMISSIONS_MESSAGE_TYPE>, 'creationConfig'>
| Pick<ExistingAMQPConsumerOptions<PERMISSIONS_MESSAGE_TYPE>, 'locatorConfig'> = {
creationConfig: {
queueName: AmqpPermissionPublisher.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
},
},
},
) {
super(dependencies, {
queueName: AmqpPermissionPublisher.QUEUE_NAME,
queueConfiguration: {
durable: true,
autoDelete: false,
},
messageSchema: PERMISSIONS_MESSAGE_SCHEMA,
messageTypeField: 'messageType',
...options,
Expand Down
3 changes: 2 additions & 1 deletion packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ export type {

export { AbstractQueueService } from './lib/queues/AbstractQueueService'
export type {
QueueOptions,
NewQueueOptions,
ExistingQueueOptions,
QueueDependencies,
QueueConsumerDependencies,
Deserializer,
Expand Down
40 changes: 23 additions & 17 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,24 @@ export type Deserializer<
errorProcessor: ErrorResolver,
) => Either<MessageInvalidFormatError | MessageValidationError, MessagePayloadType>

export type QueueOptions<
export type NewQueueOptions<
MessagePayloadType extends object,
CreationConfigType extends object,
> = {
messageSchema: ZodSchema<MessagePayloadType>
messageTypeField: string
locatorConfig?: never
creationConfig: CreationConfigType
}

export type ExistingQueueOptions<
MessagePayloadType extends object,
QueueConfiguration extends object,
QueueLocatorType extends object,
> = {
messageSchema: ZodSchema<MessagePayloadType>
messageTypeField: string
queueName: string
queueLocator?: QueueLocatorType
queueConfiguration?: QueueConfiguration
locatorConfig: QueueLocatorType
creationConfig?: never
}

export type CommonQueueLocator = {
Expand All @@ -47,32 +55,30 @@ export abstract class AbstractQueueService<
DependenciesType extends QueueDependencies,
QueueConfiguration extends object,
QueueLocatorType extends object = CommonQueueLocator,
OptionsType extends QueueOptions<
MessagePayloadType,
QueueConfiguration,
QueueLocatorType
> = QueueOptions<MessagePayloadType, QueueConfiguration, QueueLocatorType>,
OptionsType extends
| NewQueueOptions<MessagePayloadType, QueueConfiguration>
| ExistingQueueOptions<MessagePayloadType, QueueLocatorType> =
| NewQueueOptions<MessagePayloadType, QueueConfiguration>
| ExistingQueueOptions<MessagePayloadType, QueueLocatorType>,
> {
protected readonly queueName: string
protected readonly errorReporter: ErrorReporter
protected readonly messageSchema: ZodSchema<MessagePayloadType>
protected readonly logger: Logger
protected readonly messageTypeField: string
protected readonly queueConfiguration?: QueueConfiguration
protected readonly queueLocator?: QueueLocatorType
protected readonly creationConfig?: QueueConfiguration
protected readonly locatorConfig?: QueueLocatorType

constructor(
{ errorReporter, logger }: DependenciesType,
{ messageSchema, messageTypeField, queueName, queueConfiguration, queueLocator }: OptionsType,
{ messageSchema, messageTypeField, creationConfig, locatorConfig }: OptionsType,
) {
this.errorReporter = errorReporter
this.logger = logger

this.queueName = queueName
this.messageSchema = messageSchema
this.messageTypeField = messageTypeField
this.queueConfiguration = queueConfiguration
this.queueLocator = queueLocator
this.creationConfig = creationConfig
this.locatorConfig = locatorConfig
}

protected handleError(err: unknown) {
Expand Down
2 changes: 0 additions & 2 deletions packages/sns/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
}
],
"max-lines": ["error", { "max": 600 }],
"max-params": ["error", { "max": 4 }],
"max-statements": ["error", { "max": 20 }],
"complexity": ["error", { "max": 20 }]
},
"overrides": [
Expand Down
Loading

0 comments on commit f2bd25f

Please sign in to comment.