Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions entities/queue/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "queue",
srcs = ["message.go"],
importpath = "github.com/uber/submitqueue/entities/queue",
visibility = ["//visibility:public"],
)

go_test(
name = "queue_test",
srcs = ["message_test.go"],
embed = [":queue"],
deps = ["@com_github_stretchr_testify//assert"],
)
55 changes: 55 additions & 0 deletions entities/queue/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package queue

import (
"maps"
"time"
)

// Message represents a queue message entity.
// Immutable - use Copy() for modifications.
type Message struct {
// ID uniquely identifies the message for deduplication and tracing.
ID string

// Payload is the message body as raw bytes.
Payload []byte

// Metadata contains key-value pairs for headers and attributes.
// Use for trace IDs, request IDs, and cross-service metadata.
Metadata map[string]string

// PartitionKey determines which partition/shard this message goes to.
// Messages with the same PartitionKey are guaranteed ordered delivery.
// Optional - if empty, backend may use round-robin distribution.
PartitionKey string

// PublishedAt is when the message was published (Unix milliseconds).
PublishedAt int64
}

// NewMessage creates a new message with the given ID and payload.
// Metadata is initialized as an empty map.
// PublishedAt is set to the current time.
func NewMessage(id string, payload []byte) Message {
return Message{
ID: id,
Payload: payload,
Metadata: make(map[string]string),
PublishedAt: time.Now().UnixMilli(),
}
}

// Copy creates a deep copy of the message.
// Safe to call concurrently.
func (m Message) Copy() Message {
payloadCopy := make([]byte, len(m.Payload))
copy(payloadCopy, m.Payload)

return Message{
ID: m.ID,
Payload: payloadCopy,
Metadata: maps.Clone(m.Metadata),
PartitionKey: m.PartitionKey,
PublishedAt: m.PublishedAt,
}
}
80 changes: 80 additions & 0 deletions entities/queue/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package queue

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewMessage(t *testing.T) {
id := "test-id"
payload := []byte("test payload")

msg := NewMessage(id, payload)

assert.Equal(t, id, msg.ID)
assert.Equal(t, payload, msg.Payload)
assert.NotNil(t, msg.Metadata)
assert.Empty(t, msg.Metadata)
assert.NotZero(t, msg.PublishedAt)
}

func TestMessage_Copy(t *testing.T) {
original := NewMessage("id-123", []byte("payload"))
original.Metadata["key"] = "value"
original.PartitionKey = "partition-1"

copied := original.Copy()

// Verify immutable fields are equal
assert.Equal(t, original.ID, copied.ID)
assert.Equal(t, original.PublishedAt, copied.PublishedAt)
assert.Equal(t, original.PartitionKey, copied.PartitionKey)

// Verify deep copy of payload
assert.Equal(t, original.Payload, copied.Payload)
original.Payload[0] = 'X'
assert.NotEqual(t, original.Payload[0], copied.Payload[0])

// Verify deep copy of metadata
assert.Equal(t, original.Metadata, copied.Metadata)
original.Metadata["new"] = "value"
assert.NotContains(t, copied.Metadata, "new")
}

func TestMessage_Copy_EmptyPayload(t *testing.T) {
original := NewMessage("id", []byte{})
copied := original.Copy()

assert.NotNil(t, copied.Payload)
assert.Empty(t, copied.Payload)
assert.Equal(t, original.Payload, copied.Payload)
}

func TestMessage_Fields(t *testing.T) {
msg := NewMessage("id-123", []byte("payload"))

// Test metadata
msg.Metadata["trace-id"] = "xyz"
msg.Metadata["source"] = "gateway"
assert.Equal(t, "xyz", msg.Metadata["trace-id"])
assert.Equal(t, "gateway", msg.Metadata["source"])

// Test partition key
msg.PartitionKey = "user-123"
assert.Equal(t, "user-123", msg.PartitionKey)

// Test PublishedAt can be overridden
customTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
msg.PublishedAt = customTime
assert.Equal(t, customTime, msg.PublishedAt)

// Verify copy preserves fields
copied := msg.Copy()
assert.Equal(t, msg.ID, copied.ID)
assert.Equal(t, msg.PartitionKey, copied.PartitionKey)
assert.Equal(t, msg.PublishedAt, copied.PublishedAt)
assert.Equal(t, msg.Metadata, copied.Metadata)
}

14 changes: 14 additions & 0 deletions extensions/queue/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "queue",
srcs = [
"delivery.go",
"publisher.go",
"queue.go",
"subscriber.go",
],
importpath = "github.com/uber/submitqueue/extensions/queue",
visibility = ["//visibility:public"],
deps = ["//entities/queue"],
)
71 changes: 71 additions & 0 deletions extensions/queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Queue Abstractions

Vendor-agnostic interfaces for pub/sub messaging systems.

## Interfaces

### Queue
Creates publishers and subscribers.

### Publisher
Publishes messages to topics.

```go
type Publisher interface {
Publish(ctx context.Context, topic string, message queue.Message) error
Close() error
}
```

### Subscriber
Consumes messages from topics.

```go
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan Delivery, error)
Close() error
}
```

### Delivery
Message with acknowledgment operations.

```go
type Delivery interface {
Message() queue.Message
Ack(ctx context.Context) error
Nack(ctx context.Context, requeueAfterMillis int64) error
ExtendVisibilityTimeout(ctx context.Context, durationMillis int64) error
DeliveryID() string
Attempt() int
ReceivedAt() int64
Metadata() map[string]string
}
```

## Usage

```go
q, _ := NewQueue(config)
defer q.Close()

// Publish
pub := q.Publisher()
msg := queue.NewMessage("id", []byte("payload"))
pub.Publish(ctx, "topic", msg)

// Subscribe
sub := q.Subscriber()
deliveries, _ := sub.Subscribe(ctx, "topic")
for delivery := range deliveries {
process(delivery.Message().Payload)
delivery.Ack(ctx)
}
```

## Implementing a Backend

1. Create `extensions/queue/{backend}/` directory
2. Implement `Queue`, `Publisher`, `Subscriber`, `Delivery` interfaces
3. Map `queue.Message` to backend format

43 changes: 43 additions & 0 deletions extensions/queue/delivery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package queue

import (
"context"

"github.com/uber/submitqueue/entities/queue"
)

// Delivery represents a message delivered by a Subscriber.
// Provides access to the message and methods to acknowledge or reject it.
//
// Implementations must be safe for concurrent Message() calls.
// Ack/Nack/ExtendVisibilityTimeout should not be called concurrently on the same instance.
type Delivery interface {
// Message returns the delivered message.
Message() queue.Message

// Ack acknowledges successful processing of the message.
// The message will be removed from the queue and not redelivered.
Ack(ctx context.Context) error

// Nack negatively acknowledges the message, indicating processing failure.
// The message will be requeued for redelivery after requeueAfterMillis.
// If requeueAfterMillis is 0, the message is requeued immediately.
Nack(ctx context.Context, requeueAfterMillis int64) error

// ExtendVisibilityTimeout extends the time before this message becomes
// visible to other consumers. Use when processing takes longer than expected.
ExtendVisibilityTimeout(ctx context.Context, durationMillis int64) error

// DeliveryID returns a backend-specific identifier for this delivery.
DeliveryID() string

// Attempt returns how many times this message has been delivered.
// Starts at 1 for first delivery.
Attempt() int

// ReceivedAt returns when this delivery was received (Unix milliseconds).
ReceivedAt() int64

// Metadata returns backend-specific delivery metadata.
Metadata() map[string]string
}
17 changes: 17 additions & 0 deletions extensions/queue/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package queue

import (
"context"

"github.com/uber/submitqueue/entities/queue"
)

// Publisher publishes messages to topics.
// Implementations must be thread-safe.
type Publisher interface {
// Publish sends a message to the specified topic.
Publish(ctx context.Context, topic string, message queue.Message) error

// Close gracefully shuts down the publisher, flushing pending messages.
Close() error
}
17 changes: 17 additions & 0 deletions extensions/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package queue

// Queue creates and manages queue publishers and subscribers.
// Implementations handle connection pooling, consumer group configuration,
// and resource lifecycle.
type Queue interface {
// Publisher returns a Publisher instance.
// May return a singleton or new instance depending on implementation.
Publisher() Publisher

// Subscriber returns a Subscriber instance.
// May return a singleton or new instance depending on implementation.
Subscriber() Subscriber

// Close shuts down the queue and all associated resources.
Close() error
}
24 changes: 24 additions & 0 deletions extensions/queue/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package queue

import (
"context"
)

// Subscriber consumes messages from topics.
// Implementations must be thread-safe.
type Subscriber interface {
// Subscribe starts consuming messages from the specified topic.
// Returns a channel of Delivery instances and an error if subscription fails.
//
// The channel is closed when the subscriber is closed or context is cancelled.
Comment thread
behinddwalls marked this conversation as resolved.
// Implementations should handle infrastructure errors internally (e.g., reconnect).
//
// Each Delivery provides the message and methods to acknowledge or reject it.
// Consumers should call delivery.Ack() or delivery.Nack() for each delivery.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: also define when the implementation can return an error.

Subscribe(ctx context.Context, topic string) (<-chan Delivery, error)
Comment thread
behinddwalls marked this conversation as resolved.

// Close gracefully shuts down the subscriber.
// All delivery channels will be closed.
// Idempotent - safe to call multiple times.
Close() error
}
2 changes: 1 addition & 1 deletion extensions/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "storage",
srcs = [
"storage.go",
"request_store.go",
"storage.go",
],
importpath = "github.com/uber/submitqueue/extensions/storage",
visibility = ["//visibility:public"],
Expand Down