Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extension/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "storage",
srcs = [
"batch_dependent_store.go",
"batch_store.go",
"change_provider_store.go",
"request_store.go",
Expand Down
21 changes: 21 additions & 0 deletions extension/storage/batch_dependent_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package storage

import (
"context"

"github.com/uber/submitqueue/entity"
)

// BatchDependentStore is an interface that defines methods for managing batch dependent information in the database.
type BatchDependentStore interface {
// Get retrieves the batch dependent by batch ID.
// Returns ErrNotFound if the batch dependent is not found.
Get(ctx context.Context, batchID string) (entity.BatchDependent, error)

// Create creates a new batch dependent.
// Returns ErrAlreadyExists if the entry already exists.
Create(ctx context.Context, batchDependent entity.BatchDependent) error

// There is no update function since once created, data is only ever read from this
// store.
}
1 change: 1 addition & 0 deletions extension/storage/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "mysql",
srcs = [
"batch_dependent_store.go",
"batch_store.go",
"change_provider_store.go",
"request_store.go",
Expand Down
69 changes: 69 additions & 0 deletions extension/storage/mysql/batch_dependent_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package mysql

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

"github.com/go-sql-driver/mysql"

"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
)

type batchDependentStore struct {
db *sql.DB
}

// NewBatchDependentStore creates a new MySQL-backed BatchDependentStore.
func NewBatchDependentStore(db *sql.DB) storage.BatchDependentStore {
return &batchDependentStore{db: db}
}

// Get retrieves the batch dependent by batch ID. Returns ErrNotFound if the batch dependent is not found.
func (s *batchDependentStore) Get(ctx context.Context, batchID string) (entity.BatchDependent, error) {
var bd entity.BatchDependent
var dependentsJSON []byte

err := s.db.QueryRowContext(ctx,
"SELECT batch_id, dependents FROM batch_dependent WHERE batch_id = ?",
batchID,
).Scan(&bd.BatchID, &dependentsJSON)

if errors.Is(err, sql.ErrNoRows) {
return entity.BatchDependent{}, storage.WrapNotFound(err)
}
if err != nil {
return entity.BatchDependent{}, fmt.Errorf("failed to get batch dependent entity batchID=%s from the database: %w", batchID, err)
}

if err := json.Unmarshal(dependentsJSON, &bd.Dependents); err != nil {
return entity.BatchDependent{}, fmt.Errorf("failed to unmarshal dependents for batch dependent entity batchID=%s from the database: %w", batchID, err)
}

return bd, nil
}

// Create creates a new batch dependent. Returns ErrAlreadyExists if the entry already exists.
func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) error {
dependentsJSON, err := json.Marshal(batchDependent.Dependents)
if err != nil {
return fmt.Errorf("failed to marshal dependents batchID=%s for Create batch dependent entity: %w", batchDependent.BatchID, err)
}

_, err = s.db.ExecContext(ctx,
"INSERT INTO batch_dependent (batch_id, dependents) VALUES (?, ?)",
batchDependent.BatchID, dependentsJSON,
)
if err != nil {
var mysqlErr *mysql.MySQLError
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
return fmt.Errorf("batch dependent entity batchID=%s: %w", batchDependent.BatchID, storage.ErrAlreadyExists)
}
return fmt.Errorf("failed to insert batch dependent entity batchID=%s: %w", batchDependent.BatchID, err)
}

return nil
}
23 changes: 15 additions & 8 deletions extension/storage/mysql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (
)

type mysqlStorage struct {
db *sql.DB
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
batchStore storage.BatchStore
db *sql.DB
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
batchStore storage.BatchStore
batchDependentStore storage.BatchDependentStore
}

// NewStorage creates a new MySQL storage.
func NewStorage(db *sql.DB) (storage.Storage, error) {
return &mysqlStorage{
db: db,
requestStore: NewRequestStore(db),
changeProviderStore: NewChangeProviderStore(db),
batchStore: NewBatchStore(db),
db: db,
requestStore: NewRequestStore(db),
changeProviderStore: NewChangeProviderStore(db),
batchStore: NewBatchStore(db),
batchDependentStore: NewBatchDependentStore(db),
}, nil
}

Expand All @@ -40,6 +42,11 @@ func (f *mysqlStorage) GetBatchStore() storage.BatchStore {
return f.batchStore
}

// GetBatchDependentStore returns the MySQL-backed BatchDependentStore.
func (f *mysqlStorage) GetBatchDependentStore() storage.BatchDependentStore {
return f.batchDependentStore
}

// Close closes the underlying database connection.
func (f *mysqlStorage) Close() error {
return f.db.Close()
Expand Down
3 changes: 3 additions & 0 deletions extension/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Storage interface {
// GetBatchStore returns the BatchStore instance.
GetBatchStore() BatchStore

// GetBatchDependentStore returns the BatchDependentStore instance.
GetBatchDependentStore() BatchDependentStore

// Close closes the storage and all underlying connections. Should only be called once at the end of the program.
Close() error
}
30 changes: 27 additions & 3 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,30 @@ func (m *mockBatchStore) UpdateState(ctx context.Context, id string, version int
return nil
}

type mockBatchDependentStore struct {
createFunc func(ctx context.Context, batchDependent entity.BatchDependent) error
getFunc func(ctx context.Context, batchID string) (entity.BatchDependent, error)
}

func (m *mockBatchDependentStore) Get(ctx context.Context, batchID string) (entity.BatchDependent, error) {
if m.getFunc != nil {
return m.getFunc(ctx, batchID)
}
return entity.BatchDependent{}, nil
}

func (m *mockBatchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) error {
if m.createFunc != nil {
return m.createFunc(ctx, batchDependent)
}
return nil
}

type mockStorage struct {
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
batchStore storage.BatchStore
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
batchStore storage.BatchStore
batchDependentStore storage.BatchDependentStore
}

func (m *mockStorage) GetRequestStore() storage.RequestStore {
Expand All @@ -96,6 +116,10 @@ func (m *mockStorage) GetBatchStore() storage.BatchStore {
return m.batchStore
}

func (m *mockStorage) GetBatchDependentStore() storage.BatchDependentStore {
return m.batchDependentStore
}

func (m *mockStorage) Close() error {
return nil
}
Expand Down