Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ go_deps.from_file(go_mod = "//:go.mod")
# All *direct* Go dependencies of the module have to be listed explicitly
use_repo(
go_deps,
"com_github_go_sql_driver_mysql",
"com_github_gogo_protobuf",
"com_github_stretchr_testify",
"com_github_testcontainers_testcontainers_go",
"com_github_testcontainers_testcontainers_go_modules_mysql",
"com_github_uber_go_tally_v4",
"org_golang_google_grpc",
"org_golang_google_protobuf",
Expand Down
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ integration-test:
@echo "Running all service integration tests..."
@$(BAZEL) test //gateway/integration_tests:integration_tests_test //orchestrator/integration_tests:integration_tests_test //speculator/integration_tests:integration_tests_test --test_output=all

# Run end-to-end tests (requires all services to be running)
# Run end-to-end integration tests (hermetic, no manual server setup needed)
e2e-test:
@echo "Running end-to-end tests..."
@$(BAZEL) test //integration_tests:e2e_test --test_output=all
@echo "Running integration tests..."
@$(BAZEL) test //integration_tests:integration_test --test_output=all

# Clean generated files and binaries
clean:
Expand Down Expand Up @@ -186,7 +186,9 @@ help:
@echo " make integration-test-orchestrator - Test Orchestrator service"
@echo " make integration-test-speculator - Test Speculator service"
@echo " make integration-test - Test all services"
@echo " make e2e-test - Run end-to-end tests"
@echo ""
@echo "End-to-End Tests (hermetic, no setup needed):"
@echo " make e2e-test - Run integration tests with Testcontainers"
@echo ""
@echo "Run Clients:"
@echo " make run-client-gateway - Run gateway client"
Expand Down
15 changes: 9 additions & 6 deletions entities/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,34 @@ import (
// RequestLandStrategy defines the possible source control integration methods.
type RequestLandStrategy int

// do not use iota here, as values should be fixed and consistent across versions.
const (
// RequestLandStrategyDefault lets the server decide based on configuration.
RequestLandStrategyDefault RequestLandStrategy = 0
// RequestLandStrategyRebase rebases commits onto the target branch before landing.
RequestLandStrategyRebase = 1
RequestLandStrategyRebase RequestLandStrategy = 1
// RequestLandStrategySquashRebase squashes commits into a single commit before rebase.
RequestLandStrategySquashRebase = 2
RequestLandStrategySquashRebase RequestLandStrategy = 2
// RequestLandStrategyMerge merges commits into the target branch by creating a separate merge commit, preserving the commit history along with hashes.
RequestLandStrategyMerge = 3
RequestLandStrategyMerge RequestLandStrategy = 3
)

// RequestState defines the possible states of a land request.
type RequestState int

// TODO: define all states
// do not use iota here, as values should be fixed and consistent across versions.
const (
// RequestStateUnknown is the unreachable state. It is set by default when the structure is initialized. It should never be seen in the system.
RequestStateUnknown RequestState = 0
// RequestStateNew is the initial state of a land request. It is confirmed by the system but the processing is not started yet.
RequestStateNew RequestState = 1
// RequestStateProcessing is the state of a land request that is being processed.
RequestStateProcessing = 2
RequestStateProcessing RequestState = 2
// RequestStateLanded is the state of a land request that has been successfully processed and landed. This is the final state.
RequestStateLanded = 3
RequestStateLanded RequestState = 3
// RequestStateError is the state of a land request that has encountered an error. This is the final state.
RequestStateError = 4
RequestStateError RequestState = 4
)

// Change represents a set of related code changes identified by one or more IDs from a particular code change provider, like Github Pull Requests.
Expand Down
1 change: 1 addition & 0 deletions examples/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/uber/submitqueue/examples/server/gateway",
visibility = ["//visibility:private"],
deps = [
"//extensions/storage/mysql",
"//gateway/controller",
"//gateway/protopb",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
40 changes: 34 additions & 6 deletions examples/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/extensions/storage/mysql"
"github.com/uber/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
Expand All @@ -21,12 +22,18 @@ import (
// GatewayServer wraps the controller and implements the gRPC service interface
type GatewayServer struct {
pb.UnimplementedSubmitQueueGatewayServer
controller *controller.PingController
pingController *controller.PingController
landController *controller.LandController
}

// Ping delegates to the controller
func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
return s.controller.Ping(ctx, req)
return s.pingController.Ping(ctx, req)
}

// Land delegates to the controller
func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
return s.landController.Land(ctx, req)
}

func main() {
Expand Down Expand Up @@ -75,21 +82,42 @@ func run() error {
metricsWgDone.Wait()
}()

// Initialize MySQL storage factory
mysqlDSN := os.Getenv("MYSQL_DSN")
if mysqlDSN == "" {
mysqlDSN = "root:root@tcp(localhost:3306)/submitqueue?parseTime=true"
}
storeFactory, err := mysql.NewFactory(mysql.MySQLParameters{
DSN: mysqlDSN,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: 5 * time.Minute,
})
if err != nil {
return fmt.Errorf("failed to create MySQL storage factory: %w", err)
}
defer storeFactory.Close()

// Create gRPC server
grpcServer := grpc.NewServer()

// Create ping controller and wrap it for gRPC
// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger, scope, storeFactory)
gatewayServer := &GatewayServer{
controller: pingController,
pingController: pingController,
landController: landController,
}
pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)

// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Listen on port 8081
port := ":8081"
// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
port = ":8081"
}
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", port, err)
Expand Down
7 changes: 5 additions & 2 deletions examples/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ func run() error {
// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Listen on port 8082
port := ":8082"
// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
port = ":8082"
}
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", port, err)
Expand Down
7 changes: 5 additions & 2 deletions examples/server/speculator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ func run() error {
// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Listen on port 8083
port := ":8083"
// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
port = ":8083"
}
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", port, err)
Expand Down
1 change: 1 addition & 0 deletions extensions/storage/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ go_library(
deps = [
"//entities",
"//extensions/storage",
"@com_github_go_sql_driver_mysql//:mysql",
],
)
61 changes: 52 additions & 9 deletions extensions/storage/mysql/factory.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,66 @@
package mysql

import "github.com/uber/submitqueue/extensions/storage"
import (
"database/sql"
"fmt"
"time"

// MySQLParameters defines the parameters for the MySQL storage factory.
_ "github.com/go-sql-driver/mysql"

"github.com/uber/submitqueue/extensions/storage"
)

// MySQLParameters defines the configuration for the MySQL storage factory.
// TODO: integrate with configuration system.
type MySQLParameters struct {
}
// DSN is the MySQL Data Source Name.
// Format: [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
DSN string

// NewFactory creates a new MySQL storage factory
func NewFactory(p MySQLParameters) (storage.StoreFactory, error) {
Comment thread
behinddwalls marked this conversation as resolved.
return &factory{
requestStore: NewRequestStore(),
}, nil
// MaxOpenConns sets the maximum number of open connections to the database. 0 means unlimited.
MaxOpenConns int

// MaxIdleConns sets the maximum number of idle connections in the pool. 0 means no idle connections are retained.
MaxIdleConns int

// ConnMaxLifetime sets the maximum amount of time a connection may be reused. 0 means connections are not closed due to age.
ConnMaxLifetime time.Duration
}

type factory struct {
db *sql.DB
requestStore storage.RequestStore
}

// GetRequestStore returns the MySQL-backed RequestStore
// NewFactory creates a new MySQL storage factory.
func NewFactory(p MySQLParameters) (storage.StoreFactory, error) {
db, err := sql.Open("mysql", p.DSN)
if err != nil {
return nil, fmt.Errorf("failed to open MySQL connection: %w", err)
}

if p.MaxOpenConns > 0 {
db.SetMaxOpenConns(p.MaxOpenConns)
}
if p.MaxIdleConns > 0 {
db.SetMaxIdleConns(p.MaxIdleConns)
}
if p.ConnMaxLifetime > 0 {
db.SetConnMaxLifetime(p.ConnMaxLifetime)
}
Comment thread
behinddwalls marked this conversation as resolved.

return &factory{
db: db,
requestStore: NewRequestStore(db),
}, nil
}

// GetRequestStore returns the MySQL-backed RequestStore.
func (f *factory) GetRequestStore() storage.RequestStore {
return f.requestStore
}

// Close closes the underlying database connection.
func (f *factory) Close() error {
return f.db.Close()
}
Loading