Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions extension/queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@ Publishes messages to topics.
```go
type Publisher interface {
Publish(ctx context.Context, topic string, message queue.Message) error
PublishAfter(ctx context.Context, topic string, message queue.Message, delayMs int64) error
Close() error
}
```

**`PublishAfter`** inserts a fresh message that becomes visible to subscribers only after `delayMs`. It is distinct from `Nack(requeueAfterMillis)` even though both can produce "next delivery happens at T+delay":

- `Nack` is "this delivery failed, try again" — it bumps `retry_count` and eventually trips DLQ.
- `PublishAfter` is "postpone this work" — `retry_count` resets to 0, DLQ stays available for true failures.

Use `PublishAfter` for self-driven poll loops (e.g. the orchestrator's `buildsignal` consumer re-publishing itself between `Status` calls). Use `Nack` for processing failures.

### Subscriber
Consumes messages from topics with per-subscription configuration.

Expand Down
14 changes: 14 additions & 0 deletions extension/queue/mock/publisher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions extension/queue/mysql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ extension/queue/mysql/
| `queue_partition_leases` | Partition lease coordination | `(consumer_group, topic, partition_key)` |
| `queue_subscriber_heartbeats` | Active subscriber tracking | `(consumer_group, topic, subscriber_name)` |

`queue_messages` has a `visible_after BIGINT UNSIGNED NOT NULL DEFAULT 0` column that supports `Publisher.PublishAfter`: subscribers' `FetchByOffset` skips rows where `visible_after > now`. Default 0 means immediately visible, so existing rows continue to behave as before — the column is back-compatible.

See `schema/` for full SQL definitions. See the [RFC](../../doc/rfc/sql-queue-rfc.md#database-schema) for field-level documentation.

### Store Architecture
Expand Down
34 changes: 24 additions & 10 deletions extension/queue/mysql/message_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ func newMessageStore(db *sql.DB, logger *zap.SugaredLogger, scope tally.Scope) m
}
}

// Insert inserts messages into the messages table.
// Insert inserts messages into the messages table with no visibility delay.
// Equivalent to InsertDelayed with visibleAfterMs == 0.
func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) error {
return s.InsertDelayed(ctx, topic, messages, 0)
}

// InsertDelayed inserts messages into the messages table, optionally deferring
// delivery until visibleAfterMs (epoch milliseconds). 0 means immediately
// visible; FetchByOffset skips rows where visible_after > now.
//
// Publishes are idempotent on the (topic, partition_key, id) unique key: a
// repeated publish for the same key is silently treated as success and does
Expand All @@ -53,7 +61,7 @@ func newMessageStore(db *sql.DB, logger *zap.SugaredLogger, scope tally.Scope) m
// idempotent publishes") and lets callers safely retry publishes (e.g. a
// second Cancel RPC for the same request) without surfacing 1062 duplicate-key
// errors.
func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) (retErr error) {
func (s *sqlmessageStore) InsertDelayed(ctx context.Context, topic string, messages []queue.Message, visibleAfterMs int64) (retErr error) {
op := metrics.Begin(s.scope, "insert", metrics.NewTag("topic", topic))
defer func() { op.Complete(retErr) }()

Expand All @@ -64,6 +72,7 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q
s.logger.Debugw("inserting messages",
logTopic, topic,
"count", len(messages),
"visible_after", visibleAfterMs,
)

tx, err := s.db.BeginTx(ctx, nil)
Expand All @@ -75,8 +84,8 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q
// ON DUPLICATE KEY UPDATE topic=topic is a no-op write that makes MySQL
// swallow the unique-key violation without mutating the existing row.
stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(`
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error, original_topic)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, '', '')
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, visible_after, failed_at, failure_count, last_error, original_topic)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0, 0, '', '')
ON DUPLICATE KEY UPDATE topic = topic
`, MessagesTableName))
if err != nil {
Expand All @@ -102,6 +111,7 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q
msg.PartitionKey,
now,
msg.PublishedAt,
visibleAfterMs,
)
if err != nil {
return fmt.Errorf("insert message topic=%s message=%s partition=%s: %w", topic, msg.ID, msg.PartitionKey, err)
Expand Down Expand Up @@ -137,18 +147,20 @@ func (s *sqlmessageStore) Delete(ctx context.Context, topic string, partitionKey
}

// FetchByOffset fetches messages with offset > currentOffset for a specific partition.
// Rows whose visible_after > nowMs are skipped — those are deferred deliveries
// (published via InsertDelayed) that should not yet be surfaced to subscribers.
// Messages are fetched from the immutable log; no per-message mutation occurs.
func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) (_ []messageRow, retErr error) {
func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, nowMs int64, limit int) (_ []messageRow, retErr error) {
op := metrics.Begin(s.scope, "fetch", metrics.NewTag("topic", topic))
defer func() { op.Complete(retErr) }()

rows, err := s.db.QueryContext(ctx, fmt.Sprintf(`
SELECT offset, id, payload, metadata, partition_key, published_at, failed_at, failure_count, last_error, original_topic
FROM %s
WHERE topic = ? AND partition_key = ? AND offset > ?
WHERE topic = ? AND partition_key = ? AND offset > ? AND visible_after <= ?
ORDER BY offset
LIMIT ?
`, MessagesTableName), topic, partitionKey, currentOffset, limit)
`, MessagesTableName), topic, partitionKey, currentOffset, nowMs, limit)
if err != nil {
return nil, fmt.Errorf("query messages topic=%s partition=%s: %w", topic, partitionKey, err)
}
Expand Down Expand Up @@ -254,11 +266,13 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, partition
return fmt.Errorf("fetch message for DLQ topic=%s partition=%s message=%s: %w", topic, partitionKey, messageID, err)
}

// Insert into queue_messages table with DLQ topic name and DLQ-specific fields
// Insert into queue_messages table with DLQ topic name and DLQ-specific fields.
// DLQ messages are always immediately visible (visible_after=0); any delay on
// the original message has already been consumed by the time it failed.
now := time.Now().UnixMilli()
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error, original_topic)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, visible_after, failed_at, failure_count, last_error, original_topic)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?)
`, MessagesTableName), dlqTopic, messageID, payload, metadataJSON, fetchPartKey, createdAtMilli, publishedAtMilli, now, failureCount, lastError, topic)

