Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "entity",
srcs = ["request.go"],
srcs = [
"change_provider.go",
"request.go",
],
importpath = "github.com/uber/submitqueue/entity",
visibility = ["//visibility:public"],
)
15 changes: 15 additions & 0 deletions entity/change_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package entity

// ChangeProvider represents a code change from an external provider (e.g., a GitHub pull request or Gerrit changelist)
// along with its associated metadata. The object is immutable after creation.
type ChangeProvider struct {
// RequestID is the globally unique identifier for the land request. Format: "<queue>/<counter_value>".
RequestID string
// ChangeProviderSrc defines the source of the change. For e.g. - Github, Gitlab etc.
ChangeProviderSrc string
// ChangeProviderID is the identifier specified by the change provider source. For e.g. - Github PR ID etc.
ChangeProviderID string
// Metadata is the interesting data from the change provider that we want to store.
// This is a freeform JSON object.
Metadata map[string]string
}
1 change: 1 addition & 0 deletions extension/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "storage",
srcs = [
"change_provider_store.go",
"request_store.go",
"storage.go",
],
Expand Down
26 changes: 26 additions & 0 deletions extension/storage/change_provider_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package storage

import (
"context"

"github.com/uber/submitqueue/entity"
)

// ChangeProviderStore is an interface that defines methods for managing change provider information in the database.
type ChangeProviderStore interface {
// Get retrieves information about a change by ID.
// Returns ErrNotFound if the change provider is not found.
//
// Note: The order of ChangeProvider entities here is not guaranteed to
// be the same as the request to which it belongs. The caller is repsonsible
// for inspecting and mapping the result of this function to the
// order of changes within the original request.
//
Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error)

// Create creates a new change provider.
Create(ctx context.Context, changeProvider entity.ChangeProvider) error

// There is no update function since once created, data is only ever read from this
// store.
}
1 change: 1 addition & 0 deletions extension/storage/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "mysql",
srcs = [
"change_provider_store.go",
"request_store.go",
"storage.go",
],
Expand Down
88 changes: 88 additions & 0 deletions extension/storage/mysql/change_provider_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package mysql

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

"github.com/go-sql-driver/mysql"

"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
)

type changeProviderStore struct {
db *sql.DB
}

// NewChangeProviderStore creates a new MySQL-backed ChangeProviderStore.
func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore {
return &changeProviderStore{db: db}
}

// Get retrieves change provider(s) by request ID. Returns ErrNotFound if the change provider is not found.
//
// Note: The order of ChangeProvider entities returned here is not guaranteed
// to be the same as the request to which it belongs. The caller is repsonsible
// for inspecting and mapping the result of this function to the
// order of changes within the original request.
//
func (s *changeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) {
rows, err := s.db.QueryContext(ctx,
"SELECT request_id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE request_id = ?",
requestID,
)
if err != nil {
return nil, fmt.Errorf("failed to get change provider entities requestID=%s from the database: %w", requestID, err)
}
defer rows.Close()

var results []entity.ChangeProvider
for rows.Next() {
var cp entity.ChangeProvider
var metadataJSON []byte

if err := rows.Scan(&cp.RequestID, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON); err != nil {
return nil, fmt.Errorf("failed to scan change provider entity requestID=%s from the database: %w", requestID, err)
}

if err := json.Unmarshal(metadataJSON, &cp.Metadata); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata for change provider entity requestID=%s from the database: %w", requestID, err)
}

results = append(results, cp)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate change provider entities requestID=%s from the database: %w", requestID, err)
}

if len(results) == 0 {
return nil, storage.WrapNotFound(fmt.Errorf("change provider entity requestID=%s", requestID))
}

return results, nil
}

// Create creates a new change provider. Returns ErrAlreadyExists if the entry already exists.
func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error {
metadataJSON, err := json.Marshal(changeProvider.Metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata id=%s for Create change provider entity: %w", changeProvider.RequestID, err)
}

_, err = s.db.ExecContext(ctx,
"INSERT INTO change_provider (request_id, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?)",
changeProvider.RequestID, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON,
)
if err != nil {
var mysqlErr *mysql.MySQLError
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
return fmt.Errorf("change provider entity id=%s: %w", changeProvider.RequestID, storage.ErrAlreadyExists)
}
return fmt.Errorf("failed to insert change provider entity id=%s: %w", changeProvider.RequestID, err)
}

