From d754ba0f156eb7b0b2a652b8a7fe5dd8b0c68397 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 6 May 2026 15:50:55 -0700 Subject: [PATCH 1/6] add change store extension for per-URI request claims Introduce a dedicated extension that tracks (URI, request_id) claims with mutable metadata, scoped to queue. Foundation for cross-request URI overlap detection; no consumers in this commit. - entity/change_record.go: immutable identity (URI, RequestID), mutable metadata - extension/changestore/: ChangeStore interface, mysql impl, mock, schema - INSERT IGNORE on writes for retry idempotency on (uri, request_id) - FindOverlapping is single-table; callers do their own liveness check - integration test under test/integration/extension/changestore/mysql/ - e2e suite applies the change schema (harmless no-op until consumers land) --- Makefile | 2 +- entity/BUILD.bazel | 1 + entity/change_record.go | 45 ++++ extension/changestore/BUILD.bazel | 9 + extension/changestore/README.md | 19 ++ extension/changestore/change_store.go | 51 +++++ extension/changestore/mock/BUILD.bazel | 12 ++ .../changestore/mock/change_store_mock.go | 71 ++++++ extension/changestore/mysql/BUILD.bazel | 14 ++ extension/changestore/mysql/change_store.go | 118 ++++++++++ .../changestore/mysql/schema/BUILD.bazel | 5 + extension/changestore/mysql/schema/change.sql | 9 + test/e2e/suite_test.go | 1 + .../extension/changestore/mysql/BUILD.bazel | 22 ++ .../changestore/mysql/changestore_test.go | 202 ++++++++++++++++++ .../changestore/mysql/docker-compose.yml | 19 ++ 16 files changed, 599 insertions(+), 1 deletion(-) create mode 100644 entity/change_record.go create mode 100644 extension/changestore/BUILD.bazel create mode 100644 extension/changestore/README.md create mode 100644 extension/changestore/change_store.go create mode 100644 extension/changestore/mock/BUILD.bazel create mode 100644 extension/changestore/mock/change_store_mock.go create mode 100644 extension/changestore/mysql/BUILD.bazel create mode 100644 extension/changestore/mysql/change_store.go create mode 100644 extension/changestore/mysql/schema/BUILD.bazel create mode 100644 extension/changestore/mysql/schema/change.sql create mode 100644 test/integration/extension/changestore/mysql/BUILD.bazel create mode 100644 test/integration/extension/changestore/mysql/changestore_test.go create mode 100644 test/integration/extension/changestore/mysql/docker-compose.yml diff --git a/Makefile b/Makefile index 622364cd..c9518c43 100644 --- a/Makefile +++ b/Makefile @@ -245,7 +245,7 @@ local-stop: ## Stop all services (keep data) mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/changestore/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/entity/BUILD.bazel b/entity/BUILD.bazel index 09c5a360..5231f436 100644 --- a/entity/BUILD.bazel +++ b/entity/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "batch_dependent.go", "build.go", "change_provider.go", + "change_record.go", "land_request.go", "queue_config.go", "request.go", diff --git a/entity/change_record.go b/entity/change_record.go new file mode 100644 index 00000000..76f2757b --- /dev/null +++ b/entity/change_record.go @@ -0,0 +1,45 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// ChangeRecord represents a single URI's claim by a request, persisted in the change store. +// The (URI, RequestID) pair is the identity and is immutable; Metadata may be updated over +// time as additional information about the change (e.g., PR title, author, mergeability) +// becomes available. +type ChangeRecord struct { + // URI identifies the change (RFC 3986). Same scheme/format as entity.Change.URIs. + // Example: "github://uber/submitqueue/pull/123/abc123def". + URI string `json:"uri"` + + // RequestID is the owning land request that claimed this URI. + // Format matches entity.Request.ID: "/". + RequestID string `json:"request_id"` + + // Queue is the queue scope for the owning request. Denormalized from the request + // to allow queue-scoped duplicate checks without a join. + Queue string `json:"queue"` + + // Metadata is a JSON-encoded blob of provider-specific information about the change + // (e.g., PR title, author, mergeable state). Empty when the record is first written; + // populated and updated by downstream enrichment. + Metadata string `json:"metadata,omitempty"` + + // CreatedAt is the Unix milliseconds timestamp when this record was first created. + CreatedAt int64 `json:"created_at"` + + // UpdatedAt is the Unix milliseconds timestamp when this record's Metadata was last updated. + // Equal to CreatedAt when the record has never been updated. + UpdatedAt int64 `json:"updated_at"` +} diff --git a/extension/changestore/BUILD.bazel b/extension/changestore/BUILD.bazel new file mode 100644 index 00000000..a80c5ce5 --- /dev/null +++ b/extension/changestore/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "changestore", + srcs = ["change_store.go"], + importpath = "github.com/uber/submitqueue/extension/changestore", + visibility = ["//visibility:public"], + deps = ["//entity"], +) diff --git a/extension/changestore/README.md b/extension/changestore/README.md new file mode 100644 index 00000000..4a4533b6 --- /dev/null +++ b/extension/changestore/README.md @@ -0,0 +1,19 @@ +# ChangeStore + +Vendor-agnostic interface for tracking per-URI claims by in-flight land requests. + +Each record asserts that a specific URI (e.g., a GitHub PR) was claimed by a specific request, scoped to a queue. The store is consulted by the orchestrator's `start` controller to detect duplicate requests — submissions whose URIs overlap with another in-flight request's URIs in the same queue. + +## Semantics + +- **Identity is immutable.** A record is keyed by `(URI, RequestID)`; once written, that pair is never mutated. +- **Metadata is mutable.** The `Metadata` field is intended for provider-specific information about the change (PR title, author, mergeability, etc.) that may be enriched after the record is first written. `UpdatedAt` reflects the last metadata change; `CreatedAt` is fixed at write time. +- **Idempotent writes.** `Create` ignores primary-key conflicts so queue-redelivery of the same request is a safe no-op. +- **No liveness filter.** `FindOverlapping` returns records regardless of whether the owning request is still in flight. Callers must check liveness against `RequestStore` themselves — the store boundary is intentionally one query, one table, no joins. +- **Append-only by design.** Records are not deleted when the owning request reaches a terminal state; the historical claim is preserved for audit. Duplicate detection filters terminals out at query time via the controller-side liveness check. + +## Implementing a Backend + +1. Create `extension/changestore/{backend}/` directory. +2. Implement the `ChangeStore` interface. +3. Add schema files under `extension/changestore/{backend}/schema/` if the backend requires them. diff --git a/extension/changestore/change_store.go b/extension/changestore/change_store.go new file mode 100644 index 00000000..45332ea9 --- /dev/null +++ b/extension/changestore/change_store.go @@ -0,0 +1,51 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package changestore + +//go:generate mockgen -source=change_store.go -destination=mock/change_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// ChangeStore manages per-URI claim records for in-flight land requests. +// Each row records that a specific URI was claimed by a specific request, scoped to a queue. +// The (URI, RequestID) pair is the immutable identity of a record. Metadata may evolve over time. +// +// The store is the source of truth for "which URIs are or were associated with which requests". +// Liveness of an owning request is NOT tracked here — callers must consult RequestStore separately +// to determine whether an owner is in a terminal state. +type ChangeStore interface { + // Create persists a batch of ChangeRecords. Conflicts on the (URI, RequestID) primary key + // are silently ignored, making the call idempotent under queue redeliveries of the same request. + // Records belonging to different requests do not conflict — overlap between requests is detected + // by FindOverlapping, not by Create. + Create(ctx context.Context, records []entity.ChangeRecord) error + + // FindOverlapping returns ChangeRecords whose URI is in the given set, scoped to queue, + // excluding any records belonging to excludeRequestID (so callers can skip self when checking + // for duplicates of a freshly-claimed request). Returns an empty slice when there is no overlap. + // + // Liveness of the returned records' owning requests is NOT filtered here — the caller is + // responsible for consulting RequestStore to skip terminal owners. + FindOverlapping( + ctx context.Context, + queue string, + uris []string, + excludeRequestID string, + ) ([]entity.ChangeRecord, error) +} diff --git a/extension/changestore/mock/BUILD.bazel b/extension/changestore/mock/BUILD.bazel new file mode 100644 index 00000000..6d1bd2de --- /dev/null +++ b/extension/changestore/mock/BUILD.bazel @@ -0,0 +1,12 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["change_store_mock.go"], + importpath = "github.com/uber/submitqueue/extension/changestore/mock", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "@org_uber_go_mock//gomock", + ], +) diff --git a/extension/changestore/mock/change_store_mock.go b/extension/changestore/mock/change_store_mock.go new file mode 100644 index 00000000..a1826380 --- /dev/null +++ b/extension/changestore/mock/change_store_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: change_store.go +// +// Generated by this command: +// +// mockgen -source=change_store.go -destination=mock/change_store_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/entity" + gomock "go.uber.org/mock/gomock" +) + +// MockChangeStore is a mock of ChangeStore interface. +type MockChangeStore struct { + ctrl *gomock.Controller + recorder *MockChangeStoreMockRecorder + isgomock struct{} +} + +// MockChangeStoreMockRecorder is the mock recorder for MockChangeStore. +type MockChangeStoreMockRecorder struct { + mock *MockChangeStore +} + +// NewMockChangeStore creates a new mock instance. +func NewMockChangeStore(ctrl *gomock.Controller) *MockChangeStore { + mock := &MockChangeStore{ctrl: ctrl} + mock.recorder = &MockChangeStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeStore) EXPECT() *MockChangeStoreMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockChangeStore) Create(ctx context.Context, records []entity.ChangeRecord) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", ctx, records) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockChangeStoreMockRecorder) Create(ctx, records any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockChangeStore)(nil).Create), ctx, records) +} + +// FindOverlapping mocks base method. +func (m *MockChangeStore) FindOverlapping(ctx context.Context, queue string, uris []string, excludeRequestID string) ([]entity.ChangeRecord, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindOverlapping", ctx, queue, uris, excludeRequestID) + ret0, _ := ret[0].([]entity.ChangeRecord) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindOverlapping indicates an expected call of FindOverlapping. +func (mr *MockChangeStoreMockRecorder) FindOverlapping(ctx, queue, uris, excludeRequestID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindOverlapping", reflect.TypeOf((*MockChangeStore)(nil).FindOverlapping), ctx, queue, uris, excludeRequestID) +} diff --git a/extension/changestore/mysql/BUILD.bazel b/extension/changestore/mysql/BUILD.bazel new file mode 100644 index 00000000..73f9f090 --- /dev/null +++ b/extension/changestore/mysql/BUILD.bazel @@ -0,0 +1,14 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mysql", + srcs = ["change_store.go"], + importpath = "github.com/uber/submitqueue/extension/changestore/mysql", + visibility = ["//visibility:public"], + deps = [ + "//core/metrics", + "//entity", + "//extension/changestore", + "@com_github_uber_go_tally_v4//:tally", + ], +) diff --git a/extension/changestore/mysql/change_store.go b/extension/changestore/mysql/change_store.go new file mode 100644 index 00000000..d8a36852 --- /dev/null +++ b/extension/changestore/mysql/change_store.go @@ -0,0 +1,118 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/uber-go/tally/v4" + + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/changestore" +) + +type changeStore struct { + db *sql.DB + scope tally.Scope +} + +// NewChangeStore creates a new MySQL-backed ChangeStore. +func NewChangeStore(db *sql.DB, scope tally.Scope) changestore.ChangeStore { + return &changeStore{db: db, scope: scope} +} + +// Create inserts a batch of ChangeRecords. Primary-key conflicts on (uri, request_id) +// are silently ignored via INSERT IGNORE so queue-redelivery of the same request is a no-op. +func (s *changeStore) Create(ctx context.Context, records []entity.ChangeRecord) (retErr error) { + op := metrics.Begin(s.scope, "create") + defer func() { op.Complete(retErr) }() + + if len(records) == 0 { + return nil + } + + const cols = 6 + placeholders := strings.Repeat("(?, ?, ?, ?, ?, ?), ", len(records)) + placeholders = placeholders[:len(placeholders)-2] // trim trailing ", " + + args := make([]any, 0, len(records)*cols) + for _, r := range records { + // Pass empty Metadata as NULL — JSON column rejects empty string but accepts NULL. + var metadata any + if r.Metadata != "" { + metadata = r.Metadata + } + args = append(args, r.URI, r.RequestID, r.Queue, metadata, r.CreatedAt, r.UpdatedAt) + } + + query := "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at) VALUES " + placeholders + if _, err := s.db.ExecContext(ctx, query, args...); err != nil { + return fmt.Errorf("failed to insert change records (count=%d): %w", len(records), err) + } + return nil +} + +// FindOverlapping returns ChangeRecords whose uri is in the given set, scoped to queue, +// excluding any belonging to excludeRequestID. +func (s *changeStore) FindOverlapping( + ctx context.Context, + queue string, + uris []string, + excludeRequestID string, +) (ret []entity.ChangeRecord, retErr error) { + op := metrics.Begin(s.scope, "find_overlapping") + defer func() { op.Complete(retErr) }() + + if len(uris) == 0 { + return nil, nil + } + + uriPlaceholders := "?" + strings.Repeat(", ?", len(uris)-1) + query := "SELECT uri, request_id, queue, metadata, created_at, updated_at FROM `change` " + + "WHERE uri IN (" + uriPlaceholders + ") AND queue = ? AND request_id != ?" + + args := make([]any, 0, len(uris)+2) + for _, u := range uris { + args = append(args, u) + } + args = append(args, queue, excludeRequestID) + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to query overlapping changes for queue=%s: %w", queue, err) + } + defer rows.Close() + + var results []entity.ChangeRecord + for rows.Next() { + var rec entity.ChangeRecord + var metadata sql.NullString + if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &metadata, &rec.CreatedAt, &rec.UpdatedAt); err != nil { + return nil, fmt.Errorf("failed to scan change record for queue=%s: %w", queue, err) + } + if metadata.Valid { + rec.Metadata = metadata.String + } + results = append(results, rec) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate change records for queue=%s: %w", queue, err) + } + return results, nil +} diff --git a/extension/changestore/mysql/schema/BUILD.bazel b/extension/changestore/mysql/schema/BUILD.bazel new file mode 100644 index 00000000..3412d773 --- /dev/null +++ b/extension/changestore/mysql/schema/BUILD.bazel @@ -0,0 +1,5 @@ +filegroup( + name = "schema", + srcs = glob(["*.sql"]), + visibility = ["//visibility:public"], +) diff --git a/extension/changestore/mysql/schema/change.sql b/extension/changestore/mysql/schema/change.sql new file mode 100644 index 00000000..4c92c28c --- /dev/null +++ b/extension/changestore/mysql/schema/change.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS `change` ( + uri VARCHAR(255) NOT NULL, + request_id VARCHAR(255) NOT NULL, + queue VARCHAR(255) NOT NULL, + metadata JSON, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + PRIMARY KEY (uri, request_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 17a74ed4..ee2e19c9 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -85,6 +85,7 @@ func (s *E2EIntegrationSuite) SetupSuite() { // Apply schemas programmatically to application database testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/storage/mysql/schema")) testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/counter/mysql/schema")) + testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/changestore/mysql/schema")) // Apply schemas programmatically to queue database testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/mysql/schema")) diff --git a/test/integration/extension/changestore/mysql/BUILD.bazel b/test/integration/extension/changestore/mysql/BUILD.bazel new file mode 100644 index 00000000..f203f98f --- /dev/null +++ b/test/integration/extension/changestore/mysql/BUILD.bazel @@ -0,0 +1,22 @@ +load("@rules_go//go:def.bzl", "go_test") + +go_test( + name = "mysql_test", + srcs = ["changestore_test.go"], + data = [ + "docker-compose.yml", + "//extension/changestore/mysql/schema", + ], + tags = ["integration"], + deps = [ + "//entity", + "//extension/changestore", + "//extension/changestore/mysql", + "//test/testutil", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", + ], +) diff --git a/test/integration/extension/changestore/mysql/changestore_test.go b/test/integration/extension/changestore/mysql/changestore_test.go new file mode 100644 index 00000000..594b7425 --- /dev/null +++ b/test/integration/extension/changestore/mysql/changestore_test.go @@ -0,0 +1,202 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "sort" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/changestore" + mysqlchangestore "github.com/uber/submitqueue/extension/changestore/mysql" + "github.com/uber/submitqueue/test/testutil" +) + +// MySQLChangeStoreIntegrationSuite tests the MySQL ChangeStore implementation +// against a real MySQL instance launched via docker-compose. +type MySQLChangeStoreIntegrationSuite struct { + suite.Suite + stack *testutil.ComposeStack + db *sql.DB + store changestore.ChangeStore + log *testutil.TestLogger + ctx context.Context +} + +func TestMySQLChangeStoreIntegration(t *testing.T) { + suite.Run(t, new(MySQLChangeStoreIntegrationSuite)) +} + +func (s *MySQLChangeStoreIntegrationSuite) SetupSuite() { + t := s.T() + s.ctx = context.Background() + s.log = testutil.NewTestLogger(t) + + s.log.Logf("Starting MySQL ChangeStore integration test suite using docker-compose") + + s.stack = testutil.NewComposeStack( + t, + s.log, + s.ctx, + "docker-compose.yml", + "ext-changestore-mysql", + ) + + require.NoError(t, s.stack.Up(), "failed to start compose stack") + s.log.Logf("Compose stack started successfully") + + var err error + s.db, err = s.stack.ConnectMySQLService("mysql") + require.NoError(t, err, "failed to connect to MySQL") + + testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/changestore/mysql/schema")) + s.log.Logf("Schemas applied successfully") + + s.store = mysqlchangestore.NewChangeStore(s.db, tally.NoopScope) + + t.Cleanup(func() { + if s.db != nil { + s.log.Logf("Closing MySQL connection") + s.db.Close() + } + }) +} + +// SetupTest truncates the change table between tests so cases stay isolated. +func (s *MySQLChangeStoreIntegrationSuite) SetupTest() { + _, err := s.db.ExecContext(s.ctx, "TRUNCATE TABLE `change`") + require.NoError(s.T(), err) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_NoOverlap() { + t := s.T() + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "q", []string{"github://uber/x/pull/2/bbb"}, "q/2") + require.NoError(t, err) + assert.Empty(t, got) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_Overlap() { + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/2") + require.NoError(t, err) + require.Len(t, got, 1) + assert.Equal(t, "q/1", got[0].RequestID) + assert.Equal(t, uri, got[0].URI) + assert.Equal(t, "q", got[0].Queue) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_ExcludesSelf() { + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/1") + require.NoError(t, err) + assert.Empty(t, got, "FindOverlapping must not return rows for excludeRequestID") +} + +func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_QueueScoped() { + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "qB", []string{uri}, "qB/1") + require.NoError(t, err) + assert.Empty(t, got, "FindOverlapping must not return rows from a different queue") +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_Idempotent() { + t := s.T() + rec := entity.ChangeRecord{URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1} + + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{rec})) + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{rec}), "second insert with same PK must succeed (INSERT IGNORE)") + + var count int + require.NoError(t, s.db.QueryRowContext(s.ctx, "SELECT COUNT(*) FROM `change`").Scan(&count)) + assert.Equal(t, 1, count, "INSERT IGNORE must not duplicate rows") +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_DifferentRequestSameURI() { + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + {URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/3") + require.NoError(t, err) + require.Len(t, got, 2) + + ids := []string{got[0].RequestID, got[1].RequestID} + sort.Strings(ids) + assert.Equal(t, []string{"q/1", "q/2"}, ids) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_PreservesMetadata() { + t := s.T() + const meta = `{"title":"add new feature"}` + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/other") + require.NoError(t, err) + require.Len(t, got, 1) + assert.JSONEq(t, meta, got[0].Metadata) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyIsNoOp() { + t := s.T() + require.NoError(t, s.store.Create(s.ctx, nil)) + + var count int + require.NoError(t, s.db.QueryRowContext(s.ctx, "SELECT COUNT(*) FROM `change`").Scan(&count)) + assert.Zero(t, count) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_EmptyURIsIsNoOp() { + t := s.T() + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "q", nil, "q/2") + require.NoError(t, err) + assert.Empty(t, got) +} diff --git a/test/integration/extension/changestore/mysql/docker-compose.yml b/test/integration/extension/changestore/mysql/docker-compose.yml new file mode 100644 index 00000000..74241c77 --- /dev/null +++ b/test/integration/extension/changestore/mysql/docker-compose.yml @@ -0,0 +1,19 @@ +# Docker Compose for MySQL ChangeStore Library Tests +# Tests the changestore library's MySQL implementation in isolation. + +services: + mysql: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 From cb3306f6000212f5be6c0a286fcfde8150491363 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 6 May 2026 20:19:02 -0700 Subject: [PATCH 2/6] test/e2e: include changestore schema in bazel data deps The e2e suite_test applies the change schema, but the bazel test target was missing //extension/changestore/mysql/schema in its data list, so runfiles didn't ship the .sql file and ApplySchema couldn't find it. --- test/e2e/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/test/e2e/BUILD.bazel b/test/e2e/BUILD.bazel index a6ac7943..97fc8d32 100644 --- a/test/e2e/BUILD.bazel +++ b/test/e2e/BUILD.bazel @@ -7,6 +7,7 @@ go_test( "//:MODULE.bazel", "//:go.mod", "//example/server:docker-compose.yml", + "//extension/changestore/mysql/schema", "//extension/counter/mysql/schema", "//extension/queue/mysql/schema", "//extension/storage/mysql/schema", From ff53ce6419d231891405d33ccc83ae898a95b7fe Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 7 May 2026 23:22:48 -0700 Subject: [PATCH 3/6] address review comments: queue-leading PK, NOT NULL metadata, cleaner API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - schema: PRIMARY KEY (queue, uri, request_id) — queue-scoped lookups become PK-prefix scans and the table is shardable by queue. Comment in the schema explains why request_id stays in the PK (concurrent claims by different requests coexist as distinct rows; same-request retries collide on the PK). - schema: metadata JSON NOT NULL. The mysql impl writes '{}' for empty metadata so callers don't need to know about the column constraint. - interface: drop excludeRequestID from FindOverlapping. Callers that want to skip self filter the result by RequestID themselves. Documented Create's batch atomicity (single multi-row INSERT, all-or-nothing). - mysql: FindOverlapping query now leads with `WHERE queue = ?` to align with the new PK order. - entity: expanded RequestID/Queue field comments to explain their PK roles. - README: documents the new key shape and metadata semantics. - integration tests: drop the 4th arg, replace TestFindOverlapping_ExcludesSelf with a test that asserts the store does NOT exclude self, and add an assertion that empty metadata round-trips as '{}'. --- entity/change_record.go | 20 +++++++---- extension/changestore/README.md | 10 +++--- extension/changestore/change_store.go | 25 ++++++++----- .../changestore/mock/change_store_mock.go | 8 ++--- extension/changestore/mysql/change_store.go | 36 ++++++++++--------- extension/changestore/mysql/schema/change.sql | 9 +++-- .../changestore/mysql/changestore_test.go | 35 +++++++++++++----- 7 files changed, 91 insertions(+), 52 deletions(-) diff --git a/entity/change_record.go b/entity/change_record.go index 76f2757b..c5f1c28f 100644 --- a/entity/change_record.go +++ b/entity/change_record.go @@ -15,9 +15,9 @@ package entity // ChangeRecord represents a single URI's claim by a request, persisted in the change store. -// The (URI, RequestID) pair is the identity and is immutable; Metadata may be updated over -// time as additional information about the change (e.g., PR title, author, mergeability) -// becomes available. +// The (Queue, URI, RequestID) triple is the identity and is immutable; Metadata may be +// updated over time as additional information about the change (e.g., PR title, author, +// mergeability) becomes available. type ChangeRecord struct { // URI identifies the change (RFC 3986). Same scheme/format as entity.Change.URIs. // Example: "github://uber/submitqueue/pull/123/abc123def". @@ -25,15 +25,21 @@ type ChangeRecord struct { // RequestID is the owning land request that claimed this URI. // Format matches entity.Request.ID: "/". + // + // RequestID participates in the change-store primary key so that concurrent claims + // by different requests on the same URI coexist as distinct rows. Same-request + // retries collide on the PK and are absorbed idempotently; different-request + // collisions surface as additional rows that callers detect via FindOverlapping. RequestID string `json:"request_id"` - // Queue is the queue scope for the owning request. Denormalized from the request - // to allow queue-scoped duplicate checks without a join. + // Queue is the queue the owning request belongs to. It is the leading column of + // the change-store primary key, so queue-scoped duplicate checks become PK-prefix + // scans and the table is shardable by queue. Queue string `json:"queue"` // Metadata is a JSON-encoded blob of provider-specific information about the change - // (e.g., PR title, author, mergeable state). Empty when the record is first written; - // populated and updated by downstream enrichment. + // (e.g., PR title, author, mergeable state). Stored as `'{}'` when no metadata has + // been populated yet; updated by downstream enrichment. Metadata string `json:"metadata,omitempty"` // CreatedAt is the Unix milliseconds timestamp when this record was first created. diff --git a/extension/changestore/README.md b/extension/changestore/README.md index 4a4533b6..68509301 100644 --- a/extension/changestore/README.md +++ b/extension/changestore/README.md @@ -6,10 +6,12 @@ Each record asserts that a specific URI (e.g., a GitHub PR) was claimed by a spe ## Semantics -- **Identity is immutable.** A record is keyed by `(URI, RequestID)`; once written, that pair is never mutated. -- **Metadata is mutable.** The `Metadata` field is intended for provider-specific information about the change (PR title, author, mergeability, etc.) that may be enriched after the record is first written. `UpdatedAt` reflects the last metadata change; `CreatedAt` is fixed at write time. -- **Idempotent writes.** `Create` ignores primary-key conflicts so queue-redelivery of the same request is a safe no-op. -- **No liveness filter.** `FindOverlapping` returns records regardless of whether the owning request is still in flight. Callers must check liveness against `RequestStore` themselves — the store boundary is intentionally one query, one table, no joins. +- **Identity is immutable.** A record is keyed by `(Queue, URI, RequestID)`; once written, that triple is never mutated. +- **Queue leads the key.** Backends should make `Queue` the leading column of the primary key (or partition key, in shardable stores). All reads are queue-scoped, so this turns lookups into PK-prefix scans and keeps the table shardable. +- **`RequestID` in the key is intentional.** Concurrent claims by different requests on the same URI coexist as distinct rows. Same-request retries collide on the PK and are absorbed idempotently; cross-request collisions show up as additional rows that callers detect via `FindOverlapping`. +- **Metadata is required and mutable.** The `Metadata` field is JSON. The store treats `'{}'` as the canonical "no metadata yet" value — callers that pass an empty Go string get `'{}'` written. Downstream enrichment can update it; `UpdatedAt` reflects the last update. +- **Idempotent writes, atomic batches.** `Create` ignores primary-key conflicts so queue-redelivery of the same request is a safe no-op. The whole batch is one underlying multi-row INSERT — partial success is not exposed. +- **No filtering at the store layer.** `FindOverlapping` returns every matching row, including ones owned by the caller's own request. Callers that want to skip self filter the result by `RequestID` themselves. Liveness is also the caller's job — consult `RequestStore` to skip terminal owners. The store boundary is intentionally one query, one table, no joins. - **Append-only by design.** Records are not deleted when the owning request reaches a terminal state; the historical claim is preserved for audit. Duplicate detection filters terminals out at query time via the controller-side liveness check. ## Implementing a Backend diff --git a/extension/changestore/change_store.go b/extension/changestore/change_store.go index 45332ea9..aa2b06e7 100644 --- a/extension/changestore/change_store.go +++ b/extension/changestore/change_store.go @@ -30,22 +30,29 @@ import ( // Liveness of an owning request is NOT tracked here — callers must consult RequestStore separately // to determine whether an owner is in a terminal state. type ChangeStore interface { - // Create persists a batch of ChangeRecords. Conflicts on the (URI, RequestID) primary key - // are silently ignored, making the call idempotent under queue redeliveries of the same request. - // Records belonging to different requests do not conflict — overlap between requests is detected + // Create persists a batch of ChangeRecords as a single atomic operation: either all + // records are written or none are. The batch corresponds to one underlying multi-row + // INSERT, so partial success is never observable. + // + // Primary-key conflicts on (queue, uri, request_id) are silently ignored, which makes + // the call idempotent under queue redeliveries of the same request. Records belonging + // to different requests do not conflict on the PK — cross-request overlap is detected // by FindOverlapping, not by Create. Create(ctx context.Context, records []entity.ChangeRecord) error - // FindOverlapping returns ChangeRecords whose URI is in the given set, scoped to queue, - // excluding any records belonging to excludeRequestID (so callers can skip self when checking - // for duplicates of a freshly-claimed request). Returns an empty slice when there is no overlap. + // FindOverlapping returns ChangeRecords whose URI is in the given set, scoped to queue. + // Returns an empty slice when there is no overlap. + // + // The store does NOT exclude any specific request_id — if the caller wants to skip + // rows belonging to its own in-flight request (the common case when checking for + // duplicates of a freshly-claimed request), it should filter the returned records by + // RequestID itself. // - // Liveness of the returned records' owning requests is NOT filtered here — the caller is - // responsible for consulting RequestStore to skip terminal owners. + // Liveness of the returned records' owning requests is also NOT filtered here — the + // caller is responsible for consulting RequestStore to skip terminal owners. FindOverlapping( ctx context.Context, queue string, uris []string, - excludeRequestID string, ) ([]entity.ChangeRecord, error) } diff --git a/extension/changestore/mock/change_store_mock.go b/extension/changestore/mock/change_store_mock.go index a1826380..e2389f53 100644 --- a/extension/changestore/mock/change_store_mock.go +++ b/extension/changestore/mock/change_store_mock.go @@ -56,16 +56,16 @@ func (mr *MockChangeStoreMockRecorder) Create(ctx, records any) *gomock.Call { } // FindOverlapping mocks base method. -func (m *MockChangeStore) FindOverlapping(ctx context.Context, queue string, uris []string, excludeRequestID string) ([]entity.ChangeRecord, error) { +func (m *MockChangeStore) FindOverlapping(ctx context.Context, queue string, uris []string) ([]entity.ChangeRecord, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FindOverlapping", ctx, queue, uris, excludeRequestID) + ret := m.ctrl.Call(m, "FindOverlapping", ctx, queue, uris) ret0, _ := ret[0].([]entity.ChangeRecord) ret1, _ := ret[1].(error) return ret0, ret1 } // FindOverlapping indicates an expected call of FindOverlapping. -func (mr *MockChangeStoreMockRecorder) FindOverlapping(ctx, queue, uris, excludeRequestID any) *gomock.Call { +func (mr *MockChangeStoreMockRecorder) FindOverlapping(ctx, queue, uris any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindOverlapping", reflect.TypeOf((*MockChangeStore)(nil).FindOverlapping), ctx, queue, uris, excludeRequestID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindOverlapping", reflect.TypeOf((*MockChangeStore)(nil).FindOverlapping), ctx, queue, uris) } diff --git a/extension/changestore/mysql/change_store.go b/extension/changestore/mysql/change_store.go index d8a36852..2a2e1315 100644 --- a/extension/changestore/mysql/change_store.go +++ b/extension/changestore/mysql/change_store.go @@ -37,8 +37,10 @@ func NewChangeStore(db *sql.DB, scope tally.Scope) changestore.ChangeStore { return &changeStore{db: db, scope: scope} } -// Create inserts a batch of ChangeRecords. Primary-key conflicts on (uri, request_id) -// are silently ignored via INSERT IGNORE so queue-redelivery of the same request is a no-op. +// Create inserts a batch of ChangeRecords as a single multi-row INSERT IGNORE. +// Primary-key conflicts on (queue, uri, request_id) are silently ignored so +// queue-redelivery of the same request is a no-op. The whole batch is one +// statement, so partial success is not observable. func (s *changeStore) Create(ctx context.Context, records []entity.ChangeRecord) (retErr error) { op := metrics.Begin(s.scope, "create") defer func() { op.Complete(retErr) }() @@ -53,10 +55,12 @@ func (s *changeStore) Create(ctx context.Context, records []entity.ChangeRecord) args := make([]any, 0, len(records)*cols) for _, r := range records { - // Pass empty Metadata as NULL — JSON column rejects empty string but accepts NULL. - var metadata any - if r.Metadata != "" { - metadata = r.Metadata + // Use the empty JSON object as the canonical "no metadata yet" value. + // metadata is NOT NULL in the schema, and an empty Go string would be + // rejected by the JSON column type. + metadata := r.Metadata + if metadata == "" { + metadata = "{}" } args = append(args, r.URI, r.RequestID, r.Queue, metadata, r.CreatedAt, r.UpdatedAt) } @@ -68,13 +72,13 @@ func (s *changeStore) Create(ctx context.Context, records []entity.ChangeRecord) return nil } -// FindOverlapping returns ChangeRecords whose uri is in the given set, scoped to queue, -// excluding any belonging to excludeRequestID. +// FindOverlapping returns ChangeRecords whose uri is in the given set, scoped to queue. +// The store does not filter by request_id; callers that want to skip self should do so +// after the call. Liveness checks against the request store are also the caller's job. func (s *changeStore) FindOverlapping( ctx context.Context, queue string, uris []string, - excludeRequestID string, ) (ret []entity.ChangeRecord, retErr error) { op := metrics.Begin(s.scope, "find_overlapping") defer func() { op.Complete(retErr) }() @@ -84,14 +88,16 @@ func (s *changeStore) FindOverlapping( } uriPlaceholders := "?" + strings.Repeat(", ?", len(uris)-1) + // queue leads the WHERE clause to align with the (queue, uri, request_id) PK, + // so this is a PK-prefix scan. query := "SELECT uri, request_id, queue, metadata, created_at, updated_at FROM `change` " + - "WHERE uri IN (" + uriPlaceholders + ") AND queue = ? AND request_id != ?" + "WHERE queue = ? AND uri IN (" + uriPlaceholders + ")" - args := make([]any, 0, len(uris)+2) + args := make([]any, 0, 1+len(uris)) + args = append(args, queue) for _, u := range uris { args = append(args, u) } - args = append(args, queue, excludeRequestID) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { @@ -102,13 +108,9 @@ func (s *changeStore) FindOverlapping( var results []entity.ChangeRecord for rows.Next() { var rec entity.ChangeRecord - var metadata sql.NullString - if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &metadata, &rec.CreatedAt, &rec.UpdatedAt); err != nil { + if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &rec.Metadata, &rec.CreatedAt, &rec.UpdatedAt); err != nil { return nil, fmt.Errorf("failed to scan change record for queue=%s: %w", queue, err) } - if metadata.Valid { - rec.Metadata = metadata.String - } results = append(results, rec) } if err := rows.Err(); err != nil { diff --git a/extension/changestore/mysql/schema/change.sql b/extension/changestore/mysql/schema/change.sql index 4c92c28c..757ad142 100644 --- a/extension/changestore/mysql/schema/change.sql +++ b/extension/changestore/mysql/schema/change.sql @@ -1,9 +1,14 @@ +-- request_id is part of the PK so concurrent claims by different requests on the +-- same URI coexist as distinct rows. Same-request retry → PK conflict (no-op via +-- INSERT IGNORE). Different-request collision → distinct row, surfaced by +-- FindOverlapping. Queue leads the PK so queue-scoped lookups are PK-prefix scans +-- and the table is shardable by queue. CREATE TABLE IF NOT EXISTS `change` ( uri VARCHAR(255) NOT NULL, request_id VARCHAR(255) NOT NULL, queue VARCHAR(255) NOT NULL, - metadata JSON, + metadata JSON NOT NULL, created_at BIGINT NOT NULL, updated_at BIGINT NOT NULL, - PRIMARY KEY (uri, request_id) + PRIMARY KEY (queue, uri, request_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/test/integration/extension/changestore/mysql/changestore_test.go b/test/integration/extension/changestore/mysql/changestore_test.go index 594b7425..9ee397e2 100644 --- a/test/integration/extension/changestore/mysql/changestore_test.go +++ b/test/integration/extension/changestore/mysql/changestore_test.go @@ -94,7 +94,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_NoOverlap() { {URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{"github://uber/x/pull/2/bbb"}, "q/2") + got, err := s.store.FindOverlapping(s.ctx, "q", []string{"github://uber/x/pull/2/bbb"}) require.NoError(t, err) assert.Empty(t, got) } @@ -106,7 +106,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_Overlap() { {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/2") + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) require.NoError(t, err) require.Len(t, got, 1) assert.Equal(t, "q/1", got[0].RequestID) @@ -114,16 +114,18 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_Overlap() { assert.Equal(t, "q", got[0].Queue) } -func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_ExcludesSelf() { +func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_ReturnsAllOwners() { + // The store does not exclude any specific request_id; callers filter self if they wish. t := s.T() uri := "github://uber/x/pull/1/aaa" require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/1") + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) require.NoError(t, err) - assert.Empty(t, got, "FindOverlapping must not return rows for excludeRequestID") + require.Len(t, got, 1, "store returns the row even when caller might consider it self") + assert.Equal(t, "q/1", got[0].RequestID) } func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_QueueScoped() { @@ -133,7 +135,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_QueueScoped() { {URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1}, })) - got, err := s.store.FindOverlapping(s.ctx, "qB", []string{uri}, "qB/1") + got, err := s.store.FindOverlapping(s.ctx, "qB", []string{uri}) require.NoError(t, err) assert.Empty(t, got, "FindOverlapping must not return rows from a different queue") } @@ -158,7 +160,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_DifferentRequestSameURI() {URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2}, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/3") + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) require.NoError(t, err) require.Len(t, got, 2) @@ -175,12 +177,27 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_PreservesMetadata() { {URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1}, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}, "q/other") + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) require.NoError(t, err) require.Len(t, got, 1) assert.JSONEq(t, meta, got[0].Metadata) } +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyMetadataStoredAsObject() { + // metadata is NOT NULL in the schema. The impl substitutes '{}' for an empty + // Metadata field so callers don't need to know about the column constraint. + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ + {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + })) + + got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) + require.NoError(t, err) + require.Len(t, got, 1) + assert.JSONEq(t, "{}", got[0].Metadata) +} + func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyIsNoOp() { t := s.T() require.NoError(t, s.store.Create(s.ctx, nil)) @@ -196,7 +213,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_EmptyURIsIsNoOp() {URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, })) - got, err := s.store.FindOverlapping(s.ctx, "q", nil, "q/2") + got, err := s.store.FindOverlapping(s.ctx, "q", nil) require.NoError(t, err) assert.Empty(t, got) } From 17ff35e8f43e58ed057db6b748f836fd752c88fd Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 7 May 2026 23:34:23 -0700 Subject: [PATCH 4/6] reshape ChangeStore API to per-record / per-URI for backend portability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous interface assumed a SQL-style backend that gives batch atomicity and multi-key WHERE IN queries for free, which over-constrains alternatives like DynamoDB or Bigtable. Move to single-record, single-URI primitives that any backend can implement cheaply; callers loop when they have multiple URIs (typically 1-5 per request). - Create(record ChangeRecord) — was Create([]ChangeRecord). No batch atomicity contract; same-PK conflict is INSERT IGNORE on the mysql impl, callers retry whole loops to converge. - GetByURI(queue, uri) — was FindOverlapping(queue, []uris). Returns every ChangeRecord for the (queue, uri) pair; caller loops over its URIs and filters by RequestID for self-exclusion. - README and integration tests updated for the new shape. --- extension/changestore/README.md | 10 ++- extension/changestore/change_store.go | 47 +++++------ .../changestore/mock/change_store_mock.go | 20 ++--- extension/changestore/mysql/change_store.go | 79 ++++++------------ .../changestore/mysql/changestore_test.go | 82 ++++++++----------- 5 files changed, 91 insertions(+), 147 deletions(-) diff --git a/extension/changestore/README.md b/extension/changestore/README.md index 68509301..0b4d1bef 100644 --- a/extension/changestore/README.md +++ b/extension/changestore/README.md @@ -2,16 +2,18 @@ Vendor-agnostic interface for tracking per-URI claims by in-flight land requests. -Each record asserts that a specific URI (e.g., a GitHub PR) was claimed by a specific request, scoped to a queue. The store is consulted by the orchestrator's `start` controller to detect duplicate requests — submissions whose URIs overlap with another in-flight request's URIs in the same queue. +Each record asserts that a specific URI (e.g., a GitHub PR) was claimed by a specific request, scoped to a queue. The store is read by the orchestrator's `validate` controller to detect duplicate requests — submissions whose URIs overlap with another in-flight request's URIs in the same queue. + +The interface is intentionally per-record / per-URI so any backend (SQL, DynamoDB, Bigtable, …) can implement it without needing batch atomicity or multi-key query support. Callers that have multiple URIs to claim or check loop over them; the typical request has a small number of URIs (a single PR or a short stack), so the loop overhead is negligible. ## Semantics - **Identity is immutable.** A record is keyed by `(Queue, URI, RequestID)`; once written, that triple is never mutated. - **Queue leads the key.** Backends should make `Queue` the leading column of the primary key (or partition key, in shardable stores). All reads are queue-scoped, so this turns lookups into PK-prefix scans and keeps the table shardable. -- **`RequestID` in the key is intentional.** Concurrent claims by different requests on the same URI coexist as distinct rows. Same-request retries collide on the PK and are absorbed idempotently; cross-request collisions show up as additional rows that callers detect via `FindOverlapping`. +- **`RequestID` in the key is intentional.** Concurrent claims by different requests on the same URI coexist as distinct rows. Same-request retries collide on the PK and are absorbed idempotently; cross-request collisions show up as additional rows that callers detect via `GetByURI`. - **Metadata is required and mutable.** The `Metadata` field is JSON. The store treats `'{}'` as the canonical "no metadata yet" value — callers that pass an empty Go string get `'{}'` written. Downstream enrichment can update it; `UpdatedAt` reflects the last update. -- **Idempotent writes, atomic batches.** `Create` ignores primary-key conflicts so queue-redelivery of the same request is a safe no-op. The whole batch is one underlying multi-row INSERT — partial success is not exposed. -- **No filtering at the store layer.** `FindOverlapping` returns every matching row, including ones owned by the caller's own request. Callers that want to skip self filter the result by `RequestID` themselves. Liveness is also the caller's job — consult `RequestStore` to skip terminal owners. The store boundary is intentionally one query, one table, no joins. +- **Per-record writes, idempotent.** `Create` writes a single record. A primary-key conflict is silently ignored, which makes queue-redelivery of the same request a safe no-op. There is no batch atomicity in the contract — callers with multiple URIs loop and rely on idempotency to converge under partial failure / retry. +- **Per-URI reads, no filtering.** `GetByURI` returns every record for a given `(queue, uri)`. The store does not filter by `request_id` or by the owning request's state. Callers that want to skip self filter by `RequestID`; callers that want only live owners consult `RequestStore` for liveness. - **Append-only by design.** Records are not deleted when the owning request reaches a terminal state; the historical claim is preserved for audit. Duplicate detection filters terminals out at query time via the controller-side liveness check. ## Implementing a Backend diff --git a/extension/changestore/change_store.go b/extension/changestore/change_store.go index aa2b06e7..6b531170 100644 --- a/extension/changestore/change_store.go +++ b/extension/changestore/change_store.go @@ -24,35 +24,28 @@ import ( // ChangeStore manages per-URI claim records for in-flight land requests. // Each row records that a specific URI was claimed by a specific request, scoped to a queue. -// The (URI, RequestID) pair is the immutable identity of a record. Metadata may evolve over time. +// The (Queue, URI, RequestID) triple is the immutable identity of a record. Metadata may +// evolve over time. // -// The store is the source of truth for "which URIs are or were associated with which requests". -// Liveness of an owning request is NOT tracked here — callers must consult RequestStore separately -// to determine whether an owner is in a terminal state. +// The interface is intentionally per-record / per-URI so that any backend (SQL, DynamoDB, +// Bigtable, …) can implement it without needing batch-atomicity or multi-key query support. +// Callers loop when they have multiple URIs to claim or check; the typical request has a +// small number of URIs (a single PR or a short stack), so the loop overhead is negligible. type ChangeStore interface { - // Create persists a batch of ChangeRecords as a single atomic operation: either all - // records are written or none are. The batch corresponds to one underlying multi-row - // INSERT, so partial success is never observable. - // - // Primary-key conflicts on (queue, uri, request_id) are silently ignored, which makes - // the call idempotent under queue redeliveries of the same request. Records belonging - // to different requests do not conflict on the PK — cross-request overlap is detected - // by FindOverlapping, not by Create. - Create(ctx context.Context, records []entity.ChangeRecord) error + // Create persists a single ChangeRecord. A primary-key conflict on + // (Queue, URI, RequestID) is silently ignored, which makes the call + // idempotent under queue redeliveries of the same request. Records belonging + // to different requests do not conflict on the PK — cross-request overlap + // is detected by GetByURI, not by Create. + Create(ctx context.Context, record entity.ChangeRecord) error - // FindOverlapping returns ChangeRecords whose URI is in the given set, scoped to queue. - // Returns an empty slice when there is no overlap. - // - // The store does NOT exclude any specific request_id — if the caller wants to skip - // rows belonging to its own in-flight request (the common case when checking for - // duplicates of a freshly-claimed request), it should filter the returned records by - // RequestID itself. + // GetByURI returns every ChangeRecord for the given (queue, uri). Multiple + // requests can have claimed the same URI over time, so the slice may have + // any number of entries; an empty slice means no claim has ever been + // recorded for this URI in this queue. // - // Liveness of the returned records' owning requests is also NOT filtered here — the - // caller is responsible for consulting RequestStore to skip terminal owners. - FindOverlapping( - ctx context.Context, - queue string, - uris []string, - ) ([]entity.ChangeRecord, error) + // The store does not filter by request_id or by the owning request's + // state — callers that want to skip self filter by RequestID, and callers + // that want only live owners consult RequestStore for liveness. + GetByURI(ctx context.Context, queue string, uri string) ([]entity.ChangeRecord, error) } diff --git a/extension/changestore/mock/change_store_mock.go b/extension/changestore/mock/change_store_mock.go index e2389f53..8308824a 100644 --- a/extension/changestore/mock/change_store_mock.go +++ b/extension/changestore/mock/change_store_mock.go @@ -42,30 +42,30 @@ func (m *MockChangeStore) EXPECT() *MockChangeStoreMockRecorder { } // Create mocks base method. -func (m *MockChangeStore) Create(ctx context.Context, records []entity.ChangeRecord) error { +func (m *MockChangeStore) Create(ctx context.Context, record entity.ChangeRecord) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Create", ctx, records) + ret := m.ctrl.Call(m, "Create", ctx, record) ret0, _ := ret[0].(error) return ret0 } // Create indicates an expected call of Create. -func (mr *MockChangeStoreMockRecorder) Create(ctx, records any) *gomock.Call { +func (mr *MockChangeStoreMockRecorder) Create(ctx, record any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockChangeStore)(nil).Create), ctx, records) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockChangeStore)(nil).Create), ctx, record) } -// FindOverlapping mocks base method. -func (m *MockChangeStore) FindOverlapping(ctx context.Context, queue string, uris []string) ([]entity.ChangeRecord, error) { +// GetByURI mocks base method. +func (m *MockChangeStore) GetByURI(ctx context.Context, queue, uri string) ([]entity.ChangeRecord, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FindOverlapping", ctx, queue, uris) + ret := m.ctrl.Call(m, "GetByURI", ctx, queue, uri) ret0, _ := ret[0].([]entity.ChangeRecord) ret1, _ := ret[1].(error) return ret0, ret1 } -// FindOverlapping indicates an expected call of FindOverlapping. -func (mr *MockChangeStoreMockRecorder) FindOverlapping(ctx, queue, uris any) *gomock.Call { +// GetByURI indicates an expected call of GetByURI. +func (mr *MockChangeStoreMockRecorder) GetByURI(ctx, queue, uri any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindOverlapping", reflect.TypeOf((*MockChangeStore)(nil).FindOverlapping), ctx, queue, uris) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByURI", reflect.TypeOf((*MockChangeStore)(nil).GetByURI), ctx, queue, uri) } diff --git a/extension/changestore/mysql/change_store.go b/extension/changestore/mysql/change_store.go index 2a2e1315..098b3e1e 100644 --- a/extension/changestore/mysql/change_store.go +++ b/extension/changestore/mysql/change_store.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "strings" "github.com/uber-go/tally/v4" @@ -37,71 +36,39 @@ func NewChangeStore(db *sql.DB, scope tally.Scope) changestore.ChangeStore { return &changeStore{db: db, scope: scope} } -// Create inserts a batch of ChangeRecords as a single multi-row INSERT IGNORE. -// Primary-key conflicts on (queue, uri, request_id) are silently ignored so -// queue-redelivery of the same request is a no-op. The whole batch is one -// statement, so partial success is not observable. -func (s *changeStore) Create(ctx context.Context, records []entity.ChangeRecord) (retErr error) { +// Create inserts a single ChangeRecord. A primary-key conflict on +// (queue, uri, request_id) is silently ignored via INSERT IGNORE, so +// queue-redelivery of the same request is a no-op. +func (s *changeStore) Create(ctx context.Context, record entity.ChangeRecord) (retErr error) { op := metrics.Begin(s.scope, "create") defer func() { op.Complete(retErr) }() - if len(records) == 0 { - return nil + // Use the empty JSON object as the canonical "no metadata yet" value. + // metadata is NOT NULL in the schema and the JSON column type rejects an empty string. + metadata := record.Metadata + if metadata == "" { + metadata = "{}" } - const cols = 6 - placeholders := strings.Repeat("(?, ?, ?, ?, ?, ?), ", len(records)) - placeholders = placeholders[:len(placeholders)-2] // trim trailing ", " - - args := make([]any, 0, len(records)*cols) - for _, r := range records { - // Use the empty JSON object as the canonical "no metadata yet" value. - // metadata is NOT NULL in the schema, and an empty Go string would be - // rejected by the JSON column type. - metadata := r.Metadata - if metadata == "" { - metadata = "{}" - } - args = append(args, r.URI, r.RequestID, r.Queue, metadata, r.CreatedAt, r.UpdatedAt) - } - - query := "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at) VALUES " + placeholders - if _, err := s.db.ExecContext(ctx, query, args...); err != nil { - return fmt.Errorf("failed to insert change records (count=%d): %w", len(records), err) + const query = "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)" + if _, err := s.db.ExecContext(ctx, query, + record.URI, record.RequestID, record.Queue, metadata, record.CreatedAt, record.UpdatedAt, + ); err != nil { + return fmt.Errorf("failed to insert change record uri=%s request_id=%s: %w", record.URI, record.RequestID, err) } return nil } -// FindOverlapping returns ChangeRecords whose uri is in the given set, scoped to queue. -// The store does not filter by request_id; callers that want to skip self should do so -// after the call. Liveness checks against the request store are also the caller's job. -func (s *changeStore) FindOverlapping( - ctx context.Context, - queue string, - uris []string, -) (ret []entity.ChangeRecord, retErr error) { - op := metrics.Begin(s.scope, "find_overlapping") +// GetByURI returns every ChangeRecord for (queue, uri). queue leads the WHERE +// clause to align with the (queue, uri, request_id) PK so this is a PK-prefix scan. +func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (ret []entity.ChangeRecord, retErr error) { + op := metrics.Begin(s.scope, "get_by_uri") defer func() { op.Complete(retErr) }() - if len(uris) == 0 { - return nil, nil - } - - uriPlaceholders := "?" + strings.Repeat(", ?", len(uris)-1) - // queue leads the WHERE clause to align with the (queue, uri, request_id) PK, - // so this is a PK-prefix scan. - query := "SELECT uri, request_id, queue, metadata, created_at, updated_at FROM `change` " + - "WHERE queue = ? AND uri IN (" + uriPlaceholders + ")" - - args := make([]any, 0, 1+len(uris)) - args = append(args, queue) - for _, u := range uris { - args = append(args, u) - } - - rows, err := s.db.QueryContext(ctx, query, args...) + const query = "SELECT uri, request_id, queue, metadata, created_at, updated_at FROM `change` WHERE queue = ? AND uri = ?" + rows, err := s.db.QueryContext(ctx, query, queue, uri) if err != nil { - return nil, fmt.Errorf("failed to query overlapping changes for queue=%s: %w", queue, err) + return nil, fmt.Errorf("failed to query change records for queue=%s uri=%s: %w", queue, uri, err) } defer rows.Close() @@ -109,12 +76,12 @@ func (s *changeStore) FindOverlapping( for rows.Next() { var rec entity.ChangeRecord if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &rec.Metadata, &rec.CreatedAt, &rec.UpdatedAt); err != nil { - return nil, fmt.Errorf("failed to scan change record for queue=%s: %w", queue, err) + return nil, fmt.Errorf("failed to scan change record for queue=%s uri=%s: %w", queue, uri, err) } results = append(results, rec) } if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to iterate change records for queue=%s: %w", queue, err) + return nil, fmt.Errorf("failed to iterate change records for queue=%s uri=%s: %w", queue, uri, err) } return results, nil } diff --git a/test/integration/extension/changestore/mysql/changestore_test.go b/test/integration/extension/changestore/mysql/changestore_test.go index 9ee397e2..13bd5568 100644 --- a/test/integration/extension/changestore/mysql/changestore_test.go +++ b/test/integration/extension/changestore/mysql/changestore_test.go @@ -88,25 +88,25 @@ func (s *MySQLChangeStoreIntegrationSuite) SetupTest() { require.NoError(s.T(), err) } -func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_NoOverlap() { +func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_NoMatch() { t := s.T() - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{"github://uber/x/pull/2/bbb"}) + got, err := s.store.GetByURI(s.ctx, "q", "github://uber/x/pull/2/bbb") require.NoError(t, err) assert.Empty(t, got) } -func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_Overlap() { +func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_Match() { t := s.T() uri := "github://uber/x/pull/1/aaa" - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) + got, err := s.store.GetByURI(s.ctx, "q", uri) require.NoError(t, err) require.Len(t, got, 1) assert.Equal(t, "q/1", got[0].RequestID) @@ -114,38 +114,38 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndFind_Overlap() { assert.Equal(t, "q", got[0].Queue) } -func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_ReturnsAllOwners() { - // The store does not exclude any specific request_id; callers filter self if they wish. +func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_DoesNotExcludeSelf() { + // The store does not filter by request_id; callers filter self if they wish. t := s.T() uri := "github://uber/x/pull/1/aaa" - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) + got, err := s.store.GetByURI(s.ctx, "q", uri) require.NoError(t, err) require.Len(t, got, 1, "store returns the row even when caller might consider it self") assert.Equal(t, "q/1", got[0].RequestID) } -func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_QueueScoped() { +func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_QueueScoped() { t := s.T() uri := "github://uber/x/pull/1/aaa" - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1}, + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1, })) - got, err := s.store.FindOverlapping(s.ctx, "qB", []string{uri}) + got, err := s.store.GetByURI(s.ctx, "qB", uri) require.NoError(t, err) - assert.Empty(t, got, "FindOverlapping must not return rows from a different queue") + assert.Empty(t, got, "GetByURI must not return rows from a different queue") } func (s *MySQLChangeStoreIntegrationSuite) TestCreate_Idempotent() { t := s.T() rec := entity.ChangeRecord{URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1} - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{rec})) - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{rec}), "second insert with same PK must succeed (INSERT IGNORE)") + require.NoError(t, s.store.Create(s.ctx, rec)) + require.NoError(t, s.store.Create(s.ctx, rec), "second insert with same PK must succeed (INSERT IGNORE)") var count int require.NoError(t, s.db.QueryRowContext(s.ctx, "SELECT COUNT(*) FROM `change`").Scan(&count)) @@ -155,12 +155,14 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_Idempotent() { func (s *MySQLChangeStoreIntegrationSuite) TestCreate_DifferentRequestSameURI() { t := s.T() uri := "github://uber/x/pull/1/aaa" - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, - {URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2}, + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, + })) + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) + got, err := s.store.GetByURI(s.ctx, "q", uri) require.NoError(t, err) require.Len(t, got, 2) @@ -173,11 +175,11 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_PreservesMetadata() { t := s.T() const meta = `{"title":"add new feature"}` uri := "github://uber/x/pull/1/aaa" - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1}, + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) + got, err := s.store.GetByURI(s.ctx, "q", uri) require.NoError(t, err) require.Len(t, got, 1) assert.JSONEq(t, meta, got[0].Metadata) @@ -188,32 +190,12 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyMetadataStoredAsObjec // Metadata field so callers don't need to know about the column constraint. t := s.T() uri := "github://uber/x/pull/1/aaa" - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, })) - got, err := s.store.FindOverlapping(s.ctx, "q", []string{uri}) + got, err := s.store.GetByURI(s.ctx, "q", uri) require.NoError(t, err) require.Len(t, got, 1) assert.JSONEq(t, "{}", got[0].Metadata) } - -func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyIsNoOp() { - t := s.T() - require.NoError(t, s.store.Create(s.ctx, nil)) - - var count int - require.NoError(t, s.db.QueryRowContext(s.ctx, "SELECT COUNT(*) FROM `change`").Scan(&count)) - assert.Zero(t, count) -} - -func (s *MySQLChangeStoreIntegrationSuite) TestFindOverlapping_EmptyURIsIsNoOp() { - t := s.T() - require.NoError(t, s.store.Create(s.ctx, []entity.ChangeRecord{ - {URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1}, - })) - - got, err := s.store.FindOverlapping(s.ctx, "q", nil) - require.NoError(t, err) - assert.Empty(t, got) -} From 786a151ddc77f3b1f908d75666ca19ad2e389067 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 7 May 2026 23:38:59 -0700 Subject: [PATCH 5/6] CLAUDE.md: add backend-agnosticism guideline for extensions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Captures the design lesson from PR #152: interfaces should be shaped for the technology space (SQL, KV, document, queues, RPC, …), not for the one implementation we're adding. Lists common over-constraints to avoid (batch atomicity, multi-key queries, server-side filters, cross-entity transactions, strict ordering / exactly-once, sync low-latency assumptions) and proposes the test: 'could DynamoDB / Kafka / Bigtable / a remote RPC / an in-memory map satisfy this signature without contortion?' --- CLAUDE.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index b34a6847..692d815b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -106,6 +106,20 @@ Vendor-agnostic, pluggable interfaces with implementations in subdirectories: 2. Implementations at `extension/{ext}/{impl}/` 3. Factory interface for dependency injection and lifecycle management +**Design interfaces for the technology *space*, not the implementation in front of you.** The interface is a contract every backend will have to satisfy — SQL, key-value (DynamoDB, Bigtable), document, message queue, search, RPC, in-memory, mocks. If the contract assumes a capability that some plausible backend can't provide cheaply, you've baked the current impl's strengths into the API. + +Common over-constraints to avoid: +- **Batch atomicity** (multi-row inserts as one transaction) — many KV stores can't do this. Prefer single-record primitives + caller loops + idempotency-on-retry. +- **Multi-key queries** (`WHERE x IN (...)`) — fine in SQL, awkward elsewhere. Prefer per-key reads. +- **Server-side filters** (joins, sub-queries, complex predicates) — push filtering and aggregation to the caller; keep the store responsible only for "get/put by key" semantics. +- **Transactions across entities** — virtually no distributed store offers this. Use eventual consistency + idempotency. +- **Strict ordering / exactly-once** in messaging — most queues are at-least-once with best-effort ordering. Make consumers idempotent. +- **Synchronous, low-latency calls** for things that may run remotely — design for retry/backoff and timeouts, not assumed-fast. + +The cost of "callers loop over a small batch" is usually negligible. The cost of forcing every future backend to fake a capability the API demanded is permanent. + +When in doubt, ask: *"If the next implementation were DynamoDB / Kafka / Bigtable / a remote RPC service / an in-memory map, could it satisfy this signature without contortion?"* If the answer is no, simplify the contract. + ### Import Paths - RPC Controllers: `github.com/uber/submitqueue/{service}/controller` From 21cb6053094926bbcdf7b01b8d618958a170fd50 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Fri, 8 May 2026 00:00:07 -0700 Subject: [PATCH 6/6] add Version field to ChangeRecord for optimistic locking on metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the request-store pattern (RequestStore.UpdateState takes oldVersion + newVersion). The schema gains 'version INT NOT NULL'; ChangeRecord gains 'Version int32'; mysql impl writes/reads it. No UpdateMetadata method yet — added when the first metadata-enrichment writer arrives. --- entity/change_record.go | 6 ++++++ extension/changestore/README.md | 1 + extension/changestore/mysql/change_store.go | 8 ++++---- extension/changestore/mysql/schema/change.sql | 1 + .../changestore/mysql/changestore_test.go | 19 ++++++++++--------- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/entity/change_record.go b/entity/change_record.go index c5f1c28f..34997a78 100644 --- a/entity/change_record.go +++ b/entity/change_record.go @@ -48,4 +48,10 @@ type ChangeRecord struct { // UpdatedAt is the Unix milliseconds timestamp when this record's Metadata was last updated. // Equal to CreatedAt when the record has never been updated. UpdatedAt int64 `json:"updated_at"` + + // Version is the optimistic-locking counter for mutable fields (Metadata). + // Starts at 1 on Create and is incremented by callers on every update. + // Mirrors the request-store convention: callers compute newVersion = oldVersion + 1 + // and pass both to the update method; the store performs a pure conditional write. + Version int32 `json:"version"` } diff --git a/extension/changestore/README.md b/extension/changestore/README.md index 0b4d1bef..0d4d2aab 100644 --- a/extension/changestore/README.md +++ b/extension/changestore/README.md @@ -14,6 +14,7 @@ The interface is intentionally per-record / per-URI so any backend (SQL, DynamoD - **Metadata is required and mutable.** The `Metadata` field is JSON. The store treats `'{}'` as the canonical "no metadata yet" value — callers that pass an empty Go string get `'{}'` written. Downstream enrichment can update it; `UpdatedAt` reflects the last update. - **Per-record writes, idempotent.** `Create` writes a single record. A primary-key conflict is silently ignored, which makes queue-redelivery of the same request a safe no-op. There is no batch atomicity in the contract — callers with multiple URIs loop and rely on idempotency to converge under partial failure / retry. - **Per-URI reads, no filtering.** `GetByURI` returns every record for a given `(queue, uri)`. The store does not filter by `request_id` or by the owning request's state. Callers that want to skip self filter by `RequestID`; callers that want only live owners consult `RequestStore` for liveness. +- **Versioned for safe metadata updates.** Each record carries a `Version` integer (starts at 1). Future `UpdateMetadata` operations follow the same caller-owned-arithmetic + conditional-write pattern as `RequestStore.UpdateState` — the caller passes `oldVersion` and `newVersion`, and the store performs a pure conditional write. - **Append-only by design.** Records are not deleted when the owning request reaches a terminal state; the historical claim is preserved for audit. Duplicate detection filters terminals out at query time via the controller-side liveness check. ## Implementing a Backend diff --git a/extension/changestore/mysql/change_store.go b/extension/changestore/mysql/change_store.go index 098b3e1e..929f527f 100644 --- a/extension/changestore/mysql/change_store.go +++ b/extension/changestore/mysql/change_store.go @@ -50,9 +50,9 @@ func (s *changeStore) Create(ctx context.Context, record entity.ChangeRecord) (r metadata = "{}" } - const query = "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)" + const query = "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at, version) VALUES (?, ?, ?, ?, ?, ?, ?)" if _, err := s.db.ExecContext(ctx, query, - record.URI, record.RequestID, record.Queue, metadata, record.CreatedAt, record.UpdatedAt, + record.URI, record.RequestID, record.Queue, metadata, record.CreatedAt, record.UpdatedAt, record.Version, ); err != nil { return fmt.Errorf("failed to insert change record uri=%s request_id=%s: %w", record.URI, record.RequestID, err) } @@ -65,7 +65,7 @@ func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (r op := metrics.Begin(s.scope, "get_by_uri") defer func() { op.Complete(retErr) }() - const query = "SELECT uri, request_id, queue, metadata, created_at, updated_at FROM `change` WHERE queue = ? AND uri = ?" + const query = "SELECT uri, request_id, queue, metadata, created_at, updated_at, version FROM `change` WHERE queue = ? AND uri = ?" rows, err := s.db.QueryContext(ctx, query, queue, uri) if err != nil { return nil, fmt.Errorf("failed to query change records for queue=%s uri=%s: %w", queue, uri, err) @@ -75,7 +75,7 @@ func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (r var results []entity.ChangeRecord for rows.Next() { var rec entity.ChangeRecord - if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &rec.Metadata, &rec.CreatedAt, &rec.UpdatedAt); err != nil { + if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &rec.Metadata, &rec.CreatedAt, &rec.UpdatedAt, &rec.Version); err != nil { return nil, fmt.Errorf("failed to scan change record for queue=%s uri=%s: %w", queue, uri, err) } results = append(results, rec) diff --git a/extension/changestore/mysql/schema/change.sql b/extension/changestore/mysql/schema/change.sql index 757ad142..9492402f 100644 --- a/extension/changestore/mysql/schema/change.sql +++ b/extension/changestore/mysql/schema/change.sql @@ -10,5 +10,6 @@ CREATE TABLE IF NOT EXISTS `change` ( metadata JSON NOT NULL, created_at BIGINT NOT NULL, updated_at BIGINT NOT NULL, + version INT NOT NULL, PRIMARY KEY (queue, uri, request_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/test/integration/extension/changestore/mysql/changestore_test.go b/test/integration/extension/changestore/mysql/changestore_test.go index 13bd5568..25c67199 100644 --- a/test/integration/extension/changestore/mysql/changestore_test.go +++ b/test/integration/extension/changestore/mysql/changestore_test.go @@ -91,7 +91,7 @@ func (s *MySQLChangeStoreIntegrationSuite) SetupTest() { func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_NoMatch() { t := s.T() require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, + URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, })) got, err := s.store.GetByURI(s.ctx, "q", "github://uber/x/pull/2/bbb") @@ -103,7 +103,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_Match() { t := s.T() uri := "github://uber/x/pull/1/aaa" require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, })) got, err := s.store.GetByURI(s.ctx, "q", uri) @@ -112,6 +112,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_Match() { assert.Equal(t, "q/1", got[0].RequestID) assert.Equal(t, uri, got[0].URI) assert.Equal(t, "q", got[0].Queue) + assert.Equal(t, int32(1), got[0].Version) } func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_DoesNotExcludeSelf() { @@ -119,7 +120,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_DoesNotExcludeSelf() { t := s.T() uri := "github://uber/x/pull/1/aaa" require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, })) got, err := s.store.GetByURI(s.ctx, "q", uri) @@ -132,7 +133,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_QueueScoped() { t := s.T() uri := "github://uber/x/pull/1/aaa" require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1, + URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1, Version: 1, })) got, err := s.store.GetByURI(s.ctx, "qB", uri) @@ -142,7 +143,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_QueueScoped() { func (s *MySQLChangeStoreIntegrationSuite) TestCreate_Idempotent() { t := s.T() - rec := entity.ChangeRecord{URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1} + rec := entity.ChangeRecord{URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1} require.NoError(t, s.store.Create(s.ctx, rec)) require.NoError(t, s.store.Create(s.ctx, rec), "second insert with same PK must succeed (INSERT IGNORE)") @@ -156,10 +157,10 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_DifferentRequestSameURI() t := s.T() uri := "github://uber/x/pull/1/aaa" require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, })) require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2, + URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2, Version: 1, })) got, err := s.store.GetByURI(s.ctx, "q", uri) @@ -176,7 +177,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_PreservesMetadata() { const meta = `{"title":"add new feature"}` uri := "github://uber/x/pull/1/aaa" require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1, + URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1, Version: 1, })) got, err := s.store.GetByURI(s.ctx, "q", uri) @@ -191,7 +192,7 @@ func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyMetadataStoredAsObjec t := s.T() uri := "github://uber/x/pull/1/aaa" require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, })) got, err := s.store.GetByURI(s.ctx, "q", uri)