if err != nil {
Expand Down
49 changes: 47 additions & 2 deletions extension/queue/mysql/message_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,68 @@ func TestMessageStore_FetchByOffset(t *testing.T) {
topic := "test_topic"
partitionKey := "part1"
currentOffset := int64(0)
nowMs := time.Now().UnixMilli()
limit := 10

// Mock query results (no transaction, simple SELECT)
rows := sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "published_at", "failed_at", "failure_count", "last_error", "original_topic"}).
AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", time.Now().UnixMilli(), int64(0), 0, "", "")

mock.ExpectQuery("SELECT (.+) FROM queue_messages").
WithArgs(topic, partitionKey, currentOffset, limit).
WithArgs(topic, partitionKey, currentOffset, nowMs, limit).
WillReturnRows(rows)

results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, limit)
results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, nowMs, limit)
require.NoError(t, err)
require.Len(t, results, 1)
require.Equal(t, "msg1", results[0].ID)
require.NoError(t, mock.ExpectationsWereMet())
}

func TestMessageStore_FetchByOffset_SkipsDelayed(t *testing.T) {
db, mock, store := setupmessageStoreTest(t)
defer db.Close()

ctx := context.Background()
topic := "test_topic"
partitionKey := "part1"
currentOffset := int64(0)
nowMs := int64(1000)
limit := 10

// The SQL filter (visible_after <= nowMs) is applied by the DB; sqlmock just
// verifies the parameter binding. An empty result row simulates the case
// where the only message is still deferred.
mock.ExpectQuery("SELECT (.+) FROM queue_messages").
WithArgs(topic, partitionKey, currentOffset, nowMs, limit).
WillReturnRows(sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "published_at", "failed_at", "failure_count", "last_error", "original_topic"}))

results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, nowMs, limit)
require.NoError(t, err)
require.Empty(t, results)
require.NoError(t, mock.ExpectationsWereMet())
}

func TestMessageStore_InsertDelayed(t *testing.T) {
db, mock, store := setupmessageStoreTest(t)
defer db.Close()

ctx := context.Background()
visibleAfter := time.Now().UnixMilli() + 5000
msg := queue.Message{ID: "msg-delayed", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: time.Now().UnixMilli()}

mock.ExpectBegin()
mock.ExpectPrepare("INSERT INTO queue_messages")
mock.ExpectExec("INSERT INTO queue_messages").
WithArgs("test_topic", msg.ID, msg.Payload, []byte(nil), msg.PartitionKey, sqlmock.AnyArg(), msg.PublishedAt, visibleAfter).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

err := store.InsertDelayed(ctx, "test_topic", []queue.Message{msg}, visibleAfter)
require.NoError(t, err)
require.NoError(t, mock.ExpectationsWereMet())
}

