Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extension/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "storage",
srcs = [
"batch_store.go",
"change_provider_store.go",
"request_store.go",
"storage.go",
Expand Down
21 changes: 21 additions & 0 deletions extension/storage/batch_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package storage

import (
"context"

"github.com/uber/submitqueue/entity"
)

// BatchStore is an interface that defines methods for managing batches in the database.
type BatchStore interface {
// Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found.
Get(ctx context.Context, id string) (entity.Batch, error)

// Create creates a new batch. The batch must have a unique ID already assigned.
// Returns ErrAlreadyExists if a batch with the same ID already exists.
Create(ctx context.Context, batch entity.Batch) error

// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
// The implementation should increment the version by 1 atomically with the state update.
UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error
}
1 change: 1 addition & 0 deletions extension/storage/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "mysql",
srcs = [
"batch_store.go",
"change_provider_store.go",
"request_store.go",
"storage.go",
Expand Down
111 changes: 111 additions & 0 deletions extension/storage/mysql/batch_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package mysql

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

"github.com/go-sql-driver/mysql"

"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
)

type batchStore struct {
db *sql.DB
}

// NewBatchStore creates a new MySQL-backed BatchStore.
func NewBatchStore(db *sql.DB) storage.BatchStore {
return &batchStore{db: db}
}

// Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found.
func (s *batchStore) Get(ctx context.Context, id string) (entity.Batch, error) {
var batch entity.Batch
var containsJSON []byte
var dependenciesJSON []byte

err := s.db.QueryRowContext(ctx,
"SELECT id, queue, contains, dependencies, state, version FROM batch WHERE id = ?",
id,
).Scan(&batch.ID, &batch.Queue, &containsJSON, &dependenciesJSON, &batch.State, &batch.Version)

if errors.Is(err, sql.ErrNoRows) {
return entity.Batch{}, storage.WrapNotFound(err)
}
if err != nil {
return entity.Batch{}, fmt.Errorf("failed to get batch entity id=%s from the database: %w", id, err)
}

if err := json.Unmarshal(containsJSON, &batch.Contains); err != nil {
return entity.Batch{}, fmt.Errorf("failed to unmarshal contains for batch entity id=%s from the database: %w", id, err)
}

if err := json.Unmarshal(dependenciesJSON, &batch.Dependencies); err != nil {
return entity.Batch{}, fmt.Errorf("failed to unmarshal dependencies for batch entity id=%s from the database: %w", id, err)
}

return batch, nil
}

// Create creates a new batch. The batch must have a unique ID already assigned. Returns ErrAlreadyExists if the batch ID already exists.
func (s *batchStore) Create(ctx context.Context, batch entity.Batch) error {
containsJSON, err := json.Marshal(batch.Contains)
if err != nil {
return fmt.Errorf("failed to marshal contains=%v id=%s for Create batch entity: %w", batch.Contains, batch.ID, err)
}

dependenciesJSON, err := json.Marshal(batch.Dependencies)
if err != nil {
return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err)
}

_, err = s.db.ExecContext(ctx,
"INSERT INTO batch (id, queue, contains, dependencies, state, version) VALUES (?, ?, ?, ?, ?, ?)",
batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.State, batch.Version,
)
if err != nil {
var mysqlErr *mysql.MySQLError
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists)
}
return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err)
}

return nil
}

// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
// The implementation increments the version by 1 atomically with the state update.
func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error {
result, err := s.db.ExecContext(ctx,
"UPDATE batch SET state = ?, version = version + 1 WHERE id = ? AND version = ?",
newState, id, version,
)
if err != nil {
return fmt.Errorf(
"failed to update batch state for id=%q version=%d newState=%v: %w",
id, version, newState, err,
)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf(
"failed to get rows affected from update for id=%q version=%d newState=%v: %w",
id, version, newState, err,
)
}

if rowsAffected != 1 {
Comment thread
manjari25 marked this conversation as resolved.
return fmt.Errorf(
"version mismatch for batch update: id=%q expected_version=%d newState=%v: %w",
id, version, newState, storage.ErrVersionMismatch,
)
}

return nil
}
7 changes: 7 additions & 0 deletions extension/storage/mysql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type mysqlStorage struct {
db *sql.DB
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
batchStore storage.BatchStore
}

// NewStorage creates a new MySQL storage.
Expand All @@ -20,6 +21,7 @@ func NewStorage(db *sql.DB) (storage.Storage, error) {
db: db,
requestStore: NewRequestStore(db),
changeProviderStore: NewChangeProviderStore(db),
batchStore: NewBatchStore(db),
}, nil
}

Expand All @@ -33,6 +35,11 @@ func (f *mysqlStorage) GetChangeProviderStore() storage.ChangeProviderStore {
return f.changeProviderStore
}

// GetBatchStore returns the MySQL-backed BatchStore.
func (f *mysqlStorage) GetBatchStore() storage.BatchStore {
return f.batchStore
}

// Close closes the underlying database connection.
func (f *mysqlStorage) Close() error {
return f.db.Close()
Expand Down
3 changes: 3 additions & 0 deletions extension/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type Storage interface {
// GetChangeProviderStore returns the ChangeProviderStore instance.
GetChangeProviderStore() ChangeProviderStore

// GetBatchStore returns the BatchStore instance.
GetBatchStore() BatchStore

// Close closes the storage and all underlying connections. Should only be called once at the end of the program.
Close() error
}
32 changes: 32 additions & 0 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,37 @@ func (m *mockChangeProviderStore) Create(ctx context.Context, changeProvider ent
return m.createFunc(ctx, changeProvider)
}

type mockBatchStore struct {
Comment thread
manjari25 marked this conversation as resolved.
createFunc func(ctx context.Context, batch entity.Batch) error
getFunc func(ctx context.Context, id string) (entity.Batch, error)
updateStateFunc func(ctx context.Context, id string, version int32, newState entity.BatchState) error
}

func (m *mockBatchStore) Get(ctx context.Context, id string) (entity.Batch, error) {
if m.getFunc != nil {
return m.getFunc(ctx, id)
}
return entity.Batch{}, nil
}

func (m *mockBatchStore) Create(ctx context.Context, batch entity.Batch) error {
if m.createFunc != nil {
return m.createFunc(ctx, batch)
}
return nil
}

func (m *mockBatchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error {
if m.updateStateFunc != nil {
return m.updateStateFunc(ctx, id, version, newState)
}
return nil
}

type mockStorage struct {
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
batchStore storage.BatchStore
}

func (m *mockStorage) GetRequestStore() storage.RequestStore {
Expand All @@ -64,6 +92,10 @@ func (m *mockStorage) GetChangeProviderStore() storage.ChangeProviderStore {
return m.changeProviderStore
}

func (m *mockStorage) GetBatchStore() storage.BatchStore {
return m.batchStore
}

func (m *mockStorage) Close() error {
return nil
}
Expand Down