From 38ca120d2fd5d9d9ee08bbe698e076c9666c1fd9 Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 24 Feb 2026 03:42:27 +0000 Subject: [PATCH 1/2] feat(entities) Adds batch dependent store interface and mysql implementation --- extension/storage/BUILD.bazel | 1 + extension/storage/mysql/BUILD.bazel | 1 + extension/storage/mysql/storage.go | 23 ++++++++++++++-------- extension/storage/storage.go | 3 +++ gateway/controller/land_test.go | 30 ++++++++++++++++++++++++++--- 5 files changed, 47 insertions(+), 11 deletions(-) diff --git a/extension/storage/BUILD.bazel b/extension/storage/BUILD.bazel index 8e2c0693..dfc26dcd 100644 --- a/extension/storage/BUILD.bazel +++ b/extension/storage/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "storage", srcs = [ + "batch_dependent_store.go", "batch_store.go", "change_provider_store.go", "request_store.go", diff --git a/extension/storage/mysql/BUILD.bazel b/extension/storage/mysql/BUILD.bazel index 3304b7f2..f33770ab 100644 --- a/extension/storage/mysql/BUILD.bazel +++ b/extension/storage/mysql/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "mysql", srcs = [ + "batch_dependent_store.go", "batch_store.go", "change_provider_store.go", "request_store.go", diff --git a/extension/storage/mysql/storage.go b/extension/storage/mysql/storage.go index e876c59a..310b5e86 100644 --- a/extension/storage/mysql/storage.go +++ b/extension/storage/mysql/storage.go @@ -9,19 +9,21 @@ import ( ) type mysqlStorage struct { - db *sql.DB - requestStore storage.RequestStore - changeProviderStore storage.ChangeProviderStore - batchStore storage.BatchStore + db *sql.DB + requestStore storage.RequestStore + changeProviderStore storage.ChangeProviderStore + batchStore storage.BatchStore + batchDependentStore storage.BatchDependentStore } // NewStorage creates a new MySQL storage. func NewStorage(db *sql.DB) (storage.Storage, error) { return &mysqlStorage{ - db: db, - requestStore: NewRequestStore(db), - changeProviderStore: NewChangeProviderStore(db), - batchStore: NewBatchStore(db), + db: db, + requestStore: NewRequestStore(db), + changeProviderStore: NewChangeProviderStore(db), + batchStore: NewBatchStore(db), + batchDependentStore: NewBatchDependentStore(db), }, nil } @@ -40,6 +42,11 @@ func (f *mysqlStorage) GetBatchStore() storage.BatchStore { return f.batchStore } +// GetBatchDependentStore returns the MySQL-backed BatchDependentStore. +func (f *mysqlStorage) GetBatchDependentStore() storage.BatchDependentStore { + return f.batchDependentStore +} + // Close closes the underlying database connection. func (f *mysqlStorage) Close() error { return f.db.Close() diff --git a/extension/storage/storage.go b/extension/storage/storage.go index 72c94872..4ed8e809 100644 --- a/extension/storage/storage.go +++ b/extension/storage/storage.go @@ -35,6 +35,9 @@ type Storage interface { // GetBatchStore returns the BatchStore instance. GetBatchStore() BatchStore + // GetBatchDependentStore returns the BatchDependentStore instance. + GetBatchDependentStore() BatchDependentStore + // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index 01532a9c..a6b657fd 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -78,10 +78,30 @@ func (m *mockBatchStore) UpdateState(ctx context.Context, id string, version int return nil } +type mockBatchDependentStore struct { + createFunc func(ctx context.Context, batchDependent entity.BatchDependent) error + getFunc func(ctx context.Context, batchID string) (entity.BatchDependent, error) +} + +func (m *mockBatchDependentStore) Get(ctx context.Context, batchID string) (entity.BatchDependent, error) { + if m.getFunc != nil { + return m.getFunc(ctx, batchID) + } + return entity.BatchDependent{}, nil +} + +func (m *mockBatchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) error { + if m.createFunc != nil { + return m.createFunc(ctx, batchDependent) + } + return nil +} + type mockStorage struct { - requestStore storage.RequestStore - changeProviderStore storage.ChangeProviderStore - batchStore storage.BatchStore + requestStore storage.RequestStore + changeProviderStore storage.ChangeProviderStore + batchStore storage.BatchStore + batchDependentStore storage.BatchDependentStore } func (m *mockStorage) GetRequestStore() storage.RequestStore { @@ -96,6 +116,10 @@ func (m *mockStorage) GetBatchStore() storage.BatchStore { return m.batchStore } +func (m *mockStorage) GetBatchDependentStore() storage.BatchDependentStore { + return m.batchDependentStore +} + func (m *mockStorage) Close() error { return nil } From b7f08dbf8d70c832bacbc5146e8634d8bed73c47 Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 24 Feb 2026 03:44:45 +0000 Subject: [PATCH 2/2] add leftover files --- extension/storage/batch_dependent_store.go | 21 ++++++ .../storage/mysql/batch_dependent_store.go | 69 +++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 extension/storage/batch_dependent_store.go create mode 100644 extension/storage/mysql/batch_dependent_store.go diff --git a/extension/storage/batch_dependent_store.go b/extension/storage/batch_dependent_store.go new file mode 100644 index 00000000..4f120039 --- /dev/null +++ b/extension/storage/batch_dependent_store.go @@ -0,0 +1,21 @@ +package storage + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// BatchDependentStore is an interface that defines methods for managing batch dependent information in the database. +type BatchDependentStore interface { + // Get retrieves the batch dependent by batch ID. + // Returns ErrNotFound if the batch dependent is not found. + Get(ctx context.Context, batchID string) (entity.BatchDependent, error) + + // Create creates a new batch dependent. + // Returns ErrAlreadyExists if the entry already exists. + Create(ctx context.Context, batchDependent entity.BatchDependent) error + + // There is no update function since once created, data is only ever read from this + // store. +} diff --git a/extension/storage/mysql/batch_dependent_store.go b/extension/storage/mysql/batch_dependent_store.go new file mode 100644 index 00000000..7d81c0ba --- /dev/null +++ b/extension/storage/mysql/batch_dependent_store.go @@ -0,0 +1,69 @@ +package mysql + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/go-sql-driver/mysql" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" +) + +type batchDependentStore struct { + db *sql.DB +} + +// NewBatchDependentStore creates a new MySQL-backed BatchDependentStore. +func NewBatchDependentStore(db *sql.DB) storage.BatchDependentStore { + return &batchDependentStore{db: db} +} + +// Get retrieves the batch dependent by batch ID. Returns ErrNotFound if the batch dependent is not found. +func (s *batchDependentStore) Get(ctx context.Context, batchID string) (entity.BatchDependent, error) { + var bd entity.BatchDependent + var dependentsJSON []byte + + err := s.db.QueryRowContext(ctx, + "SELECT batch_id, dependents FROM batch_dependent WHERE batch_id = ?", + batchID, + ).Scan(&bd.BatchID, &dependentsJSON) + + if errors.Is(err, sql.ErrNoRows) { + return entity.BatchDependent{}, storage.WrapNotFound(err) + } + if err != nil { + return entity.BatchDependent{}, fmt.Errorf("failed to get batch dependent entity batchID=%s from the database: %w", batchID, err) + } + + if err := json.Unmarshal(dependentsJSON, &bd.Dependents); err != nil { + return entity.BatchDependent{}, fmt.Errorf("failed to unmarshal dependents for batch dependent entity batchID=%s from the database: %w", batchID, err) + } + + return bd, nil +} + +// Create creates a new batch dependent. Returns ErrAlreadyExists if the entry already exists. +func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) error { + dependentsJSON, err := json.Marshal(batchDependent.Dependents) + if err != nil { + return fmt.Errorf("failed to marshal dependents batchID=%s for Create batch dependent entity: %w", batchDependent.BatchID, err) + } + + _, err = s.db.ExecContext(ctx, + "INSERT INTO batch_dependent (batch_id, dependents) VALUES (?, ?)", + batchDependent.BatchID, dependentsJSON, + ) + if err != nil { + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { + return fmt.Errorf("batch dependent entity batchID=%s: %w", batchDependent.BatchID, storage.ErrAlreadyExists) + } + return fmt.Errorf("failed to insert batch dependent entity batchID=%s: %w", batchDependent.BatchID, err) + } + + return nil +}