return nil
}
7 changes: 7 additions & 0 deletions extension/storage/mysql/schema/change_provider.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS change_provider (
request_id VARCHAR(255) NOT NULL,
change_provider_src VARCHAR(255) NOT NULL,
change_provider_id VARCHAR(255) NOT NULL,
metadata JSON NOT NUll,
PRIMARY KEY (request_id,change_provider_src,change_provider_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
15 changes: 11 additions & 4 deletions extension/storage/mysql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type MySQLParameters struct {
}

type mysqlStorage struct {
db *sql.DB
requestStore storage.RequestStore
db *sql.DB
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
}

// NewStorage creates a new MySQL storage.
Expand All @@ -50,8 +51,9 @@ func NewStorage(p MySQLParameters) (storage.Storage, error) {
}

return &mysqlStorage{
db: db,
requestStore: NewRequestStore(db),
db: db,
requestStore: NewRequestStore(db),
changeProviderStore: NewChangeProviderStore(db),
}, nil
}

Expand All @@ -60,6 +62,11 @@ func (f *mysqlStorage) GetRequestStore() storage.RequestStore {
return f.requestStore
}

// GetChangeProviderStore returns the MySQL-backed ChangeProviderStore.
func (f *mysqlStorage) GetChangeProviderStore() storage.ChangeProviderStore {
return f.changeProviderStore
}

// Close closes the underlying database connection.
func (f *mysqlStorage) Close() error {
return f.db.Close()
Expand Down
3 changes: 3 additions & 0 deletions extension/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Storage interface {
// GetRequestStore returns the RequestStore instance.
GetRequestStore() RequestStore

// GetChangeProviderStore returns the ChangeProviderStore instance.
GetChangeProviderStore() ChangeProviderStore

// Close closes the storage and all underlying connections. Should only be called once at the end of the program.
Close() error
}
111 changes: 85 additions & 26 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,48 @@ func (m *mockRequestStore) UpdateState(ctx context.Context, id string, version i
return nil
}

type mockChangeProviderStore struct {
createFunc func(ctx context.Context, changeProvider entity.ChangeProvider) error
}

func (m *mockChangeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) {
return nil, nil
}

func (m *mockChangeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error {
return m.createFunc(ctx, changeProvider)
}

type mockStorage struct {
requestStore storage.RequestStore
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
}

func (m *mockStorage) GetRequestStore() storage.RequestStore {
return m.requestStore
}

func (m *mockStorage) GetChangeProviderStore() storage.ChangeProviderStore {
return m.changeProviderStore
}

func (m *mockStorage) Close() error {
return nil
}

func TestNewLandController(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 1, nil
}}
Expand All @@ -64,11 +88,18 @@ func TestNewLandController(t *testing.T) {
}

func TestLand_ReturnsSqid(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 1, nil
}}
Expand All @@ -88,12 +119,19 @@ func TestLand_ReturnsSqid(t *testing.T) {
func TestLand_PassesCorrectParametersToStore(t *testing.T) {
var capturedRequest entity.Request

store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
capturedRequest = request
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
capturedRequest = request
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 42, nil
}}
Expand All @@ -119,11 +157,18 @@ func TestLand_PassesCorrectParametersToStore(t *testing.T) {
}

func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return fmt.Errorf("database connection failed")
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return fmt.Errorf("database connection failed")
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 1, nil
}}
Expand All @@ -140,11 +185,18 @@ func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) {
}

func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 0, fmt.Errorf("counter unavailable")
}}
Expand All @@ -163,11 +215,18 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) {
func TestLand_CounterDomainIncludesQueue(t *testing.T) {
var capturedDomain string

store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
capturedDomain = domain
return 1, nil
Expand Down
Loading