Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement topic exchange logic for publishers and consumers #155

Merged
merged 2 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/amqp/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export type { AMQPQueueConfig } from './lib/AbstractAmqpService'

export { AbstractAmqpQueueConsumer } from './lib/AbstractAmqpQueueConsumer'
export { AbstractAmqpTopicConsumer } from './lib/AbstractAmqpTopicConsumer'
export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer'

export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'
Expand All @@ -13,6 +14,6 @@ export type { ConnectionReceiver } from './lib/AmqpConnectionManager'
export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer'

export * from './lib/AbstractAmqpQueuePublisher'
export * from './lib/AbstractAmqpExchangePublisher'
export * from './lib/AmqpExchangePublisherManager'
export * from './lib/AbstractAmqpTopicPublisher'
export * from './lib/AmqpTopicPublisherManager'
export * from './lib/AmqpQueuePublisherManager'
33 changes: 25 additions & 8 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import type { Connection, Message } from 'amqplib'

import type {
AMQPConsumerDependencies,
AMQPLocator,
AMQPCreationConfig,
AMQPQueueLocator,
AMQPQueueCreationConfig,
} from './AbstractAmqpService'
import { AbstractAmqpService } from './AbstractAmqpService'
import { readAmqpMessage } from './amqpMessageReader'
Expand All @@ -33,9 +33,11 @@ export type AMQPConsumerOptions<
MessagePayloadType extends object,
ExecutionContext = undefined,
PrehandlerOutput = undefined,
CreationConfig extends AMQPQueueCreationConfig = AMQPQueueCreationConfig,
LocatorConfig extends AMQPQueueLocator = AMQPQueueLocator,
> = QueueConsumerOptions<
AMQPCreationConfig,
AMQPLocator,
CreationConfig,
LocatorConfig,
NonNullable<unknown>, // DeadLetterQueueIntegrationOptions -> empty object for now
MessagePayloadType,
ExecutionContext,
Expand All @@ -46,21 +48,25 @@ export abstract class AbstractAmqpConsumer<
MessagePayloadType extends object,
ExecutionContext,
PrehandlerOutput = undefined,
CreationConfig extends AMQPQueueCreationConfig = AMQPQueueCreationConfig,
LocatorConfig extends AMQPQueueLocator = AMQPQueueLocator,
>
extends AbstractAmqpService<
MessagePayloadType,
AMQPConsumerDependencies,
ExecutionContext,
PrehandlerOutput
PrehandlerOutput,
CreationConfig,
LocatorConfig
>
implements QueueConsumer
{
private readonly transactionObservabilityManager?: TransactionObservabilityManager
private readonly errorResolver: ErrorResolver
private readonly executionContext: ExecutionContext
private readonly deadLetterQueueOptions?: DeadLetterQueueOptions<
AMQPCreationConfig,
AMQPLocator,
AMQPQueueCreationConfig,
AMQPQueueLocator,
NonNullable<unknown>
>
private readonly maxRetryDuration: number
Expand All @@ -71,10 +77,17 @@ export abstract class AbstractAmqpConsumer<
ExecutionContext,
PrehandlerOutput
>
protected readonly queueName: string

constructor(
dependencies: AMQPConsumerDependencies,
options: AMQPConsumerOptions<MessagePayloadType, ExecutionContext, PrehandlerOutput>,
options: AMQPConsumerOptions<
MessagePayloadType,
ExecutionContext,
PrehandlerOutput,
CreationConfig,
LocatorConfig
>,
executionContext: ExecutionContext,
) {
super(dependencies, options)
Expand All @@ -84,6 +97,10 @@ export abstract class AbstractAmqpConsumer<
this.deadLetterQueueOptions = options.deadLetterQueue
this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION

this.queueName = options.locatorConfig
? options.locatorConfig.queueName
: options.creationConfig!.queueName

const messageSchemas = options.handlers.map((entry) => entry.schema)
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
Expand Down
55 changes: 0 additions & 55 deletions packages/amqp/lib/AbstractAmqpExchangePublisher.ts

This file was deleted.

46 changes: 32 additions & 14 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,52 @@
import type { Either } from '@lokalise/node-core'
import { InternalError } from '@lokalise/node-core'
import type { Either } from '@lokalise/node-core';
import { copyWithoutUndefined , InternalError } from '@lokalise/node-core'
import type {
BarrierResult,
CommonCreationConfigType,
MessageInvalidFormatError,
MessageValidationError,
QueuePublisherOptions,
SyncPublisher,
} from '@message-queue-toolkit/core'
import { MessageSchemaContainer, objectToBuffer } from '@message-queue-toolkit/core'
import { objectToBuffer, MessageSchemaContainer } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { AMQPLocator, AMQPCreationConfig, AMQPDependencies } from './AbstractAmqpService'
import type { AMQPDependencies } from './AbstractAmqpService'
import { AbstractAmqpService } from './AbstractAmqpService'

export type AMQPPublisherOptions<MessagePayloadType extends object> = QueuePublisherOptions<
AMQPCreationConfig,
AMQPLocator,
MessagePayloadType
> & {
export type AMQPPublisherOptions<
MessagePayloadType extends object,
CreationConfig extends CommonCreationConfigType,
LocatorConfig extends object,
> = QueuePublisherOptions<CreationConfig, LocatorConfig, MessagePayloadType> & {
exchange?: string
}

export abstract class AbstractAmqpPublisher<MessagePayloadType extends object, MessageOptionsType>
extends AbstractAmqpService<MessagePayloadType>
export abstract class AbstractAmqpPublisher<
MessagePayloadType extends object,
MessageOptionsType,
CreationConfig extends CommonCreationConfigType,
LocatorConfig extends object,
>
extends AbstractAmqpService<
MessagePayloadType,
AMQPDependencies,
unknown,
unknown,
CreationConfig,
LocatorConfig
>
implements SyncPublisher<MessagePayloadType, MessageOptionsType>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
protected readonly exchange?: string

private initPromise?: Promise<void>

constructor(dependencies: AMQPDependencies, options: AMQPPublisherOptions<MessagePayloadType>) {
constructor(
dependencies: AMQPDependencies,
options: AMQPPublisherOptions<MessagePayloadType, CreationConfig, LocatorConfig>,
) {
super(dependencies, options)

const messageSchemas = options.messageSchemas
Expand Down Expand Up @@ -99,12 +115,14 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object, M
throw new InternalError({
message: `Error while publishing to AMQP ${(err as Error).message}`,
errorCode: 'AMQP_PUBLISH_ERROR',
details: {
details: copyWithoutUndefined({
publisher: this.constructor.name,
// @ts-ignore
queueName: this.queueName,
exchange: this.exchange,
// @ts-ignore
messageType: message[this.messageTypeField] ?? 'unknown',
},
}),
cause: err as Error,
})
}
Expand Down
10 changes: 7 additions & 3 deletions packages/amqp/lib/AbstractAmqpQueueConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { AbstractAmqpConsumer } from './AbstractAmqpConsumer'
import { ensureAmqpQueue } from './utils/amqpQueueUtils'
import { deleteAmqpQueue, ensureAmqpQueue } from './utils/amqpQueueUtils'

export class AbstractAmqpQueueConsumer<
MessagePayloadType extends object,
ExecutionContext,
PrehandlerOutput = undefined,
> extends AbstractAmqpConsumer<MessagePayloadType, ExecutionContext, PrehandlerOutput> {
protected override createMissingEntities(): Promise<void> {
return ensureAmqpQueue(this.connection!, this.channel, this.creationConfig, this.locatorConfig)
protected override async createMissingEntities(): Promise<void> {
if (this.deletionConfig && this.creationConfig) {
await deleteAmqpQueue(this.channel, this.deletionConfig, this.creationConfig)
}

await ensureAmqpQueue(this.connection!, this.channel, this.creationConfig, this.locatorConfig)
}
}
27 changes: 26 additions & 1 deletion packages/amqp/lib/AbstractAmqpQueuePublisher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import type * as Buffer from 'node:buffer'

import type { Options } from 'amqplib/properties'

import type { AMQPPublisherOptions } from './AbstractAmqpPublisher'
import { AbstractAmqpPublisher } from './AbstractAmqpPublisher'
import type {
AMQPDependencies,
AMQPQueueCreationConfig,
AMQPQueueLocator,
} from './AbstractAmqpService'
import { ensureAmqpQueue } from './utils/amqpQueueUtils'

export type AmqpQueueMessageOptions = {
Expand All @@ -13,7 +21,24 @@ const NO_PARAMS: AmqpQueueMessageOptions = {

export abstract class AbstractAmqpQueuePublisher<
MessagePayloadType extends object,
> extends AbstractAmqpPublisher<MessagePayloadType, AmqpQueueMessageOptions> {
> extends AbstractAmqpPublisher<
MessagePayloadType,
AmqpQueueMessageOptions,
AMQPQueueCreationConfig,
AMQPQueueLocator
> {
protected readonly queueName: string

constructor(
dependencies: AMQPDependencies,
options: AMQPPublisherOptions<MessagePayloadType, AMQPQueueCreationConfig, AMQPQueueLocator>,
) {
super(dependencies, options)
this.queueName = options.locatorConfig
? options.locatorConfig.queueName
: options.creationConfig.queueName
}

protected publishInternal(message: Buffer, options: AmqpQueueMessageOptions): void {
this.channel.sendToQueue(this.queueName, message, options.publishOptions)
}
Expand Down
36 changes: 19 additions & 17 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {
CommonCreationConfigType,
QueueConsumerDependencies,
QueueDependencies,
QueueOptions,
Expand All @@ -8,7 +9,6 @@ import type { Channel, Connection, Message } from 'amqplib'
import type { Options } from 'amqplib/properties'

import type { AmqpConnectionManager, ConnectionReceiver } from './AmqpConnectionManager'
import { deleteAmqpQueue } from './utils/amqpQueueUtils'

export type AMQPDependencies = QueueDependencies & {
amqpConnectionManager: AmqpConnectionManager
Expand All @@ -17,34 +17,44 @@ export type AMQPDependencies = QueueDependencies & {
export type AMQPConsumerDependencies = AMQPDependencies & QueueConsumerDependencies
export type AMQPQueueConfig = Options.AssertQueue

export type AMQPCreationConfig = {
export type AMQPQueueCreationConfig = {
queueOptions: AMQPQueueConfig
queueName: string
updateAttributesIfExists?: boolean
}

export type AMQPSubscriptionConfig = {
export type AMQPTopicCreationConfig = AMQPQueueCreationConfig & {
exchange: string
routingKey: string
topicPattern: string
}

export type AMQPLocator = {
export type AMQPTopicPublisherConfig = {
exchange: string
} & CommonCreationConfigType

export type AMQPQueueLocator = {
queueName: string
}

export type AMQPTopicLocator = AMQPQueueLocator & {
exchange: string
}

export abstract class AbstractAmqpService<
MessagePayloadType extends object,
DependenciesType extends AMQPDependencies = AMQPDependencies,
ExecutionContext = unknown,
PrehandlerOutput = unknown,
CreationConfig extends CommonCreationConfigType = AMQPQueueCreationConfig,
LocatorConfig extends object = AMQPQueueLocator,
>
extends AbstractQueueService<
MessagePayloadType,
Message,
DependenciesType,
AMQPCreationConfig,
AMQPLocator,
QueueOptions<AMQPCreationConfig, AMQPLocator>,
CreationConfig,
LocatorConfig,
QueueOptions<CreationConfig, LocatorConfig>,
ExecutionContext,
PrehandlerOutput
>
Expand All @@ -55,17 +65,13 @@ export abstract class AbstractAmqpService<
// @ts-ignore
protected channel: Channel
private isShuttingDown: boolean
protected readonly queueName: string

constructor(
dependencies: DependenciesType,
options: QueueOptions<AMQPCreationConfig, AMQPLocator>,
options: QueueOptions<CreationConfig, LocatorConfig>,
) {
super(dependencies, options)

this.queueName = options.locatorConfig
? options.locatorConfig.queueName
: options.creationConfig?.queueName
this.isShuttingDown = false
this.connectionManager = dependencies.amqpConnectionManager
this.connection = this.connectionManager.getConnectionSync()
Expand Down Expand Up @@ -98,10 +104,6 @@ export abstract class AbstractAmqpService<
this.isShuttingDown = false
}

if (this.deletionConfig && this.creationConfig) {
await deleteAmqpQueue(this.channel, this.deletionConfig, this.creationConfig)
}

this.channel.on('close', () => {
if (!this.isShuttingDown) {
this.logger.error(`AMQP connection lost!`)
Expand Down
Loading
Loading