Skip to content

feat(queue): add vendor-agnostic messaging queue interfaces#18

Merged
behinddwalls merged 1 commit into
mainfrom
queue-interface
Feb 19, 2026
Merged

feat(queue): add vendor-agnostic messaging queue interfaces#18
behinddwalls merged 1 commit into
mainfrom
queue-interface

Conversation

@behinddwalls
Copy link
Copy Markdown
Collaborator

@behinddwalls behinddwalls commented Feb 16, 2026

Summary

Why?

To enable vendor-agnostic queue integration across multiple backends (Kafka, SQS, RabbitMQ) without coupling to specific implementations.

What?

Adds queue extension interfaces and message entity:

extensions/queue/

  • Delivery interface - provides Message(), Ack(), Nack(), ExtendVisibilityTimeout()
  • Subscriber interface - returns (<-chan Delivery, error)
  • Publisher interface - publishes messages to topics
  • Factory interface - creates publishers/subscribers

entities/queue/

  • Message value type with ID, Payload, Metadata, PartitionKey, PublishedAt
  • Includes Copy() for deep copying

Test Plan

  • Message entity tests pass
    • Extension interfaces build successfully
    • Maps.Clone() handles nil metadata

Stack

  1. @ feat(queue): add vendor-agnostic messaging queue interfaces #18
  2. docs(queue/sql): add comprehensive RFC for SQL queue design #25
  3. feat(queue/sql): add MySQL schema and configuration #20
  4. feat(queue/sql): add data access layer stores #21
  5. feat(queue/sql): add publisher implementation #22
  6. feat(queue/sql): add subscriber with partition leasing #23
  7. feat(queue/sql): add factory, tests, and documentation #24

Comment thread entities/queue/delivery.go Outdated
Comment thread entities/queue/delivery.go Outdated
Comment thread entities/queue/delivery.go Outdated
Comment thread entities/queue/delivery_test.go Outdated
Comment thread entities/queue/delivery_test.go Outdated
Comment thread entities/queue/message.go Outdated
Comment thread extensions/queue/subscriber.go Outdated
Comment thread Makefile Outdated
behinddwalls added a commit that referenced this pull request Feb 17, 2026
Address PR #18 architectural feedback to follow Go idioms by making
Delivery an interface that provides both data access and operations.

**extensions/queue/delivery.go** - Delivery interface
- Message() returns Message by value (not pointer)
- Ack(ctx) error - acknowledge successful processing
- Nack(ctx, requeueAfter) error - reject and requeue
- ExtendVisibilityTimeout(ctx, duration) error - extend processing time
- Helper methods: DeliveryID(), Attempt(), ReceivedAt(), Metadata()

**extensions/queue/subscriber.go** - Updated interface
- Subscribe returns (<-chan Delivery, error)
- Each Delivery has its own ack/nack methods

**entities/queue/** - Message entity only
- Removed Delivery from entities (now an interface in extensions)
- Message remains as pure value type entity
- Message.Copy() uses maps.Clone() with nil check

Key design:
- Delivery is an interface (implementations hold backend-specific state)
- Message is a value type (pure data, follows Go idioms)
- Cleaner API: delivery.Ack(ctx) instead of handler.Ack(ctx, deliveryID)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment thread entities/queue/message.go Outdated
Comment thread entities/queue/message.go Outdated
Comment thread entities/queue/message.go Outdated
Comment thread extensions/queue/publisher.go Outdated
Comment thread extensions/queue/queue.go Outdated
Comment thread extensions/queue/subscriber.go
Comment thread extensions/queue/subscriber.go Outdated
Comment thread extensions/queue/subscriber.go
Address PR #18 architectural feedback to follow Go idioms by making
Delivery an interface that provides both data access and operations.

**extensions/queue/delivery.go** - Delivery interface
- Message() returns Message by value (not pointer)
- Ack(ctx) error - acknowledge successful processing
- Nack(ctx, requeueAfter) error - reject and requeue
- ExtendVisibilityTimeout(ctx, duration) error - extend processing time
- Helper methods: DeliveryID(), Attempt(), ReceivedAt(), Metadata()

**extensions/queue/subscriber.go** - Updated interface
- Subscribe returns (<-chan Delivery, error)
- Each Delivery has its own ack/nack methods

**entities/queue/** - Message entity only
- Removed Delivery from entities (now an interface in extensions)
- Message remains as pure value type entity
- Message.Copy() uses maps.Clone() with nil check

Key design:
- Delivery is an interface (implementations hold backend-specific state)
- Message is a value type (pure data, follows Go idioms)
- Cleaner API: delivery.Ack(ctx) instead of handler.Ack(ctx, deliveryID)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
// Implementations should handle infrastructure errors internally (e.g., reconnect).
//
// Each Delivery provides the message and methods to acknowledge or reject it.
// Consumers should call delivery.Ack() or delivery.Nack() for each delivery.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: also define when the implementation can return an error.

@behinddwalls behinddwalls merged commit 61d75c2 into main Feb 19, 2026
1 check passed
@behinddwalls behinddwalls deleted the queue-interface branch February 19, 2026 03:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants