diff --git a/CLAUDE.md b/CLAUDE.md index b34a6847..692d815b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -106,6 +106,20 @@ Vendor-agnostic, pluggable interfaces with implementations in subdirectories: 2. Implementations at `extension/{ext}/{impl}/` 3. Factory interface for dependency injection and lifecycle management +**Design interfaces for the technology *space*, not the implementation in front of you.** The interface is a contract every backend will have to satisfy — SQL, key-value (DynamoDB, Bigtable), document, message queue, search, RPC, in-memory, mocks. If the contract assumes a capability that some plausible backend can't provide cheaply, you've baked the current impl's strengths into the API. + +Common over-constraints to avoid: +- **Batch atomicity** (multi-row inserts as one transaction) — many KV stores can't do this. Prefer single-record primitives + caller loops + idempotency-on-retry. +- **Multi-key queries** (`WHERE x IN (...)`) — fine in SQL, awkward elsewhere. Prefer per-key reads. +- **Server-side filters** (joins, sub-queries, complex predicates) — push filtering and aggregation to the caller; keep the store responsible only for "get/put by key" semantics. +- **Transactions across entities** — virtually no distributed store offers this. Use eventual consistency + idempotency. +- **Strict ordering / exactly-once** in messaging — most queues are at-least-once with best-effort ordering. Make consumers idempotent. +- **Synchronous, low-latency calls** for things that may run remotely — design for retry/backoff and timeouts, not assumed-fast. + +The cost of "callers loop over a small batch" is usually negligible. The cost of forcing every future backend to fake a capability the API demanded is permanent. + +When in doubt, ask: *"If the next implementation were DynamoDB / Kafka / Bigtable / a remote RPC service / an in-memory map, could it satisfy this signature without contortion?"* If the answer is no, simplify the contract. 
+ ### Import Paths - RPC Controllers: `github.com/uber/submitqueue/{service}/controller` diff --git a/Makefile b/Makefile index 0a309303..b75fa5ac 100644 --- a/Makefile +++ b/Makefile @@ -274,7 +274,7 @@ local-stovepipe-stop: ## Stop Stovepipe service mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/pusher/... ./extension/scorer/... ./core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/changestore/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/entity/BUILD.bazel b/entity/BUILD.bazel index 09c5a360..5231f436 100644 --- a/entity/BUILD.bazel +++ b/entity/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "batch_dependent.go", "build.go", "change_provider.go", + "change_record.go", "land_request.go", "queue_config.go", "request.go", diff --git a/entity/change_record.go b/entity/change_record.go new file mode 100644 index 00000000..34997a78 --- /dev/null +++ b/entity/change_record.go @@ -0,0 +1,57 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// ChangeRecord represents a single URI's claim by a request, persisted in the change store.
// The (Queue, URI, RequestID) triple is the identity and is immutable; Metadata may be
// updated over time as additional information about the change (e.g., PR title, author,
// mergeability) becomes available.
type ChangeRecord struct {
	// URI identifies the change (RFC 3986). Same scheme/format as entity.Change.URIs.
	// Example: "github://uber/submitqueue/pull/123/abc123def".
	URI string `json:"uri"`

	// RequestID is the owning land request that claimed this URI.
	// Format matches entity.Request.ID: "<queue>/<id>".
	// NOTE(review): the placeholders were missing from the original comment and are
	// reconstructed here — confirm the exact format against entity.Request.
	//
	// RequestID participates in the change-store primary key so that concurrent claims
	// by different requests on the same URI coexist as distinct rows. Same-request
	// retries collide on the PK and are absorbed idempotently; different-request
	// collisions surface as additional rows that callers detect via ChangeStore.GetByURI.
	RequestID string `json:"request_id"`

	// Queue is the queue the owning request belongs to. It is the leading column of
	// the change-store primary key, so queue-scoped duplicate checks become PK-prefix
	// scans and the table is shardable by queue.
	Queue string `json:"queue"`

	// Metadata is a JSON-encoded blob of provider-specific information about the change
	// (e.g., PR title, author, mergeable state). Stored as `'{}'` when no metadata has
	// been populated yet; updated by downstream enrichment.
	Metadata string `json:"metadata,omitempty"`

	// CreatedAt is the Unix milliseconds timestamp when this record was first created.
	CreatedAt int64 `json:"created_at"`

	// UpdatedAt is the Unix milliseconds timestamp when this record's Metadata was last updated.
	// Equal to CreatedAt when the record has never been updated.
	UpdatedAt int64 `json:"updated_at"`

	// Version is the optimistic-locking counter for mutable fields (Metadata).
	// Starts at 1 on Create and is incremented by callers on every update.
	// Mirrors the request-store convention: callers compute newVersion = oldVersion + 1
	// and pass both to the update method; the store performs a pure conditional write.
	Version int32 `json:"version"`
}
All reads are queue-scoped, so this turns lookups into PK-prefix scans and keeps the table shardable. +- **`RequestID` in the key is intentional.** Concurrent claims by different requests on the same URI coexist as distinct rows. Same-request retries collide on the PK and are absorbed idempotently; cross-request collisions show up as additional rows that callers detect via `GetByURI`. +- **Metadata is required and mutable.** The `Metadata` field is JSON. The store treats `'{}'` as the canonical "no metadata yet" value — callers that pass an empty Go string get `'{}'` written. Downstream enrichment can update it; `UpdatedAt` reflects the last update. +- **Per-record writes, idempotent.** `Create` writes a single record. A primary-key conflict is silently ignored, which makes queue-redelivery of the same request a safe no-op. There is no batch atomicity in the contract — callers with multiple URIs loop and rely on idempotency to converge under partial failure / retry. +- **Per-URI reads, no filtering.** `GetByURI` returns every record for a given `(queue, uri)`. The store does not filter by `request_id` or by the owning request's state. Callers that want to skip self filter by `RequestID`; callers that want only live owners consult `RequestStore` for liveness. +- **Versioned for safe metadata updates.** Each record carries a `Version` integer (starts at 1). Future `UpdateMetadata` operations follow the same caller-owned-arithmetic + conditional-write pattern as `RequestStore.UpdateState` — the caller passes `oldVersion` and `newVersion`, and the store performs a pure conditional write. +- **Append-only by design.** Records are not deleted when the owning request reaches a terminal state; the historical claim is preserved for audit. Duplicate detection filters terminals out at query time via the controller-side liveness check. + +## Implementing a Backend + +1. Create `extension/changestore/{backend}/` directory. +2. Implement the `ChangeStore` interface. +3. 
Add schema files under `extension/changestore/{backend}/schema/` if the backend requires them. diff --git a/extension/changestore/change_store.go b/extension/changestore/change_store.go new file mode 100644 index 00000000..6b531170 --- /dev/null +++ b/extension/changestore/change_store.go @@ -0,0 +1,51 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package changestore + +//go:generate mockgen -source=change_store.go -destination=mock/change_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// ChangeStore manages per-URI claim records for in-flight land requests. +// Each row records that a specific URI was claimed by a specific request, scoped to a queue. +// The (Queue, URI, RequestID) triple is the immutable identity of a record. Metadata may +// evolve over time. +// +// The interface is intentionally per-record / per-URI so that any backend (SQL, DynamoDB, +// Bigtable, …) can implement it without needing batch-atomicity or multi-key query support. +// Callers loop when they have multiple URIs to claim or check; the typical request has a +// small number of URIs (a single PR or a short stack), so the loop overhead is negligible. +type ChangeStore interface { + // Create persists a single ChangeRecord. 
A primary-key conflict on + // (Queue, URI, RequestID) is silently ignored, which makes the call + // idempotent under queue redeliveries of the same request. Records belonging + // to different requests do not conflict on the PK — cross-request overlap + // is detected by GetByURI, not by Create. + Create(ctx context.Context, record entity.ChangeRecord) error + + // GetByURI returns every ChangeRecord for the given (queue, uri). Multiple + // requests can have claimed the same URI over time, so the slice may have + // any number of entries; an empty slice means no claim has ever been + // recorded for this URI in this queue. + // + // The store does not filter by request_id or by the owning request's + // state — callers that want to skip self filter by RequestID, and callers + // that want only live owners consult RequestStore for liveness. + GetByURI(ctx context.Context, queue string, uri string) ([]entity.ChangeRecord, error) +} diff --git a/extension/changestore/mock/BUILD.bazel b/extension/changestore/mock/BUILD.bazel new file mode 100644 index 00000000..6d1bd2de --- /dev/null +++ b/extension/changestore/mock/BUILD.bazel @@ -0,0 +1,12 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["change_store_mock.go"], + importpath = "github.com/uber/submitqueue/extension/changestore/mock", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "@org_uber_go_mock//gomock", + ], +) diff --git a/extension/changestore/mock/change_store_mock.go b/extension/changestore/mock/change_store_mock.go new file mode 100644 index 00000000..8308824a --- /dev/null +++ b/extension/changestore/mock/change_store_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: change_store.go +// +// Generated by this command: +// +// mockgen -source=change_store.go -destination=mock/change_store_mock.go -package=mock +// + +// Package mock is a generated GoMock package. 
+package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/entity" + gomock "go.uber.org/mock/gomock" +) + +// MockChangeStore is a mock of ChangeStore interface. +type MockChangeStore struct { + ctrl *gomock.Controller + recorder *MockChangeStoreMockRecorder + isgomock struct{} +} + +// MockChangeStoreMockRecorder is the mock recorder for MockChangeStore. +type MockChangeStoreMockRecorder struct { + mock *MockChangeStore +} + +// NewMockChangeStore creates a new mock instance. +func NewMockChangeStore(ctrl *gomock.Controller) *MockChangeStore { + mock := &MockChangeStore{ctrl: ctrl} + mock.recorder = &MockChangeStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeStore) EXPECT() *MockChangeStoreMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockChangeStore) Create(ctx context.Context, record entity.ChangeRecord) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", ctx, record) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockChangeStoreMockRecorder) Create(ctx, record any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockChangeStore)(nil).Create), ctx, record) +} + +// GetByURI mocks base method. +func (m *MockChangeStore) GetByURI(ctx context.Context, queue, uri string) ([]entity.ChangeRecord, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetByURI", ctx, queue, uri) + ret0, _ := ret[0].([]entity.ChangeRecord) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetByURI indicates an expected call of GetByURI. 
+func (mr *MockChangeStoreMockRecorder) GetByURI(ctx, queue, uri any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByURI", reflect.TypeOf((*MockChangeStore)(nil).GetByURI), ctx, queue, uri) +} diff --git a/extension/changestore/mysql/BUILD.bazel b/extension/changestore/mysql/BUILD.bazel new file mode 100644 index 00000000..73f9f090 --- /dev/null +++ b/extension/changestore/mysql/BUILD.bazel @@ -0,0 +1,14 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mysql", + srcs = ["change_store.go"], + importpath = "github.com/uber/submitqueue/extension/changestore/mysql", + visibility = ["//visibility:public"], + deps = [ + "//core/metrics", + "//entity", + "//extension/changestore", + "@com_github_uber_go_tally_v4//:tally", + ], +) diff --git a/extension/changestore/mysql/change_store.go b/extension/changestore/mysql/change_store.go new file mode 100644 index 00000000..929f527f --- /dev/null +++ b/extension/changestore/mysql/change_store.go @@ -0,0 +1,87 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mysql + +import ( + "context" + "database/sql" + "fmt" + + "github.com/uber-go/tally/v4" + + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/changestore" +) + +type changeStore struct { + db *sql.DB + scope tally.Scope +} + +// NewChangeStore creates a new MySQL-backed ChangeStore. +func NewChangeStore(db *sql.DB, scope tally.Scope) changestore.ChangeStore { + return &changeStore{db: db, scope: scope} +} + +// Create inserts a single ChangeRecord. A primary-key conflict on +// (queue, uri, request_id) is silently ignored via INSERT IGNORE, so +// queue-redelivery of the same request is a no-op. +func (s *changeStore) Create(ctx context.Context, record entity.ChangeRecord) (retErr error) { + op := metrics.Begin(s.scope, "create") + defer func() { op.Complete(retErr) }() + + // Use the empty JSON object as the canonical "no metadata yet" value. + // metadata is NOT NULL in the schema and the JSON column type rejects an empty string. + metadata := record.Metadata + if metadata == "" { + metadata = "{}" + } + + const query = "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at, version) VALUES (?, ?, ?, ?, ?, ?, ?)" + if _, err := s.db.ExecContext(ctx, query, + record.URI, record.RequestID, record.Queue, metadata, record.CreatedAt, record.UpdatedAt, record.Version, + ); err != nil { + return fmt.Errorf("failed to insert change record uri=%s request_id=%s: %w", record.URI, record.RequestID, err) + } + return nil +} + +// GetByURI returns every ChangeRecord for (queue, uri). queue leads the WHERE +// clause to align with the (queue, uri, request_id) PK so this is a PK-prefix scan. 
+func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (ret []entity.ChangeRecord, retErr error) { + op := metrics.Begin(s.scope, "get_by_uri") + defer func() { op.Complete(retErr) }() + + const query = "SELECT uri, request_id, queue, metadata, created_at, updated_at, version FROM `change` WHERE queue = ? AND uri = ?" + rows, err := s.db.QueryContext(ctx, query, queue, uri) + if err != nil { + return nil, fmt.Errorf("failed to query change records for queue=%s uri=%s: %w", queue, uri, err) + } + defer rows.Close() + + var results []entity.ChangeRecord + for rows.Next() { + var rec entity.ChangeRecord + if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &rec.Metadata, &rec.CreatedAt, &rec.UpdatedAt, &rec.Version); err != nil { + return nil, fmt.Errorf("failed to scan change record for queue=%s uri=%s: %w", queue, uri, err) + } + results = append(results, rec) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate change records for queue=%s uri=%s: %w", queue, uri, err) + } + return results, nil +} diff --git a/extension/changestore/mysql/schema/BUILD.bazel b/extension/changestore/mysql/schema/BUILD.bazel new file mode 100644 index 00000000..3412d773 --- /dev/null +++ b/extension/changestore/mysql/schema/BUILD.bazel @@ -0,0 +1,5 @@ +filegroup( + name = "schema", + srcs = glob(["*.sql"]), + visibility = ["//visibility:public"], +) diff --git a/extension/changestore/mysql/schema/change.sql b/extension/changestore/mysql/schema/change.sql new file mode 100644 index 00000000..9492402f --- /dev/null +++ b/extension/changestore/mysql/schema/change.sql @@ -0,0 +1,15 @@ +-- request_id is part of the PK so concurrent claims by different requests on the +-- same URI coexist as distinct rows. Same-request retry → PK conflict (no-op via +-- INSERT IGNORE). Different-request collision → distinct row, surfaced by +-- FindOverlapping. 
Queue leads the PK so queue-scoped lookups are PK-prefix scans +-- and the table is shardable by queue. +CREATE TABLE IF NOT EXISTS `change` ( + uri VARCHAR(255) NOT NULL, + request_id VARCHAR(255) NOT NULL, + queue VARCHAR(255) NOT NULL, + metadata JSON NOT NULL, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + version INT NOT NULL, + PRIMARY KEY (queue, uri, request_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/test/e2e/BUILD.bazel b/test/e2e/BUILD.bazel index a6ac7943..97fc8d32 100644 --- a/test/e2e/BUILD.bazel +++ b/test/e2e/BUILD.bazel @@ -7,6 +7,7 @@ go_test( "//:MODULE.bazel", "//:go.mod", "//example/server:docker-compose.yml", + "//extension/changestore/mysql/schema", "//extension/counter/mysql/schema", "//extension/queue/mysql/schema", "//extension/storage/mysql/schema", diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 17a74ed4..ee2e19c9 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -85,6 +85,7 @@ func (s *E2EIntegrationSuite) SetupSuite() { // Apply schemas programmatically to application database testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/storage/mysql/schema")) testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/counter/mysql/schema")) + testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/changestore/mysql/schema")) // Apply schemas programmatically to queue database testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/mysql/schema")) diff --git a/test/integration/extension/changestore/mysql/BUILD.bazel b/test/integration/extension/changestore/mysql/BUILD.bazel new file mode 100644 index 00000000..f203f98f --- /dev/null +++ b/test/integration/extension/changestore/mysql/BUILD.bazel @@ -0,0 +1,22 @@ +load("@rules_go//go:def.bzl", "go_test") + +go_test( + name = "mysql_test", + srcs = ["changestore_test.go"], + data = [ + "docker-compose.yml", + "//extension/changestore/mysql/schema", + ], + tags = 
["integration"], + deps = [ + "//entity", + "//extension/changestore", + "//extension/changestore/mysql", + "//test/testutil", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", + ], +) diff --git a/test/integration/extension/changestore/mysql/changestore_test.go b/test/integration/extension/changestore/mysql/changestore_test.go new file mode 100644 index 00000000..25c67199 --- /dev/null +++ b/test/integration/extension/changestore/mysql/changestore_test.go @@ -0,0 +1,202 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "sort" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/changestore" + mysqlchangestore "github.com/uber/submitqueue/extension/changestore/mysql" + "github.com/uber/submitqueue/test/testutil" +) + +// MySQLChangeStoreIntegrationSuite tests the MySQL ChangeStore implementation +// against a real MySQL instance launched via docker-compose. 
+type MySQLChangeStoreIntegrationSuite struct { + suite.Suite + stack *testutil.ComposeStack + db *sql.DB + store changestore.ChangeStore + log *testutil.TestLogger + ctx context.Context +} + +func TestMySQLChangeStoreIntegration(t *testing.T) { + suite.Run(t, new(MySQLChangeStoreIntegrationSuite)) +} + +func (s *MySQLChangeStoreIntegrationSuite) SetupSuite() { + t := s.T() + s.ctx = context.Background() + s.log = testutil.NewTestLogger(t) + + s.log.Logf("Starting MySQL ChangeStore integration test suite using docker-compose") + + s.stack = testutil.NewComposeStack( + t, + s.log, + s.ctx, + "docker-compose.yml", + "ext-changestore-mysql", + ) + + require.NoError(t, s.stack.Up(), "failed to start compose stack") + s.log.Logf("Compose stack started successfully") + + var err error + s.db, err = s.stack.ConnectMySQLService("mysql") + require.NoError(t, err, "failed to connect to MySQL") + + testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/changestore/mysql/schema")) + s.log.Logf("Schemas applied successfully") + + s.store = mysqlchangestore.NewChangeStore(s.db, tally.NoopScope) + + t.Cleanup(func() { + if s.db != nil { + s.log.Logf("Closing MySQL connection") + s.db.Close() + } + }) +} + +// SetupTest truncates the change table between tests so cases stay isolated. 
+func (s *MySQLChangeStoreIntegrationSuite) SetupTest() { + _, err := s.db.ExecContext(s.ctx, "TRUNCATE TABLE `change`") + require.NoError(s.T(), err) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_NoMatch() { + t := s.T() + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.store.GetByURI(s.ctx, "q", "github://uber/x/pull/2/bbb") + require.NoError(t, err) + assert.Empty(t, got) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_Match() { + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.store.GetByURI(s.ctx, "q", uri) + require.NoError(t, err) + require.Len(t, got, 1) + assert.Equal(t, "q/1", got[0].RequestID) + assert.Equal(t, uri, got[0].URI) + assert.Equal(t, "q", got[0].Queue) + assert.Equal(t, int32(1), got[0].Version) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_DoesNotExcludeSelf() { + // The store does not filter by request_id; callers filter self if they wish. 
+ t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.store.GetByURI(s.ctx, "q", uri) + require.NoError(t, err) + require.Len(t, got, 1, "store returns the row even when caller might consider it self") + assert.Equal(t, "q/1", got[0].RequestID) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_QueueScoped() { + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.store.GetByURI(s.ctx, "qB", uri) + require.NoError(t, err) + assert.Empty(t, got, "GetByURI must not return rows from a different queue") +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_Idempotent() { + t := s.T() + rec := entity.ChangeRecord{URI: "github://uber/x/pull/1/aaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1} + + require.NoError(t, s.store.Create(s.ctx, rec)) + require.NoError(t, s.store.Create(s.ctx, rec), "second insert with same PK must succeed (INSERT IGNORE)") + + var count int + require.NoError(t, s.db.QueryRowContext(s.ctx, "SELECT COUNT(*) FROM `change`").Scan(&count)) + assert.Equal(t, 1, count, "INSERT IGNORE must not duplicate rows") +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_DifferentRequestSameURI() { + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2, Version: 1, + })) + + got, err := s.store.GetByURI(s.ctx, "q", uri) + require.NoError(t, err) + require.Len(t, got, 2) + + ids := []string{got[0].RequestID, 
got[1].RequestID} + sort.Strings(ids) + assert.Equal(t, []string{"q/1", "q/2"}, ids) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_PreservesMetadata() { + t := s.T() + const meta = `{"title":"add new feature"}` + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.store.GetByURI(s.ctx, "q", uri) + require.NoError(t, err) + require.Len(t, got, 1) + assert.JSONEq(t, meta, got[0].Metadata) +} + +func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyMetadataStoredAsObject() { + // metadata is NOT NULL in the schema. The impl substitutes '{}' for an empty + // Metadata field so callers don't need to know about the column constraint. + t := s.T() + uri := "github://uber/x/pull/1/aaa" + require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ + URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.store.GetByURI(s.ctx, "q", uri) + require.NoError(t, err) + require.Len(t, got, 1) + assert.JSONEq(t, "{}", got[0].Metadata) +} diff --git a/test/integration/extension/changestore/mysql/docker-compose.yml b/test/integration/extension/changestore/mysql/docker-compose.yml new file mode 100644 index 00000000..74241c77 --- /dev/null +++ b/test/integration/extension/changestore/mysql/docker-compose.yml @@ -0,0 +1,19 @@ +# Docker Compose for MySQL ChangeStore Library Tests +# Tests the changestore library's MySQL implementation in isolation. + +services: + mysql: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. 
+ test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10