Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions entities/queue/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@ type Message struct {
PublishedAt int64
}

// NewMessage creates a new message with the given ID and payload.
// Metadata is initialized as an empty map.
// NewMessage creates a new message with the given ID, payload, partition key, and metadata.
// If metadata is nil, it will be initialized as an empty map.
// PublishedAt is set to the current time.
func NewMessage(id string, payload []byte) Message {
func NewMessage(id string, payload []byte, partitionKey string, metadata map[string]string) Message {
if metadata == nil {
metadata = make(map[string]string)
}
return Message{
ID: id,
Payload: payload,
Metadata: make(map[string]string),
PublishedAt: time.Now().UnixMilli(),
ID: id,
Payload: payload,
PartitionKey: partitionKey,
Metadata: metadata,
PublishedAt: time.Now().UnixMilli(),
}
}

Expand Down
17 changes: 8 additions & 9 deletions entities/queue/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@ func TestNewMessage(t *testing.T) {
id := "test-id"
payload := []byte("test payload")

msg := NewMessage(id, payload)
msg := NewMessage(id, payload, "", nil)

assert.Equal(t, id, msg.ID)
assert.Equal(t, payload, msg.Payload)
assert.Empty(t, msg.PartitionKey)
assert.NotNil(t, msg.Metadata)
assert.Empty(t, msg.Metadata)
assert.NotZero(t, msg.PublishedAt)
}

func TestMessage_Copy(t *testing.T) {
original := NewMessage("id-123", []byte("payload"))
original.Metadata["key"] = "value"
original.PartitionKey = "partition-1"
original := NewMessage("id-123", []byte("payload"), "partition-1", map[string]string{"key": "value"})

copied := original.Copy()

Expand All @@ -44,7 +43,7 @@ func TestMessage_Copy(t *testing.T) {
}

func TestMessage_Copy_EmptyPayload(t *testing.T) {
original := NewMessage("id", []byte{})
original := NewMessage("id", []byte{}, "", nil)
copied := original.Copy()

assert.NotNil(t, copied.Payload)
Expand All @@ -53,16 +52,16 @@ func TestMessage_Copy_EmptyPayload(t *testing.T) {
}

func TestMessage_Fields(t *testing.T) {
msg := NewMessage("id-123", []byte("payload"))
msg := NewMessage("id-123", []byte("payload"), "user-123", map[string]string{
"trace-id": "xyz",
"source": "gateway",
})

// Test metadata
msg.Metadata["trace-id"] = "xyz"
msg.Metadata["source"] = "gateway"
assert.Equal(t, "xyz", msg.Metadata["trace-id"])
assert.Equal(t, "gateway", msg.Metadata["source"])

// Test partition key
msg.PartitionKey = "user-123"
assert.Equal(t, "user-123", msg.PartitionKey)

// Test PublishedAt can be overridden
Expand Down
10 changes: 9 additions & 1 deletion extensions/queue/sql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type RetryConfig struct {
type DLQConfig struct {
// Enabled enables dead letter queue
Enabled bool

// TopicSuffix is appended to the original topic name to create the DLQ topic
// For example, if original topic is "orders" and suffix is "_dlq", DLQ topic will be "orders_dlq"
TopicSuffix string
}

// DefaultConfig returns a Config with sensible defaults
Expand All @@ -79,7 +83,8 @@ func DefaultConfig(consumerGroup, workerID string) Config {
BackoffMultiplier: 2.0,
},
DLQ: DLQConfig{
Enabled: true,
Enabled: true,
TopicSuffix: "_dlq",
},
}
}
Expand Down Expand Up @@ -122,5 +127,8 @@ func (c *Config) Validate() error {
if c.Retry.BackoffMultiplier < 1.0 {
return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0")
}
if c.DLQ.Enabled && c.DLQ.TopicSuffix == "" {
return fmt.Errorf("DLQ.TopicSuffix is required when DLQ is enabled")
}
return nil
}
37 changes: 37 additions & 0 deletions extensions/queue/sql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestDefaultConfig(t *testing.T) {
assert.Equal(t, 10*time.Second, cfg.LeaseRenewalInterval)
assert.Equal(t, 30*time.Second, cfg.LeaseDuration)
assert.True(t, cfg.DLQ.Enabled)
assert.Equal(t, "_dlq", cfg.DLQ.TopicSuffix)
assert.Equal(t, 3, cfg.Retry.MaxAttempts)
assert.Equal(t, 1*time.Second, cfg.Retry.InitialBackoff)
assert.Equal(t, 30*time.Second, cfg.Retry.MaxBackoff)
Expand Down Expand Up @@ -92,6 +93,42 @@ func TestConfigValidation(t *testing.T) {
},
expectError: true,
},
{
name: "DLQ enabled without topic suffix",
config: Config{
ConsumerGroup: "test",
WorkerID: "test-worker",
PollInterval: 100 * time.Millisecond,
BatchSize: 10,
VisibilityTimeout: 60 * time.Second,
LeaseRenewalInterval: 10 * time.Second,
LeaseDuration: 30 * time.Second,
Retry: DefaultConfig("dummy", "dummy").Retry,
DLQ: DLQConfig{
Enabled: true,
TopicSuffix: "", // Missing suffix
},
},
expectError: true,
},
{
name: "DLQ disabled without topic suffix - valid",
config: Config{
ConsumerGroup: "test",
WorkerID: "test-worker",
PollInterval: 100 * time.Millisecond,
BatchSize: 10,
VisibilityTimeout: 60 * time.Second,
LeaseRenewalInterval: 10 * time.Second,
LeaseDuration: 30 * time.Second,
Retry: DefaultConfig("dummy", "dummy").Retry,
DLQ: DLQConfig{
Enabled: false,
TopicSuffix: "", // Suffix not required when disabled
},
},
expectError: false,
},
}

for _, tt := range tests {
Expand Down
58 changes: 38 additions & 20 deletions extensions/queue/sql/message_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q
defer tx.Rollback()

stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(`
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, retry_count, invisible_until)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0)
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, retry_count, invisible_until, failed_at, failure_count, last_error, original_topic)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, 0, 0, '', '')
`, MessagesTableName))
if err != nil {
s.logger.Errorw("failed to prepare statement",
Expand Down Expand Up @@ -210,7 +210,7 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti

// Fetch visible messages (invisible_until <= now)
rows, err := tx.QueryContext(ctx, fmt.Sprintf(`
SELECT offset, id, payload, metadata, partition_key, retry_count, published_at
SELECT offset, id, payload, metadata, partition_key, retry_count, published_at, failed_at, failure_count, last_error, original_topic
FROM %s
WHERE topic = ? AND partition_key = ? AND offset > ? AND invisible_until <= ?
ORDER BY offset
Expand Down Expand Up @@ -239,9 +239,13 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti
partKey string
retryCount int
publishedAtMilli int64
failedAt int64
failureCount int
lastError string
originalTopic string
)

if err := rows.Scan(&offset, &id, &payload, &metadataJSON, &partKey, &retryCount, &publishedAtMilli); err != nil {
if err := rows.Scan(&offset, &id, &payload, &metadataJSON, &partKey, &retryCount, &publishedAtMilli, &failedAt, &failureCount, &lastError, &originalTopic); err != nil {
s.logger.Errorw("failed to scan message row",
logTopic, topic,
logPartitionKey, partitionKey,
Expand Down Expand Up @@ -269,13 +273,17 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti
}

results = append(results, messageRow{
Offset: offset,
ID: id,
Payload: payload,
Metadata: metadata,
PartitionKey: partKey,
RetryCount: retryCount,
PublishedAt: publishedAtMilli,
Offset: offset,
ID: id,
Payload: payload,
Metadata: metadata,
PartitionKey: partKey,
RetryCount: retryCount,
PublishedAt: publishedAtMilli,
FailedAt: failedAt,
FailureCount: failureCount,
LastError: lastError,
OriginalTopic: originalTopic,
})

messageIDs = append(messageIDs, id)
Expand Down Expand Up @@ -349,7 +357,9 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti
return results, nil
}

// MoveToDLQ atomically moves a message from the main table to the DLQ table
// MoveToDLQ atomically moves a message to the DLQ by reinserting it with the DLQ topic name
// The message is inserted back into queue_messages table with the DLQ topic (original + suffix)
// This allows DLQ messages to be consumed using the normal subscriber
func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error {
start := time.Now()
success := false
Expand All @@ -361,6 +371,9 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
s.metrics.Tagged(map[string]string{"result": result}).Timer("move_to_dlq.latency").Record(time.Since(start))
}()

// Construct DLQ topic name
dlqTopic := topic + s.config.DLQ.TopicSuffix

tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
s.logger.Errorw("failed to begin transaction for DLQ move",
Expand All @@ -380,13 +393,14 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
partitionKey string
createdAtMilli int64
publishedAtMilli int64
retryCount int
)

err = tx.QueryRowContext(ctx, fmt.Sprintf(`
SELECT payload, metadata, partition_key, created_at, published_at
SELECT payload, metadata, partition_key, created_at, published_at, retry_count
FROM %s
WHERE topic = ? AND id = ?
`, MessagesTableName), topic, messageID).Scan(&payload, &metadataJSON, &partitionKey, &createdAtMilli, &publishedAtMilli)
`, MessagesTableName), topic, messageID).Scan(&payload, &metadataJSON, &partitionKey, &createdAtMilli, &publishedAtMilli, &retryCount)

if err != nil {
if err == sql.ErrNoRows {
Expand All @@ -406,16 +420,19 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
return fmt.Errorf("failed to fetch message: %w", err)
}

// Insert into DLQ table
// Insert into queue_messages table with DLQ topic name and DLQ-specific fields
// Reset retry_count to 0 since this is a new topic (DLQ processing starts fresh)
// Store the original failure count for tracking purposes
now := start.UnixMilli()
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, DLQTableName), topic, messageID, payload, metadataJSON, partitionKey, createdAtMilli, publishedAtMilli, now, failureCount, lastError)
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, invisible_until, retry_count, failed_at, failure_count, last_error, original_topic)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, MessagesTableName), dlqTopic, messageID, payload, metadataJSON, partitionKey, createdAtMilli, publishedAtMilli, int64(0), 0, now, failureCount, lastError, topic)

