Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ deps = [
### Testing

- **Table-driven tests** — prefer table-driven tests with `t.Run` subtests over individual test functions.
- **Avoid asserting on error messages** — assert on error type or generic error.
- **Avoid asserting on error messages** — assert on error type or check the error with `require.Error`, do not `assert.Contains(t, err.Error(), message)`
- **No change detector tests** — don't assert on default values, internal structure, or implementation details that can change without affecting behavior. Test what the code *does*, not how it's constructed.
- **No `time.Sleep` for synchronization** — use channels, callbacks, condition variables.
- **Use testify** — `assert`/`require` instead of `t.Fatal()`.
Expand Down
15 changes: 13 additions & 2 deletions core/request/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,32 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "request",
srcs = ["request.go"],
srcs = [
"log.go",
"request.go",
],
importpath = "github.com/uber/submitqueue/core/request",
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//entity",
"//entity/queue",
"//extension/storage",
],
)

go_test(
name = "request_test",
srcs = ["request_test.go"],
srcs = [
"log_test.go",
"request_test.go",
],
embed = [":request"],
deps = [
"//core/consumer",
"//entity",
"//entity/queue",
"//extension/queue/mock",
"//extension/storage",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
Expand Down
63 changes: 63 additions & 0 deletions core/request/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package request

import (
"context"
"fmt"

"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
)

// PublishLog publishes a single request log entry to the log topic for async persistence.
// The partitionKey ensures ordering of log entries for the same request; typically set to the request ID.
func PublishLog(ctx context.Context, registry consumer.TopicRegistry, logEntry entity.RequestLog, partitionKey string) error {
payload, err := logEntry.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize request log: %w", err)
}

msg := entityqueue.NewMessage(logEntry.RequestID, payload, partitionKey, nil)

q, ok := registry.Queue(consumer.TopicKeyLog)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyLog)
}

topicName, ok := registry.TopicName(consumer.TopicKeyLog)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyLog)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

// PublishBatchLogs publishes a request log entry for each request ID in the batch to the log topic.
// Each entry uses the request ID as the partition key to ensure per-request ordering.
func PublishBatchLogs(ctx context.Context, registry consumer.TopicRegistry, requestIDs []string, status entity.RequestStatus, metadata map[string]string) error {
for _, requestID := range requestIDs {
logEntry := entity.NewRequestLog(requestID, status, 0, "", metadata)
if err := PublishLog(ctx, registry, logEntry, requestID); err != nil {
return fmt.Errorf("failed to publish request log for request %s: %w", requestID, err)
}
}
return nil
}
115 changes: 115 additions & 0 deletions core/request/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package request

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/entity/queue"
queuemock "github.com/uber/submitqueue/extension/queue/mock"
"go.uber.org/mock/gomock"
)

func newTestRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) consumer.TopicRegistry {
mockPub := queuemock.NewMockPublisher(ctrl)
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, topic string, msg queue.Message) error {
return publishErr
},
).AnyTimes()

mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()

registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}},
)
require.NoError(t, err)
return registry
}

func TestPublishLog_Success(t *testing.T) {
ctrl := gomock.NewController(t)
registry := newTestRegistry(t, ctrl, nil)

logEntry := entity.NewRequestLog("req/1", entity.RequestStatusStarted, 1, "", nil)
err := PublishLog(context.Background(), registry, logEntry, "req/1")
require.NoError(t, err)
}

func TestPublishLog_PublishFailure(t *testing.T) {
ctrl := gomock.NewController(t)
registry := newTestRegistry(t, ctrl, fmt.Errorf("connection refused"))

logEntry := entity.NewRequestLog("req/1", entity.RequestStatusStarted, 1, "", nil)
err := PublishLog(context.Background(), registry, logEntry, "req/1")
require.Error(t, err)
}

func TestPublishBatchLogs_Success(t *testing.T) {
ctrl := gomock.NewController(t)
registry := newTestRegistry(t, ctrl, nil)

err := PublishBatchLogs(context.Background(), registry,
[]string{"req/1", "req/2", "req/3"},
entity.RequestStatusScored,
map[string]string{"batch_id": "b/1"},
)
require.NoError(t, err)
}

func TestPublishBatchLogs_PartialFailure(t *testing.T) {
ctrl := gomock.NewController(t)

callCount := 0
mockPub := queuemock.NewMockPublisher(ctrl)
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, topic string, msg queue.Message) error {
callCount++
if callCount == 2 {
return fmt.Errorf("publish failed")
}
return nil
},
).AnyTimes()

mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()

registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}},
)
require.NoError(t, err)

err = PublishBatchLogs(context.Background(), registry,
[]string{"req/1", "req/2", "req/3"},
entity.RequestStatusScored,
map[string]string{"batch_id": "b/1"},
)
require.Error(t, err)
}

func TestPublishBatchLogs_Empty(t *testing.T) {
ctrl := gomock.NewController(t)
registry := newTestRegistry(t, ctrl, nil)

err := PublishBatchLogs(context.Background(), registry, nil, entity.RequestStatusScored, nil)
require.NoError(t, err)
}
6 changes: 6 additions & 0 deletions entity/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
BatchStateSucceeded BatchState = "succeeded"
// BatchStateFailed is the terminal state of a batch that has failed.
BatchStateFailed BatchState = "failed"
// BatchStateScored is the state of a batch that has been scored for build success probability.
BatchStateScored BatchState = "scored"
// BatchStateCancelled is the terminal state of a batch that was cancelled before completion.
BatchStateCancelled BatchState = "cancelled"
)
Expand Down Expand Up @@ -81,6 +83,10 @@ type Batch struct {
// - queueA/batch/3 will contain queueA/batch/1
Dependencies []string

// Score is the predicted probability of build success for this batch, ranging from 0.0 to 1.0.
// Set during the scoring phase. Zero value means the batch has not been scored yet.
Score float64

// The state of the batch lifecycle this batch is in. Updateable field with Version for optimistic locking.
State BatchState

Expand Down
3 changes: 3 additions & 0 deletions entity/request_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
// RequestStatusBatched indicates that the request has been included in a new batch and will be sent to speculation.
RequestStatusBatched RequestStatus = "batched"

// RequestStatusScored indicates that the batch containing the request has been scored for build success probability.
RequestStatusScored RequestStatus = "scored"

// RequestStatusSpeculating indicates that the request is currently being speculated (e.g., speculative merge/rebase, etc.).
RequestStatusSpeculating RequestStatus = "speculating"

Expand Down
2 changes: 2 additions & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//core/consumer",
"//entity",
"//extension/counter",
"//extension/counter/mysql",
"//extension/mergechecker",
"//extension/mergechecker/github",
"//extension/queue",
"//extension/queue/mysql",
"//extension/scorer/heuristic",
"//extension/storage",
"//extension/storage/mysql",
"//orchestrator/controller",
Expand Down
15 changes: 15 additions & 0 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/counter"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
"github.com/uber/submitqueue/extension/mergechecker"
githubchecker "github.com/uber/submitqueue/extension/mergechecker/github"
extqueue "github.com/uber/submitqueue/extension/queue"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
"github.com/uber/submitqueue/extension/scorer/heuristic"
"github.com/uber/submitqueue/extension/storage"
mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql"
"github.com/uber/submitqueue/orchestrator/controller"
Expand Down Expand Up @@ -417,6 +419,19 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
store,
// TODO: replace with a real scorer
heuristic.New(
[]heuristic.Bucket{
{Min: 0, Max: 1, Score: 0.95},
{Min: 2, Max: 5, Score: 0.80},
{Min: 6, Max: 20, Score: 0.60},
{Min: 21, Max: 1<<31 - 1, Score: 0.40},
},
func(_ context.Context, change entity.Change) (int, error) {
return len(change.URIs), nil
},
scope.SubScope("scorer"),
),
registry,
consumer.TopicKeyScore,
"orchestrator-score",
Expand Down
4 changes: 4 additions & 0 deletions extension/storage/batch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type BatchStore interface {
// The implementation should increment the version by 1 atomically with the state update.
UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error

// UpdateScoreAndState atomically updates the score and state of a batch if the current version matches the expected version.
// If versions do not match, returns ErrVersionMismatch. The implementation should increment the version by 1 atomically.
UpdateScoreAndState(ctx context.Context, id string, version int32, score float64, newState entity.BatchState) error

// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error)
}
14 changes: 14 additions & 0 deletions extension/storage/mock/batch_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading