Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use_repo(
"com_github_go_sql_driver_mysql",
"com_github_gogo_protobuf",
"com_github_stretchr_testify",
"com_github_testcontainers_testcontainers_go",
"com_github_testcontainers_testcontainers_go_modules_mysql",
"com_github_uber_go_tally_v4",
"org_golang_google_grpc",
"org_golang_google_protobuf",
Expand Down
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ integration-test:
@echo "Running all service integration tests..."
@$(BAZEL) test //gateway/integration_tests:integration_tests_test //orchestrator/integration_tests:integration_tests_test //speculator/integration_tests:integration_tests_test --test_output=all

# Run end-to-end tests (requires all services to be running)
# Run end-to-end integration tests (hermetic, no manual server setup needed)
e2e-test:
@echo "Running end-to-end tests..."
@$(BAZEL) test //integration_tests:e2e_test --test_output=all
@echo "Running integration tests..."
@$(BAZEL) test //integration_tests:integration_test --test_output=all

# Clean generated files and binaries
clean:
Expand Down Expand Up @@ -186,7 +186,9 @@ help:
@echo " make integration-test-orchestrator - Test Orchestrator service"
@echo " make integration-test-speculator - Test Speculator service"
@echo " make integration-test - Test all services"
@echo " make e2e-test - Run end-to-end tests"
@echo ""
@echo "End-to-End Tests (hermetic, no setup needed):"
@echo " make e2e-test - Run integration tests with Testcontainers"
@echo ""
@echo "Run Clients:"
@echo " make run-client-gateway - Run gateway client"
Expand Down
1 change: 1 addition & 0 deletions examples/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/uber/submitqueue/examples/server/gateway",
visibility = ["//visibility:private"],
deps = [
"//extensions/storage/mysql",
"//gateway/controller",
"//gateway/protopb",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
40 changes: 34 additions & 6 deletions examples/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/extensions/storage/mysql"
"github.com/uber/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
Expand All @@ -21,12 +22,18 @@ import (
// GatewayServer wraps the controller and implements the gRPC service interface
type GatewayServer struct {
pb.UnimplementedSubmitQueueGatewayServer
controller *controller.PingController
pingController *controller.PingController
landController *controller.LandController
}

// Ping delegates to the controller
func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
return s.controller.Ping(ctx, req)
return s.pingController.Ping(ctx, req)
}

// Land delegates to the controller
func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
return s.landController.Land(ctx, req)
}

func main() {
Expand Down Expand Up @@ -75,21 +82,42 @@ func run() error {
metricsWgDone.Wait()
}()

// Initialize MySQL storage factory
mysqlDSN := os.Getenv("MYSQL_DSN")
if mysqlDSN == "" {
mysqlDSN = "root:root@tcp(localhost:3306)/submitqueue?parseTime=true"
}
storeFactory, err := mysql.NewFactory(mysql.MySQLParameters{
DSN: mysqlDSN,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: 5 * time.Minute,
})
if err != nil {
return fmt.Errorf("failed to create MySQL storage factory: %w", err)
}
defer storeFactory.Close()

// Create gRPC server
grpcServer := grpc.NewServer()

// Create ping controller and wrap it for gRPC
// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger, scope, storeFactory)
gatewayServer := &GatewayServer{
controller: pingController,
pingController: pingController,
landController: landController,
}
pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)

// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Listen on port 8081
port := ":8081"
// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
port = ":8081"
}
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", port, err)
Expand Down
7 changes: 5 additions & 2 deletions examples/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ func run() error {
// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Listen on port 8082
port := ":8082"
// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
port = ":8082"
}
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", port, err)
Expand Down
7 changes: 5 additions & 2 deletions examples/server/speculator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ func run() error {
// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Listen on port 8083
port := ":8083"
// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
port = ":8083"
}
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", port, err)
Expand Down
5 changes: 0 additions & 5 deletions extensions/storage/mysql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ func NewFactory(p MySQLParameters) (storage.StoreFactory, error) {
db.SetConnMaxLifetime(p.ConnMaxLifetime)
}

if err := db.Ping(); err != nil {
db.Close()
return nil, fmt.Errorf("failed to ping MySQL: %w", err)
}

return &factory{
db: db,
requestStore: NewRequestStore(db),
Expand Down
5 changes: 5 additions & 0 deletions extensions/storage/mysql/schema/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
filegroup(
name = "schema",
srcs = glob(["*.sql"]),
visibility = ["//visibility:public"],
)
4 changes: 2 additions & 2 deletions extensions/storage/mysql/schema/request.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ CREATE TABLE IF NOT EXISTS request (
seq BIGINT NOT NULL,
change_source VARCHAR(255) NOT NULL,
change_ids JSON NOT NULL,
land_strategy INT NOT NULL DEFAULT 0,
land_strategy INT NOT NULL,
state INT NOT NULL,
version INT NOT NULL DEFAULT 1,
version INT NOT NULL,
PRIMARY KEY (queue, seq)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
4 changes: 4 additions & 0 deletions gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ go_library(
importpath = "github.com/uber/submitqueue/gateway/controller",
visibility = ["//visibility:public"],
deps = [
"//entities",
"//extensions/storage",
"//gateway/protopb",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
Expand All @@ -23,6 +25,8 @@ go_test(
],
embed = [":controller"],
deps = [
"//entities",
"//extensions/storage",
"//gateway/protopb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
20 changes: 17 additions & 3 deletions gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/entities"
"github.com/uber/submitqueue/extensions/storage"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
)
Expand All @@ -14,13 +16,15 @@ import (
type LandController struct {
logger *zap.Logger
metricsScope tally.Scope
storeFactory storage.StoreFactory
}

// NewLandController creates a new instance of the gateway land controller
func NewLandController(logger *zap.Logger, scope tally.Scope) *LandController {
func NewLandController(logger *zap.Logger, scope tally.Scope, storeFactory storage.StoreFactory) *LandController {
return &LandController{
logger: logger,
metricsScope: scope,
storeFactory: storeFactory,
}
}

Expand All @@ -33,8 +37,18 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan

c.metricsScope.Counter("land_request_count").Inc(1)

// TODO: Implement proper SQID generation and send the request to the appropriate queue. So far unix time to make it sequential.
sqid := fmt.Sprintf("%d", time.Now().Unix())
change := entities.Change{
Source: req.Change.GetSource(),
IDs: req.Change.GetIds(),
}
strategy := entities.RequestLandStrategy(int(req.Strategy))

request, err := c.storeFactory.GetRequestStore().Create(ctx, req.Queue, change, strategy, entities.RequestStateNew)
if err != nil {
return nil, fmt.Errorf("LandController failed to create request for queue=%s: %w", req.Queue, err)
}

sqid := request.GetID()

c.logger.Debug("land request received",
zap.String("queue", req.Queue),
Expand Down
114 changes: 111 additions & 3 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,70 @@ package controller

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/entities"
"github.com/uber/submitqueue/extensions/storage"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
)

type mockRequestStore struct {
createFunc func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error)
}

func (m *mockRequestStore) Get(ctx context.Context, id string) (entities.Request, error) {
return entities.Request{}, nil
}

func (m *mockRequestStore) Create(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return m.createFunc(ctx, queue, change, strategy, state)
}

func (m *mockRequestStore) UpdateState(ctx context.Context, id string, version int32, newState entities.RequestState) error {
return nil
}

type mockStoreFactory struct {
requestStore storage.RequestStore
}

func (m *mockStoreFactory) GetRequestStore() storage.RequestStore {
return m.requestStore
}

func (m *mockStoreFactory) Close() error {
return nil
}

func TestNewLandController(t *testing.T) {
controller := NewLandController(zap.NewNop(), tally.NoopScope)
factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return entities.Request{}, nil
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
require.NotNil(t, controller)
}

func TestLand_ReturnsSqid(t *testing.T) {
controller := NewLandController(zap.NewNop(), tally.NoopScope)
factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return entities.Request{
Queue: queue,
Seq: 1,
Change: change,
LandStrategy: strategy,
State: state,
Version: 1,
}, nil
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -26,5 +75,64 @@ func TestLand_ReturnsSqid(t *testing.T) {
resp, err := controller.Land(ctx, req)

require.NoError(t, err)
require.NotEmpty(t, resp.Sqid)
assert.Equal(t, "test-queue/1", resp.Sqid)
}

func TestLand_PassesCorrectParametersToStore(t *testing.T) {
var capturedQueue string
var capturedChange entities.Change
var capturedStrategy entities.RequestLandStrategy
var capturedState entities.RequestState

factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
capturedQueue = queue
capturedChange = change
capturedStrategy = strategy
capturedState = state
return entities.Request{
Queue: queue,
Seq: 42,
Change: change,
LandStrategy: strategy,
State: state,
Version: 1,
}, nil
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
ctx := context.Background()

req := &pb.LandRequest{
Queue: "my-queue",
Change: &pb.Change{Source: "github", Ids: []string{"pr-1", "pr-2"}},
Strategy: pb.Strategy_STRATEGY_REBASE,
}
resp, err := controller.Land(ctx, req)

require.NoError(t, err)
assert.Equal(t, "my-queue", capturedQueue)
assert.Equal(t, "github", capturedChange.Source)
assert.Equal(t, []string{"pr-1", "pr-2"}, capturedChange.IDs)
assert.Equal(t, entities.RequestLandStrategyRebase, capturedStrategy)
assert.Equal(t, entities.RequestStateNew, capturedState)
assert.Equal(t, "my-queue/42", resp.Sqid)
}

func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) {
factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return entities.Request{}, fmt.Errorf("database connection failed")
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
ctx := context.Background()

req := &pb.LandRequest{
Queue: "test-queue",
Change: &pb.Change{Source: "github", Ids: []string{"123"}},
}
_, err := controller.Land(ctx, req)

require.Error(t, err)
}
Loading