if err != nil {
s.logger.Errorw("failed to insert into DLQ",
s.logger.Errorw("failed to insert into DLQ topic",
logTopic, topic,
"dlq_topic", dlqTopic,
logMessageID, messageID,
logPartitionKey, partitionKey,
"failure_count", failureCount,
Expand All @@ -425,7 +442,7 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
return fmt.Errorf("failed to insert into DLQ: %w", err)
}

// Delete from main table
// Delete from original topic
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
DELETE FROM %s WHERE topic = ? AND id = ?
`, MessagesTableName), topic, messageID)
Expand Down Expand Up @@ -454,6 +471,7 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
s.metrics.Counter("messages.moved_to_dlq").Inc(1)
s.logger.Infow("moved message to DLQ",
logTopic, topic,
"dlq_topic", dlqTopic,
logMessageID, messageID,
logPartitionKey, partitionKey,
"failure_count", failureCount,
Expand Down
23 changes: 15 additions & 8 deletions extensions/queue/sql/message_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func TestmessageStore_FetchByOffset(t *testing.T) {
// Expect transaction begin
mock.ExpectBegin()

// Mock query results
rows := sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "retry_count", "published_at"}).
AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", 0, time.Now().UnixMilli())
// Mock query results (including DLQ columns)
rows := sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "retry_count", "published_at", "failed_at", "failure_count", "last_error", "original_topic"}).
AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", 0, time.Now().UnixMilli(), int64(0), 0, "", "")

mock.ExpectQuery("SELECT (.+) FROM queue_messages").
WithArgs(topic, partitionKey, currentOffset, sqlmock.AnyArg(), limit).
Expand Down Expand Up @@ -163,19 +163,26 @@ func TestmessageStore_MoveToDLQ(t *testing.T) {
failureCount := 3
lastError := "test error"

// Get config to know the DLQ suffix
config := DefaultConfig("test-consumer", "test-worker")
dlqTopic := topic + config.DLQ.TopicSuffix // "test_topic_dlq"

// Expect transaction begin
mock.ExpectBegin()

// Mock query for fetching message - SELECT payload, metadata, partition_key, created_at, published_at
rows := sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at"}).
AddRow([]byte("payload1"), []byte("{}"), "part1", time.Now().UnixMilli(), time.Now().UnixMilli())
// Mock query for fetching message - SELECT payload, metadata, partition_key, created_at, published_at, retry_count
rows := sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at", "retry_count"}).
AddRow([]byte("payload1"), []byte(`{"key":"value"}`), "part1", time.Now().UnixMilli(), time.Now().UnixMilli(), failureCount)

mock.ExpectQuery("SELECT (.+) FROM queue_messages").
WithArgs(topic, messageID).
WillReturnRows(rows)

// Expect insert into DLQ
mock.ExpectExec("INSERT INTO queue_dlq").
// Expect insert into queue_messages with DLQ topic and DLQ-specific columns
// Columns: topic, id, payload, metadata, partition_key, created_at, published_at, invisible_until, retry_count, failed_at, failure_count, last_error, original_topic
// Note: retry_count is reset to 0 for DLQ processing, but failure_count preserves the original attempts
mock.ExpectExec("INSERT INTO queue_messages").
WithArgs(dlqTopic, messageID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), int64(0), 0, sqlmock.AnyArg(), failureCount, lastError, topic).
WillReturnResult(sqlmock.NewResult(1, 1))

// Expect delete from main table
Expand Down
9 changes: 3 additions & 6 deletions extensions/queue/sql/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func TestPublisher_PublishAfterClose(t *testing.T) {
require.NoError(t, err)

// Try to publish after close
msg := queue.NewMessage("msg1", []byte("payload"))
msg.PartitionKey = "part1"
msg := queue.NewMessage("msg1", []byte("payload"), "part1", nil)
err = pub.Publish(ctx, "test_topic", msg)
require.Error(t, err)
}
Expand Down Expand Up @@ -253,8 +252,7 @@ func TestValidateTopicName(t *testing.T) {

// Try to publish with this topic name
ctx := context.Background()
msg := queue.NewMessage("msg1", []byte("test"))
msg.PartitionKey = "part1"
msg := queue.NewMessage("msg1", []byte("test"), "part1", nil)

if !tt.wantErr {
mockStore.EXPECT().Insert(gomock.Any(), tt.topicName, gomock.Any()).Return(nil).Times(1)
Expand Down Expand Up @@ -348,8 +346,7 @@ func TestPublisher_PublishContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

msg := queue.NewMessage("msg1", []byte("payload"))
msg.PartitionKey = "part1"
msg := queue.NewMessage("msg1", []byte("payload"), "part1", nil)

// Should fail with context cancelled error
err := pub.Publish(ctx, "test_topic", msg)
Expand Down
1 change: 0 additions & 1 deletion extensions/queue/sql/schema/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
filegroup(
name = "schema",
srcs = [
"queue_dlq.sql",
"queue_messages.sql",
"queue_offsets.sql",
"queue_partition_leases.sql",
Expand Down
Loading