diff --git a/extension/queue/README.md b/extension/queue/README.md index 26685670..84b423f6 100644 --- a/extension/queue/README.md +++ b/extension/queue/README.md @@ -13,10 +13,18 @@ Publishes messages to topics. ```go type Publisher interface { Publish(ctx context.Context, topic string, message queue.Message) error + PublishAfter(ctx context.Context, topic string, message queue.Message, delayMs int64) error Close() error } ``` +**`PublishAfter`** inserts a fresh message that becomes visible to subscribers only after `delayMs`. It is distinct from `Nack(requeueAfterMillis)` even though both can produce "next delivery happens at T+delay": + +- `Nack` is "this delivery failed, try again" — it bumps `retry_count` and eventually trips DLQ. +- `PublishAfter` is "postpone this work" — `retry_count` resets to 0, DLQ stays available for true failures. + +Use `PublishAfter` for self-driven poll loops (e.g. the orchestrator's `buildsignal` consumer re-publishing itself between `Status` calls). Use `Nack` for processing failures. + ### Subscriber Consumes messages from topics with per-subscription configuration. diff --git a/extension/queue/mock/publisher_mock.go b/extension/queue/mock/publisher_mock.go index 6d09400b..213d3e2b 100644 --- a/extension/queue/mock/publisher_mock.go +++ b/extension/queue/mock/publisher_mock.go @@ -68,3 +68,17 @@ func (mr *MockPublisherMockRecorder) Publish(ctx, topic, message any) *gomock.Ca mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockPublisher)(nil).Publish), ctx, topic, message) } + +// PublishAfter mocks base method. +func (m *MockPublisher) PublishAfter(ctx context.Context, topic string, message queue.Message, delayMs int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PublishAfter", ctx, topic, message, delayMs) + ret0, _ := ret[0].(error) + return ret0 +} + +// PublishAfter indicates an expected call of PublishAfter. +func (mr *MockPublisherMockRecorder) PublishAfter(ctx, topic, message, delayMs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishAfter", reflect.TypeOf((*MockPublisher)(nil).PublishAfter), ctx, topic, message, delayMs) +} diff --git a/extension/queue/mysql/README.md b/extension/queue/mysql/README.md index 1e6ce56c..c9015ec3 100644 --- a/extension/queue/mysql/README.md +++ b/extension/queue/mysql/README.md @@ -112,6 +112,8 @@ extension/queue/mysql/ | `queue_partition_leases` | Partition lease coordination | `(consumer_group, topic, partition_key)` | | `queue_subscriber_heartbeats` | Active subscriber tracking | `(consumer_group, topic, subscriber_name)` | +`queue_messages` has a `visible_after BIGINT UNSIGNED NOT NULL DEFAULT 0` column that supports `Publisher.PublishAfter`: subscribers' `FetchByOffset` skips rows where `visible_after > now`. Default 0 means immediately visible, so existing rows continue to behave as before — the column is back-compatible. + See `schema/` for full SQL definitions. See the [RFC](../../doc/rfc/sql-queue-rfc.md#database-schema) for field-level documentation. ### Store Architecture diff --git a/extension/queue/mysql/message_store.go b/extension/queue/mysql/message_store.go index 8afa0d31..fa290255 100644 --- a/extension/queue/mysql/message_store.go +++ b/extension/queue/mysql/message_store.go @@ -44,7 +44,15 @@ func newMessageStore(db *sql.DB, logger *zap.SugaredLogger, scope tally.Scope) m } } -// Insert inserts messages into the messages table. +// Insert inserts messages into the messages table with no visibility delay. +// Equivalent to InsertDelayed with visibleAfterMs == 0. +func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) error { + return s.InsertDelayed(ctx, topic, messages, 0) +} + +// InsertDelayed inserts messages into the messages table, optionally deferring +// delivery until visibleAfterMs (epoch milliseconds). 0 means immediately +// visible; FetchByOffset skips rows where visible_after > now. // // Publishes are idempotent on the (topic, partition_key, id) unique key: a // repeated publish for the same key is silently treated as success and does @@ -53,7 +61,7 @@ func newMessageStore(db *sql.DB, logger *zap.SugaredLogger, scope tally.Scope) m // idempotent publishes") and lets callers safely retry publishes (e.g. a // second Cancel RPC for the same request) without surfacing 1062 duplicate-key // errors. -func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) (retErr error) { +func (s *sqlmessageStore) InsertDelayed(ctx context.Context, topic string, messages []queue.Message, visibleAfterMs int64) (retErr error) { op := metrics.Begin(s.scope, "insert", metrics.NewTag("topic", topic)) defer func() { op.Complete(retErr) }() @@ -64,6 +72,7 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q s.logger.Debugw("inserting messages", logTopic, topic, "count", len(messages), + "visible_after", visibleAfterMs, ) tx, err := s.db.BeginTx(ctx, nil) @@ -75,8 +84,8 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q // ON DUPLICATE KEY UPDATE topic=topic is a no-op write that makes MySQL // swallow the unique-key violation without mutating the existing row. stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(` - INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error, original_topic) - VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, '', '') + INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, visible_after, failed_at, failure_count, last_error, original_topic) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0, 0, '', '') ON DUPLICATE KEY UPDATE topic = topic `, MessagesTableName)) if err != nil { @@ -102,6 +111,7 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q msg.PartitionKey, now, msg.PublishedAt, + visibleAfterMs, ) if err != nil { return fmt.Errorf("insert message topic=%s message=%s partition=%s: %w", topic, msg.ID, msg.PartitionKey, err) @@ -137,18 +147,20 @@ func (s *sqlmessageStore) Delete(ctx context.Context, topic string, partitionKey } // FetchByOffset fetches messages with offset > currentOffset for a specific partition. +// Rows whose visible_after > nowMs are skipped — those are deferred deliveries +// (published via InsertDelayed) that should not yet be surfaced to subscribers. // Messages are fetched from the immutable log; no per-message mutation occurs. -func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) (_ []messageRow, retErr error) { +func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, nowMs int64, limit int) (_ []messageRow, retErr error) { op := metrics.Begin(s.scope, "fetch", metrics.NewTag("topic", topic)) defer func() { op.Complete(retErr) }() rows, err := s.db.QueryContext(ctx, fmt.Sprintf(` SELECT offset, id, payload, metadata, partition_key, published_at, failed_at, failure_count, last_error, original_topic FROM %s - WHERE topic = ? AND partition_key = ? AND offset > ? + WHERE topic = ? AND partition_key = ? AND offset > ? AND visible_after <= ? ORDER BY offset LIMIT ? - `, MessagesTableName), topic, partitionKey, currentOffset, limit) + `, MessagesTableName), topic, partitionKey, currentOffset, nowMs, limit) if err != nil { return nil, fmt.Errorf("query messages topic=%s partition=%s: %w", topic, partitionKey, err) } @@ -254,11 +266,13 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, partition return fmt.Errorf("fetch message for DLQ topic=%s partition=%s message=%s: %w", topic, partitionKey, messageID, err) } - // Insert into queue_messages table with DLQ topic name and DLQ-specific fields + // Insert into queue_messages table with DLQ topic name and DLQ-specific fields. + // DLQ messages are always immediately visible (visible_after=0); any delay on + // the original message has already been consumed by the time it failed. now := time.Now().UnixMilli() _, err = tx.ExecContext(ctx, fmt.Sprintf(` - INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error, original_topic) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, visible_after, failed_at, failure_count, last_error, original_topic) + VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?) `, MessagesTableName), dlqTopic, messageID, payload, metadataJSON, fetchPartKey, createdAtMilli, publishedAtMilli, now, failureCount, lastError, topic) if err != nil { diff --git a/extension/queue/mysql/message_store_test.go b/extension/queue/mysql/message_store_test.go index 85e401f5..a1d52d2c 100644 --- a/extension/queue/mysql/message_store_test.go +++ b/extension/queue/mysql/message_store_test.go @@ -141,6 +141,7 @@ func TestMessageStore_FetchByOffset(t *testing.T) { topic := "test_topic" partitionKey := "part1" currentOffset := int64(0) + nowMs := time.Now().UnixMilli() limit := 10 // Mock query results (no transaction, simple SELECT) @@ -148,16 +149,60 @@ func TestMessageStore_FetchByOffset(t *testing.T) { AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", time.Now().UnixMilli(), int64(0), 0, "", "") mock.ExpectQuery("SELECT (.+) FROM queue_messages"). - WithArgs(topic, partitionKey, currentOffset, limit). + WithArgs(topic, partitionKey, currentOffset, nowMs, limit). WillReturnRows(rows) - results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, limit) + results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, nowMs, limit) require.NoError(t, err) require.Len(t, results, 1) require.Equal(t, "msg1", results[0].ID) require.NoError(t, mock.ExpectationsWereMet()) } +func TestMessageStore_FetchByOffset_SkipsDelayed(t *testing.T) { + db, mock, store := setupmessageStoreTest(t) + defer db.Close() + + ctx := context.Background() + topic := "test_topic" + partitionKey := "part1" + currentOffset := int64(0) + nowMs := int64(1000) + limit := 10 + + // The SQL filter (visible_after <= nowMs) is applied by the DB; sqlmock just + // verifies the parameter binding. An empty result row simulates the case + // where the only message is still deferred. + mock.ExpectQuery("SELECT (.+) FROM queue_messages"). + WithArgs(topic, partitionKey, currentOffset, nowMs, limit). + WillReturnRows(sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "published_at", "failed_at", "failure_count", "last_error", "original_topic"})) + + results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, nowMs, limit) + require.NoError(t, err) + require.Empty(t, results) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestMessageStore_InsertDelayed(t *testing.T) { + db, mock, store := setupmessageStoreTest(t) + defer db.Close() + + ctx := context.Background() + visibleAfter := time.Now().UnixMilli() + 5000 + msg := queue.Message{ID: "msg-delayed", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: time.Now().UnixMilli()} + + mock.ExpectBegin() + mock.ExpectPrepare("INSERT INTO queue_messages") + mock.ExpectExec("INSERT INTO queue_messages"). + WithArgs("test_topic", msg.ID, msg.Payload, []byte(nil), msg.PartitionKey, sqlmock.AnyArg(), msg.PublishedAt, visibleAfter). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err := store.InsertDelayed(ctx, "test_topic", []queue.Message{msg}, visibleAfter) + require.NoError(t, err) + require.NoError(t, mock.ExpectationsWereMet()) +} + func TestMessageStore_MoveToDLQ(t *testing.T) { db, mock, store := setupmessageStoreTest(t) defer db.Close() diff --git a/extension/queue/mysql/mock_stores.go b/extension/queue/mysql/mock_stores.go index cf8477de..e2f9ed36 100644 --- a/extension/queue/mysql/mock_stores.go +++ b/extension/queue/mysql/mock_stores.go @@ -56,18 +56,18 @@ func (mr *MockmessageStoreMockRecorder) Delete(ctx, topic, partitionKey, message } // FetchByOffset mocks base method. -func (m *MockmessageStore) FetchByOffset(ctx context.Context, topic, partitionKey string, currentOffset int64, limit int) ([]messageRow, error) { +func (m *MockmessageStore) FetchByOffset(ctx context.Context, topic, partitionKey string, currentOffset, nowMs int64, limit int) ([]messageRow, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchByOffset", ctx, topic, partitionKey, currentOffset, limit) + ret := m.ctrl.Call(m, "FetchByOffset", ctx, topic, partitionKey, currentOffset, nowMs, limit) ret0, _ := ret[0].([]messageRow) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchByOffset indicates an expected call of FetchByOffset. -func (mr *MockmessageStoreMockRecorder) FetchByOffset(ctx, topic, partitionKey, currentOffset, limit any) *gomock.Call { +func (mr *MockmessageStoreMockRecorder) FetchByOffset(ctx, topic, partitionKey, currentOffset, nowMs, limit any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByOffset", reflect.TypeOf((*MockmessageStore)(nil).FetchByOffset), ctx, topic, partitionKey, currentOffset, limit) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByOffset", reflect.TypeOf((*MockmessageStore)(nil).FetchByOffset), ctx, topic, partitionKey, currentOffset, nowMs, limit) } // GarbageCollect mocks base method. @@ -114,6 +114,20 @@ func (mr *MockmessageStoreMockRecorder) Insert(ctx, topic, messages any) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockmessageStore)(nil).Insert), ctx, topic, messages) } +// InsertDelayed mocks base method. +func (m *MockmessageStore) InsertDelayed(ctx context.Context, topic string, messages []queue.Message, visibleAfterMs int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertDelayed", ctx, topic, messages, visibleAfterMs) + ret0, _ := ret[0].(error) + return ret0 +} + +// InsertDelayed indicates an expected call of InsertDelayed. +func (mr *MockmessageStoreMockRecorder) InsertDelayed(ctx, topic, messages, visibleAfterMs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertDelayed", reflect.TypeOf((*MockmessageStore)(nil).InsertDelayed), ctx, topic, messages, visibleAfterMs) +} + // MoveToDLQ mocks base method. func (m *MockmessageStore) MoveToDLQ(ctx context.Context, topic, partitionKey, messageID string, failureCount int, lastError, dlqTopicSuffix string) error { m.ctrl.T.Helper() diff --git a/extension/queue/mysql/publisher.go b/extension/queue/mysql/publisher.go index e0fe05a5..c487ed01 100644 --- a/extension/queue/mysql/publisher.go +++ b/extension/queue/mysql/publisher.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/uber-go/tally/v4" "go.uber.org/zap" @@ -66,6 +67,36 @@ func (p *publisher) Publish(ctx context.Context, topic string, message queue.Mes return nil } +// PublishAfter sends a message that becomes visible to subscribers only +// after delayMs from now. The message is inserted with visible_after = +// now + delayMs; FetchByOffset skips it until that timestamp. +// delayMs <= 0 is equivalent to Publish. +func (p *publisher) PublishAfter(ctx context.Context, topic string, message queue.Message, delayMs int64) (retErr error) { + op := metrics.Begin(p.scope, "publish_after", metrics.NewTag("topic", topic)) + defer func() { op.Complete(retErr) }() + + p.mu.RLock() + closed := p.closed + p.mu.RUnlock() + + if closed { + return ErrPublisherClosed + } + + var visibleAfter int64 + if delayMs > 0 { + visibleAfter = time.Now().UnixMilli() + delayMs + } + + if err := p.messageStore.InsertDelayed(ctx, topic, []queue.Message{message}, visibleAfter); err != nil { + return fmt.Errorf("publish_after message store insert error: %w", err) + } + + p.logger.Debugw("published delayed message", logTopic, topic, logMessageID, message.ID, "delay_ms", delayMs) + + return nil +} + // Close gracefully shuts down the publisher func (p *publisher) Close() error { p.mu.Lock() diff --git a/extension/queue/mysql/publisher_test.go b/extension/queue/mysql/publisher_test.go index 6138aabd..b4f46cda 100644 --- a/extension/queue/mysql/publisher_test.go +++ b/extension/queue/mysql/publisher_test.go @@ -160,6 +160,68 @@ func TestPublisher_PublishAfterClose(t *testing.T) { require.True(t, errors.Is(err, ErrPublisherClosed)) } +func TestPublisher_PublishAfter(t *testing.T) { + tests := []struct { + name string + delayMs int64 + wantVisibleArg gomock.Matcher + }{ + { + name: "positive delay binds future visible_after", + delayMs: 5000, + // Exact timestamp depends on wall clock; assert it's > 0. + wantVisibleArg: gomock.Cond(func(v any) bool { + ts, ok := v.(int64) + return ok && ts > 0 + }), + }, + { + name: "zero delay binds visible_after=0", + delayMs: 0, + wantVisibleArg: gomock.Eq(int64(0)), + }, + { + name: "negative delay clamps to 0", + delayMs: -100, + wantVisibleArg: gomock.Eq(int64(0)), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockmessageStore(ctrl) + mockStore.EXPECT(). + InsertDelayed(gomock.Any(), "test_topic", gomock.Any(), tt.wantVisibleArg). + Return(nil). + Times(1) + + pub := setupPublisherTest(t, mockStore) + + msg := queue.NewMessage("msg-delayed", []byte("p"), "part1", nil) + err := pub.PublishAfter(context.Background(), "test_topic", msg, tt.delayMs) + require.NoError(t, err) + }) + } +} + +func TestPublisher_PublishAfterClosed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockmessageStore(ctrl) + pub := setupPublisherTest(t, mockStore) + + require.NoError(t, pub.Close()) + + msg := queue.NewMessage("msg1", []byte("p"), "part1", nil) + err := pub.PublishAfter(context.Background(), "test_topic", msg, 1000) + require.Error(t, err) + require.True(t, errors.Is(err, ErrPublisherClosed)) +} + func TestPublisher_Close(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/extension/queue/mysql/schema/queue_messages.sql b/extension/queue/mysql/schema/queue_messages.sql index 50e887b2..8c2e7739 100644 --- a/extension/queue/mysql/schema/queue_messages.sql +++ b/extension/queue/mysql/schema/queue_messages.sql @@ -25,6 +25,13 @@ CREATE TABLE IF NOT EXISTS queue_messages ( created_at BIGINT UNSIGNED NOT NULL, published_at BIGINT UNSIGNED NOT NULL, + -- visible_after defers delivery: subscribers skip rows where visible_after > now. + -- 0 (the default) means immediately visible. Set by Publisher.PublishAfter + -- to schedule a fresh message for delivery at a future time without + -- consuming a delivery_state retry slot (used e.g. by the orchestrator's + -- buildstatus polling consumer to space out Status calls). + visible_after BIGINT UNSIGNED NOT NULL DEFAULT 0, + -- DLQ-specific fields (0/"" for normal messages, populated for DLQ messages) failed_at BIGINT UNSIGNED NOT NULL, -- failure_count stores how many times the message failed on the ORIGINAL topic before moving to DLQ diff --git a/extension/queue/mysql/stores.go b/extension/queue/mysql/stores.go index 10fa7e59..d3ea34c0 100644 --- a/extension/queue/mysql/stores.go +++ b/extension/queue/mysql/stores.go @@ -57,16 +57,21 @@ type messageRow struct { // messageStore handles message table operations (internal use only) type messageStore interface { - // Insert inserts messages into the topic table + // Insert inserts messages into the topic table. Insert(ctx context.Context, topic string, messages []queue.Message) error + // InsertDelayed inserts messages whose delivery is deferred until + // visibleAfterMs (epoch milliseconds). 0 means immediately visible + // and is equivalent to Insert. + InsertDelayed(ctx context.Context, topic string, messages []queue.Message, visibleAfterMs int64) error + // Delete deletes a message by topic, partition key, and ID Delete(ctx context.Context, topic string, partitionKey string, messageID string) error // FetchByOffset fetches messages with offset > currentOffset for a specific partition. - // Messages are returned from the immutable log; per-consumer-group visibility - // is handled by the deliveryStateStore. - FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) ([]messageRow, error) + // Rows whose visible_after > nowMs are skipped (deferred deliveries). + // Per-consumer-group visibility is handled by the deliveryStateStore. + FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, nowMs int64, limit int) ([]messageRow, error) // MoveToDLQ moves a message to the dead letter queue // dlqTopicSuffix is appended to the original topic to form the DLQ topic name diff --git a/extension/queue/mysql/subscriber.go b/extension/queue/mysql/subscriber.go index 3d9f342b..4c674a5c 100644 --- a/extension/queue/mysql/subscriber.go +++ b/extension/queue/mysql/subscriber.go @@ -771,8 +771,9 @@ func (w *partitionWorker) pollAndDeliver(ctx context.Context) error { return fmt.Errorf("get acked offset: %w", err) } - // Fetch messages from immutable log - rows, err := s.messageStore.FetchByOffset(ctx, sub.topic, partitionKey, currentOffset, cfg.BatchSize) + // Fetch messages from immutable log; defer-visible rows (visible_after > now) + // are skipped at the SQL layer. + rows, err := s.messageStore.FetchByOffset(ctx, sub.topic, partitionKey, currentOffset, time.Now().UnixMilli(), cfg.BatchSize) if err != nil { return fmt.Errorf("fetch messages: %w", err) } diff --git a/extension/queue/mysql/subscriber_test.go b/extension/queue/mysql/subscriber_test.go index 31e4d4b3..dd36359d 100644 --- a/extension/queue/mysql/subscriber_test.go +++ b/extension/queue/mysql/subscriber_test.go @@ -413,7 +413,7 @@ func TestSubscriber_ReconcilePartitionWorkers(t *testing.T) { // Allow offset initialization, fetch, and watermark calls from workers mockOffsetStore.EXPECT().Initialize(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockOffsetStore.EXPECT().GetAckedOffset(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), nil).AnyTimes() - mockMessageStore.EXPECT().FetchByOffset(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockMessageStore.EXPECT().FetchByOffset(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockMessageStore.EXPECT().GetOffsetsAbove(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockMessageStore.EXPECT().GarbageCollect(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), nil).AnyTimes() mockOffsetStore.EXPECT().GetMinAckedOffset(gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), false, nil).AnyTimes() @@ -493,7 +493,7 @@ func TestSubscriber_PartitionWorkerPollAndDeliver(t *testing.T) { Payload: []byte("payload"), PublishedAt: time.Now().UnixMilli(), } - mockMessageStore.EXPECT().FetchByOffset(gomock.Any(), "test_topic", "part-1", int64(0), cfg.BatchSize). + mockMessageStore.EXPECT().FetchByOffset(gomock.Any(), "test_topic", "part-1", int64(0), gomock.Any(), cfg.BatchSize). Return([]messageRow{row}, nil) // Delivery state checks — GetDeliveryState returns not-found (new message) @@ -547,7 +547,7 @@ func TestSubscriber_StopAllWorkers(t *testing.T) { // Allow worker polling and watermark advancement mockOffsetStore.EXPECT().Initialize(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockOffsetStore.EXPECT().GetAckedOffset(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), nil).AnyTimes() - mockMessageStore.EXPECT().FetchByOffset(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockMessageStore.EXPECT().FetchByOffset(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockMessageStore.EXPECT().GetOffsetsAbove(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockMessageStore.EXPECT().GarbageCollect(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), nil).AnyTimes() mockOffsetStore.EXPECT().GetMinAckedOffset(gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), false, nil).AnyTimes() diff --git a/extension/queue/publisher.go b/extension/queue/publisher.go index ee082a69..959b1531 100644 --- a/extension/queue/publisher.go +++ b/extension/queue/publisher.go @@ -28,6 +28,17 @@ type Publisher interface { // Publish sends a message to the specified topic. Publish(ctx context.Context, topic string, message queue.Message) error + // PublishAfter sends a message that becomes visible to subscribers only + // after delayMs from now. It is a fresh publish — not a redelivery — so + // it does not consume a delivery_state retry slot. delayMs <= 0 is + // equivalent to Publish. + // + // Use for "postpone this work" semantics (e.g. spacing out repeated + // poll cycles for a single key). Use Nack with a delay for "this + // delivery failed, try again" — the two signals stay separate so + // retry_count and DLQ behaviour remain meaningful. + PublishAfter(ctx context.Context, topic string, message queue.Message, delayMs int64) error + // Close gracefully shuts down the publisher, flushing pending messages. Close() error }