From 068672d5203e6b7919f3f973db84a89042ace0f7 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Sun, 15 Feb 2026 23:00:05 -0800 Subject: [PATCH] refactor(queue): make Delivery an interface with Message as value type Address PR #18 architectural feedback to follow Go idioms by making Delivery an interface that provides both data access and operations. **extensions/queue/delivery.go** - Delivery interface - Message() returns Message by value (not pointer) - Ack(ctx) error - acknowledge successful processing - Nack(ctx, requeueAfter) error - reject and requeue - ExtendVisibilityTimeout(ctx, duration) error - extend processing time - Helper methods: DeliveryID(), Attempt(), ReceivedAt(), Metadata() **extensions/queue/subscriber.go** - Updated interface - Subscribe returns (<-chan Delivery, error) - Each Delivery has its own ack/nack methods **entities/queue/** - Message entity only - Removed Delivery from entities (now an interface in extensions) - Message remains as pure value type entity - Message.Copy() uses maps.Clone() with nil check Key design: - Delivery is an interface (implementations hold backend-specific state) - Message is a value type (pure data, follows Go idioms) - Cleaner API: delivery.Ack(ctx) instead of handler.Ack(ctx, deliveryID) Co-Authored-By: Claude Opus 4.6 --- entities/queue/BUILD.bazel | 15 +++++++ entities/queue/message.go | 55 +++++++++++++++++++++++ entities/queue/message_test.go | 80 ++++++++++++++++++++++++++++++++++ extensions/queue/BUILD.bazel | 14 ++++++ extensions/queue/README.md | 71 ++++++++++++++++++++++++++++++ extensions/queue/delivery.go | 43 ++++++++++++++++++ extensions/queue/publisher.go | 17 ++++++++ extensions/queue/queue.go | 17 ++++++++ extensions/queue/subscriber.go | 24 ++++++++++ extensions/storage/BUILD.bazel | 2 +- 10 files changed, 337 insertions(+), 1 deletion(-) create mode 100644 entities/queue/BUILD.bazel create mode 100644 entities/queue/message.go create mode 100644 entities/queue/message_test.go create mode 100644 extensions/queue/BUILD.bazel create mode 100644 extensions/queue/README.md create mode 100644 extensions/queue/delivery.go create mode 100644 extensions/queue/publisher.go create mode 100644 extensions/queue/queue.go create mode 100644 extensions/queue/subscriber.go diff --git a/entities/queue/BUILD.bazel b/entities/queue/BUILD.bazel new file mode 100644 index 00000000..e9f1afbc --- /dev/null +++ b/entities/queue/BUILD.bazel @@ -0,0 +1,15 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "queue", + srcs = ["message.go"], + importpath = "github.com/uber/submitqueue/entities/queue", + visibility = ["//visibility:public"], +) + +go_test( + name = "queue_test", + srcs = ["message_test.go"], + embed = [":queue"], + deps = ["@com_github_stretchr_testify//assert"], +) diff --git a/entities/queue/message.go b/entities/queue/message.go new file mode 100644 index 00000000..d82a9af1 --- /dev/null +++ b/entities/queue/message.go @@ -0,0 +1,55 @@ +package queue + +import ( + "maps" + "time" +) + +// Message represents a queue message entity. +// Immutable - use Copy() for modifications. +type Message struct { + // ID uniquely identifies the message for deduplication and tracing. + ID string + + // Payload is the message body as raw bytes. + Payload []byte + + // Metadata contains key-value pairs for headers and attributes. + // Use for trace IDs, request IDs, and cross-service metadata. + Metadata map[string]string + + // PartitionKey determines which partition/shard this message goes to. + // Messages with the same PartitionKey are guaranteed ordered delivery. + // Optional - if empty, backend may use round-robin distribution. + PartitionKey string + + // PublishedAt is when the message was published (Unix milliseconds). + PublishedAt int64 +} + +// NewMessage creates a new message with the given ID and payload. +// Metadata is initialized as an empty map. +// PublishedAt is set to the current time. +func NewMessage(id string, payload []byte) Message { + return Message{ + ID: id, + Payload: payload, + Metadata: make(map[string]string), + PublishedAt: time.Now().UnixMilli(), + } +} + +// Copy creates a deep copy of the message. +// Safe to call concurrently. +func (m Message) Copy() Message { + payloadCopy := make([]byte, len(m.Payload)) + copy(payloadCopy, m.Payload) + + return Message{ + ID: m.ID, + Payload: payloadCopy, + Metadata: maps.Clone(m.Metadata), + PartitionKey: m.PartitionKey, + PublishedAt: m.PublishedAt, + } +} diff --git a/entities/queue/message_test.go b/entities/queue/message_test.go new file mode 100644 index 00000000..d1dcef1a --- /dev/null +++ b/entities/queue/message_test.go @@ -0,0 +1,80 @@ +package queue + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewMessage(t *testing.T) { + id := "test-id" + payload := []byte("test payload") + + msg := NewMessage(id, payload) + + assert.Equal(t, id, msg.ID) + assert.Equal(t, payload, msg.Payload) + assert.NotNil(t, msg.Metadata) + assert.Empty(t, msg.Metadata) + assert.NotZero(t, msg.PublishedAt) +} + +func TestMessage_Copy(t *testing.T) { + original := NewMessage("id-123", []byte("payload")) + original.Metadata["key"] = "value" + original.PartitionKey = "partition-1" + + copied := original.Copy() + + // Verify immutable fields are equal + assert.Equal(t, original.ID, copied.ID) + assert.Equal(t, original.PublishedAt, copied.PublishedAt) + assert.Equal(t, original.PartitionKey, copied.PartitionKey) + + // Verify deep copy of payload + assert.Equal(t, original.Payload, copied.Payload) + original.Payload[0] = 'X' + assert.NotEqual(t, original.Payload[0], copied.Payload[0]) + + // Verify deep copy of metadata + assert.Equal(t, original.Metadata, copied.Metadata) + original.Metadata["new"] = "value" + assert.NotContains(t, copied.Metadata, "new") +} + +func TestMessage_Copy_EmptyPayload(t *testing.T) { + original := NewMessage("id", []byte{}) + copied := original.Copy() + + assert.NotNil(t, copied.Payload) + assert.Empty(t, copied.Payload) + assert.Equal(t, original.Payload, copied.Payload) +} + +func TestMessage_Fields(t *testing.T) { + msg := NewMessage("id-123", []byte("payload")) + + // Test metadata + msg.Metadata["trace-id"] = "xyz" + msg.Metadata["source"] = "gateway" + assert.Equal(t, "xyz", msg.Metadata["trace-id"]) + assert.Equal(t, "gateway", msg.Metadata["source"]) + + // Test partition key + msg.PartitionKey = "user-123" + assert.Equal(t, "user-123", msg.PartitionKey) + + // Test PublishedAt can be overridden + customTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() + msg.PublishedAt = customTime + assert.Equal(t, customTime, msg.PublishedAt) + + // Verify copy preserves fields + copied := msg.Copy() + assert.Equal(t, msg.ID, copied.ID) + assert.Equal(t, msg.PartitionKey, copied.PartitionKey) + assert.Equal(t, msg.PublishedAt, copied.PublishedAt) + assert.Equal(t, msg.Metadata, copied.Metadata) +} + diff --git a/extensions/queue/BUILD.bazel b/extensions/queue/BUILD.bazel new file mode 100644 index 00000000..4beee766 --- /dev/null +++ b/extensions/queue/BUILD.bazel @@ -0,0 +1,14 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "queue", + srcs = [ + "delivery.go", + "publisher.go", + "queue.go", + "subscriber.go", + ], + importpath = "github.com/uber/submitqueue/extensions/queue", + visibility = ["//visibility:public"], + deps = ["//entities/queue"], +) diff --git a/extensions/queue/README.md b/extensions/queue/README.md new file mode 100644 index 00000000..c40dfa7d --- /dev/null +++ b/extensions/queue/README.md @@ -0,0 +1,71 @@ +# Queue Abstractions + +Vendor-agnostic interfaces for pub/sub messaging systems. + +## Interfaces + +### Queue +Creates publishers and subscribers. + +### Publisher +Publishes messages to topics. + +```go +type Publisher interface { + Publish(ctx context.Context, topic string, message queue.Message) error + Close() error +} +``` + +### Subscriber +Consumes messages from topics. + +```go +type Subscriber interface { + Subscribe(ctx context.Context, topic string) (<-chan Delivery, error) + Close() error +} +``` + +### Delivery +Message with acknowledgment operations. + +```go +type Delivery interface { + Message() queue.Message + Ack(ctx context.Context) error + Nack(ctx context.Context, requeueAfterMillis int64) error + ExtendVisibilityTimeout(ctx context.Context, durationMillis int64) error + DeliveryID() string + Attempt() int + ReceivedAt() int64 + Metadata() map[string]string +} +``` + +## Usage + +```go +q, _ := NewQueue(config) +defer q.Close() + +// Publish +pub := q.Publisher() +msg := queue.NewMessage("id", []byte("payload")) +pub.Publish(ctx, "topic", msg) + +// Subscribe +sub := q.Subscriber() +deliveries, _ := sub.Subscribe(ctx, "topic") +for delivery := range deliveries { + process(delivery.Message().Payload) + delivery.Ack(ctx) +} +``` + +## Implementing a Backend + +1. Create `extensions/queue/{backend}/` directory +2. Implement `Queue`, `Publisher`, `Subscriber`, `Delivery` interfaces +3. Map `queue.Message` to backend format + diff --git a/extensions/queue/delivery.go b/extensions/queue/delivery.go new file mode 100644 index 00000000..0616f63a --- /dev/null +++ b/extensions/queue/delivery.go @@ -0,0 +1,43 @@ +package queue + +import ( + "context" + + "github.com/uber/submitqueue/entities/queue" +) + +// Delivery represents a message delivered by a Subscriber. +// Provides access to the message and methods to acknowledge or reject it. +// +// Implementations must be safe for concurrent Message() calls. +// Ack/Nack/ExtendVisibilityTimeout should not be called concurrently on the same instance. +type Delivery interface { + // Message returns the delivered message. + Message() queue.Message + + // Ack acknowledges successful processing of the message. + // The message will be removed from the queue and not redelivered. + Ack(ctx context.Context) error + + // Nack negatively acknowledges the message, indicating processing failure. + // The message will be requeued for redelivery after requeueAfterMillis. + // If requeueAfterMillis is 0, the message is requeued immediately. + Nack(ctx context.Context, requeueAfterMillis int64) error + + // ExtendVisibilityTimeout extends the time before this message becomes + // visible to other consumers. Use when processing takes longer than expected. + ExtendVisibilityTimeout(ctx context.Context, durationMillis int64) error + + // DeliveryID returns a backend-specific identifier for this delivery. + DeliveryID() string + + // Attempt returns how many times this message has been delivered. + // Starts at 1 for first delivery. + Attempt() int + + // ReceivedAt returns when this delivery was received (Unix milliseconds). + ReceivedAt() int64 + + // Metadata returns backend-specific delivery metadata. + Metadata() map[string]string +} diff --git a/extensions/queue/publisher.go b/extensions/queue/publisher.go new file mode 100644 index 00000000..4a7f0d65 --- /dev/null +++ b/extensions/queue/publisher.go @@ -0,0 +1,17 @@ +package queue + +import ( + "context" + + "github.com/uber/submitqueue/entities/queue" +) + +// Publisher publishes messages to topics. +// Implementations must be thread-safe. +type Publisher interface { + // Publish sends a message to the specified topic. + Publish(ctx context.Context, topic string, message queue.Message) error + + // Close gracefully shuts down the publisher, flushing pending messages. + Close() error +} diff --git a/extensions/queue/queue.go b/extensions/queue/queue.go new file mode 100644 index 00000000..4a327e39 --- /dev/null +++ b/extensions/queue/queue.go @@ -0,0 +1,17 @@ +package queue + +// Queue creates and manages queue publishers and subscribers. +// Implementations handle connection pooling, consumer group configuration, +// and resource lifecycle. +type Queue interface { + // Publisher returns a Publisher instance. + // May return a singleton or new instance depending on implementation. + Publisher() Publisher + + // Subscriber returns a Subscriber instance. + // May return a singleton or new instance depending on implementation. + Subscriber() Subscriber + + // Close shuts down the queue and all associated resources. + Close() error +} diff --git a/extensions/queue/subscriber.go b/extensions/queue/subscriber.go new file mode 100644 index 00000000..46d0ca79 --- /dev/null +++ b/extensions/queue/subscriber.go @@ -0,0 +1,24 @@ +package queue + +import ( + "context" +) + +// Subscriber consumes messages from topics. +// Implementations must be thread-safe. +type Subscriber interface { + // Subscribe starts consuming messages from the specified topic. + // Returns a channel of Delivery instances and an error if subscription fails. + // + // The channel is closed when the subscriber is closed or context is cancelled. + // Implementations should handle infrastructure errors internally (e.g., reconnect). + // + // Each Delivery provides the message and methods to acknowledge or reject it. + // Consumers should call delivery.Ack() or delivery.Nack() for each delivery. + Subscribe(ctx context.Context, topic string) (<-chan Delivery, error) + + // Close gracefully shuts down the subscriber. + // All delivery channels will be closed. + // Idempotent - safe to call multiple times. + Close() error +} diff --git a/extensions/storage/BUILD.bazel b/extensions/storage/BUILD.bazel index 94afc868..efab6747 100644 --- a/extensions/storage/BUILD.bazel +++ b/extensions/storage/BUILD.bazel @@ -3,8 +3,8 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "storage", srcs = [ - "storage.go", "request_store.go", + "storage.go", ], importpath = "github.com/uber/submitqueue/extensions/storage", visibility = ["//visibility:public"],