func TestMessageStore_MoveToDLQ(t *testing.T) {
db, mock, store := setupmessageStoreTest(t)
defer db.Close()
Expand Down
22 changes: 18 additions & 4 deletions extension/queue/mysql/mock_stores.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions extension/queue/mysql/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/uber-go/tally/v4"
"go.uber.org/zap"
Expand Down Expand Up @@ -66,6 +67,36 @@ func (p *publisher) Publish(ctx context.Context, topic string, message queue.Mes
return nil
}

// PublishAfter sends a message that becomes visible to subscribers only
// after delayMs from now. The message is inserted with visible_after =
// now + delayMs; FetchByOffset skips it until that timestamp.
// delayMs <= 0 is equivalent to Publish.
func (p *publisher) PublishAfter(ctx context.Context, topic string, message queue.Message, delayMs int64) (retErr error) {
op := metrics.Begin(p.scope, "publish_after", metrics.NewTag("topic", topic))
defer func() { op.Complete(retErr) }()

p.mu.RLock()
closed := p.closed
p.mu.RUnlock()

if closed {
return ErrPublisherClosed
}

var visibleAfter int64
if delayMs > 0 {
visibleAfter = time.Now().UnixMilli() + delayMs
}

if err := p.messageStore.InsertDelayed(ctx, topic, []queue.Message{message}, visibleAfter); err != nil {
return fmt.Errorf("publish_after message store insert error: %w", err)
}

p.logger.Debugw("published delayed message", logTopic, topic, logMessageID, message.ID, "delay_ms", delayMs)

return nil
}

// Close gracefully shuts down the publisher
func (p *publisher) Close() error {
p.mu.Lock()
Expand Down
62 changes: 62 additions & 0 deletions extension/queue/mysql/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,68 @@ func TestPublisher_PublishAfterClose(t *testing.T) {
require.True(t, errors.Is(err, ErrPublisherClosed))
}

func TestPublisher_PublishAfter(t *testing.T) {
tests := []struct {
name string
delayMs int64
wantVisibleArg gomock.Matcher
}{
{
name: "positive delay binds future visible_after",
delayMs: 5000,
// Exact timestamp depends on wall clock; assert it's > 0.
wantVisibleArg: gomock.Cond(func(v any) bool {
ts, ok := v.(int64)
return ok && ts > 0
}),
},
{
name: "zero delay binds visible_after=0",
delayMs: 0,
wantVisibleArg: gomock.Eq(int64(0)),
},
{
name: "negative delay clamps to 0",
delayMs: -100,
wantVisibleArg: gomock.Eq(int64(0)),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockStore := NewMockmessageStore(ctrl)
mockStore.EXPECT().
InsertDelayed(gomock.Any(), "test_topic", gomock.Any(), tt.wantVisibleArg).
Return(nil).
Times(1)

pub := setupPublisherTest(t, mockStore)

msg := queue.NewMessage("msg-delayed", []byte("p"), "part1", nil)
err := pub.PublishAfter(context.Background(), "test_topic", msg, tt.delayMs)
require.NoError(t, err)
})
}
}

func TestPublisher_PublishAfterClosed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockStore := NewMockmessageStore(ctrl)
pub := setupPublisherTest(t, mockStore)

require.NoError(t, pub.Close())

msg := queue.NewMessage("msg1", []byte("p"), "part1", nil)
err := pub.PublishAfter(context.Background(), "test_topic", msg, 1000)
require.Error(t, err)
require.True(t, errors.Is(err, ErrPublisherClosed))
}

func TestPublisher_Close(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
7 changes: 7 additions & 0 deletions extension/queue/mysql/schema/queue_messages.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ CREATE TABLE IF NOT EXISTS queue_messages (
created_at BIGINT UNSIGNED NOT NULL,
published_at BIGINT UNSIGNED NOT NULL,

-- visible_after defers delivery: subscribers skip rows where visible_after > now.
-- 0 (the default) means immediately visible. Set by Publisher.PublishAfter
-- to schedule a fresh message for delivery at a future time without
-- consuming a delivery_state retry slot (used e.g. by the orchestrator's
-- buildstatus polling consumer to space out Status calls).
visible_after BIGINT UNSIGNED NOT NULL DEFAULT 0,

-- DLQ-specific fields (0/"" for normal messages, populated for DLQ messages)
failed_at BIGINT UNSIGNED NOT NULL,
-- failure_count stores how many times the message failed on the ORIGINAL topic before moving to DLQ
Expand Down
Loading
Loading