From d8f1797ecbd2f56fc14f283b8bd48030591a50bf Mon Sep 17 00:00:00 2001 From: sxwebdev Date: Thu, 9 Apr 2026 17:36:06 +0300 Subject: [PATCH] feat: add service startup priority --- README.md | 37 + go.mod | 10 +- go.sum | 28 +- launcher/launcher.go | 73 +- launcher/launcher_test.go | 1435 +++++++++++++++++ launcher/service.go | 27 +- launcher/service_options.go | 15 + skills/mx/SKILL.md | 1 + skills/mx/architecture.md | 3 +- skills/mx/service-checklist.md | 26 +- .../mx/templates/launcher/launcher-setup.md | 14 + 11 files changed, 1635 insertions(+), 34 deletions(-) create mode 100644 launcher/launcher_test.go diff --git a/README.md b/README.md index 1e1d6db..84abdc0 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ skills repo add tkcrm/mx | Startup timeout | `WithStartupTimeout(d)` | Fail the service if `StartFn` does not signal ready within `d` | | Shutdown timeout (per service) | `WithShutdownTimeout(d)` | Max time to wait for a service to stop | | Global shutdown timeout | `WithGlobalShutdownTimeout(d)` | Hard deadline for the entire graceful shutdown phase | +| Startup priority | `WithStartupPriority(n)` | Group-based startup ordering: same priority starts concurrently, groups run in ascending order | | Stop sequence | `WithRunnerServicesSequence(...)` | `None` (parallel) / `Fifo` / `Lifo` | | Service lookup | `ServicesRunner().Get(name)` | Retrieve a registered service by name at runtime | | Health checker | `types.HealthChecker` interface | Periodic per-service health check, polled on a configurable interval | @@ -164,6 +165,42 @@ func main() { } ``` +### Startup priority + +Services can be assigned a startup priority to control initialization order. Services with the same priority start concurrently within a group. Groups are started sequentially in ascending priority order. Priority 0 (default) services start last, concurrently, after all prioritized groups are ready. 
+ +```go +ln.ServicesRunner().Register( + // Priority 1: DB layer — start concurrently, both must be ready before next group + launcher.NewService( + launcher.WithServiceName("postgres"), + launcher.WithStartupPriority(1), + launcher.WithService(pgService), + ), + launcher.NewService( + launcher.WithServiceName("redis"), + launcher.WithStartupPriority(1), + launcher.WithService(redisService), + ), + // Priority 2: message broker — waits for DB layer to be ready + launcher.NewService( + launcher.WithServiceName("rabbitmq"), + launcher.WithStartupPriority(2), + launcher.WithService(rabbitService), + ), + // Priority 0 (default): application services — start concurrently after all groups + launcher.NewService( + launcher.WithServiceName("http-server"), + launcher.WithService(httpService), + ), + launcher.NewService( + launcher.WithServiceName("grpc-server"), + launcher.WithService(grpcService), + ), +) +// Start order: (postgres + redis) → rabbitmq → (http + grpc concurrently) +``` + ### Graceful shutdown The first signal (SIGTERM / SIGINT / SIGQUIT) starts a graceful shutdown. A second signal forces immediate exit. 
diff --git a/go.mod b/go.mod index 7d38f86..ac573c9 100644 --- a/go.mod +++ b/go.mod @@ -19,16 +19,16 @@ require ( github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.20.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.42.0 // indirect - go.opentelemetry.io/otel/metric v1.42.0 // indirect - go.opentelemetry.io/otel/trace v1.42.0 // indirect + go.opentelemetry.io/otel v1.43.0 // indirect + go.opentelemetry.io/otel/metric v1.43.0 // indirect + go.opentelemetry.io/otel/trace v1.43.0 // indirect go.yaml.in/yaml/v2 v2.4.4 // indirect - golang.org/x/sys v0.42.0 // indirect + golang.org/x/sys v0.43.0 // indirect google.golang.org/protobuf v1.36.11 // indirect ) require ( github.com/prometheus/client_golang v1.23.2 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 go.uber.org/multierr v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 8995d22..f719884 100644 --- a/go.sum +++ b/go.sum @@ -37,18 +37,18 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 h1:OyrsyzuttWTSur2qN/Lm0m2a8yqyIjUVBZcxFPuXq2o= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0/go.mod h1:C2NGBr+kAB4bk3xtMXfZ94gqFDtg/GkI7e9zqGh5Beg= -go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= -go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= -go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= -go.opentelemetry.io/otel/metric v1.42.0/go.mod 
h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= -go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= -go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= -go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= -go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc= -go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= -go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0/go.mod h1:BuhAPThV8PBHBvg8ZzZ/Ok3idOdhWIodywz2xEcRbJo= +go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= +go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= +go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= +go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= +go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= +go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw= +go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= +go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= +go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -59,8 
+59,8 @@ go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ= go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/launcher/launcher.go b/launcher/launcher.go index 39a004e..1f4962e 100644 --- a/launcher/launcher.go +++ b/launcher/launcher.go @@ -9,6 +9,7 @@ import ( "os/signal" "slices" "sync" + "time" "github.com/tkcrm/mx/launcher/ops" "golang.org/x/sync/errgroup" @@ -80,18 +81,74 @@ func (l *launcher) Run() error { //nolint:cyclop } } - // start service + // group services by startup priority + groups := make(map[int][]*Service) + for _, svc := range l.servicesRunner.Services() { + p := svc.Options().StartupPriority + groups[p] = append(groups[p], svc) + } + + // collect and sort unique priorities (excluding 0) + var priorities []int + for p := range groups { + if p > 0 { + priorities = append(priorities, p) + } + } + slices.Sort(priorities) + errChan := make(chan error, len(l.servicesRunner.Services())) graceWait := new(sync.WaitGroup) - graceWait.Add(len(l.servicesRunner.Services())) - for i := range l.servicesRunner.Services() { - go func(svc *Service) { + + startSvc := func(svc *Service) { + graceWait.Add(1) + go func() { defer graceWait.Done() if err := svc.Start(); err != nil { - err := 
fmt.Errorf("failed to start service [%s]: %w", svc.Name(), err) - errChan <- err + errChan <- fmt.Errorf("failed to start service [%s]: %w", svc.Name(), err) + } + }() + } + + // start priority groups sequentially; within each group — concurrently + for _, p := range priorities { + group := groups[p] + + for _, svc := range group { + startSvc(svc) + } + + // wait for ALL services in this group to become ready + for _, svc := range group { + select { + case <-svc.Ready(): + case err := <-errChan: + l.cancelFn() + graceWait.Wait() + return err + case <-l.opts.Context.Done(): + graceWait.Wait() + return l.opts.Context.Err() } - }(l.servicesRunner.Services()[i]) + } + + // Yield to let any immediate service failures propagate. + // A service that fails synchronously closes readyCh (via deferred Start) + // before the error reaches errChan. This sleep gives startSvc goroutines + // time to deliver the error after Start() returns. + time.Sleep(time.Nanosecond) + select { + case err := <-errChan: + l.cancelFn() + graceWait.Wait() + return err + default: + } + } + + // start priority-0 services — all concurrently + for _, svc := range groups[0] { + startSvc(svc) } if l.opts.AppStartStopLog { @@ -101,6 +158,8 @@ func (l *launcher) Run() error { //nolint:cyclop // after start for _, fn := range l.opts.AfterStart { if err := fn(); err != nil { + l.cancelFn() + graceWait.Wait() return err } } diff --git a/launcher/launcher_test.go b/launcher/launcher_test.go new file mode 100644 index 0000000..62cb08a --- /dev/null +++ b/launcher/launcher_test.go @@ -0,0 +1,1435 @@ +package launcher + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "testing/synctest" + "time" +) + +// newTestLauncher creates a launcher suitable for synctest (signals disabled). +func newTestLauncher(opts ...Option) ILauncher { + defaults := []Option{WithSignal(false)} + return New(append(defaults, opts...)...) 
+} + +// blockingStart is a StartFn that blocks until ctx is cancelled. +func blockingStart(ctx context.Context) error { + <-ctx.Done() + return nil +} + +// noopStop is a StopFn that does nothing. +func noopStop(_ context.Context) error { return nil } + +// --- Service lifecycle tests --- + +func TestService_StartStop_Basic(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + svc := NewService( + WithServiceName("basic"), + WithStart(blockingStart), + WithStop(noopStop), + ) + + if svc.State() != ServiceStateIdle { + t.Fatalf("initial state = %v; want idle", svc.State()) + } + + ctx, cancel := context.WithCancel(context.Background()) + svc.Options().Context = ctx + + errCh := make(chan error, 1) + go func() { errCh <- svc.Start() }() + + synctest.Wait() + + if svc.State() != ServiceStateRunning { + t.Fatalf("after start, state = %v; want running", svc.State()) + } + + select { + case <-svc.Ready(): + default: + t.Fatal("Ready() channel not closed after start") + } + + cancel() + synctest.Wait() + + if err := <-errCh; err != nil { + t.Fatalf("Start returned error: %v", err) + } + + if err := svc.Stop(); err != nil { + t.Fatalf("Stop returned error: %v", err) + } + + if svc.State() != ServiceStateStopped { + t.Fatalf("after stop, state = %v; want stopped", svc.State()) + } + }) +} + +func TestService_NilStartFn(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + svc := NewService(WithServiceName("nil-start")) + if err := svc.Start(); err != nil { + t.Fatalf("Start with nil StartFn should return nil, got: %v", err) + } + select { + case <-svc.Ready(): + default: + t.Fatal("Ready() not closed after nil StartFn") + } + }) +} + +func TestService_Disabled(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + started := false + svc := NewService( + WithServiceName("disabled"), + WithStart(func(ctx context.Context) error { + started = true + <-ctx.Done() + return nil + }), + WithStop(noopStop), + WithEnabled(false), + ) + svc.Options().Context = 
context.Background() + + if err := svc.Start(); err != nil { + t.Fatalf("Start returned error: %v", err) + } + if started { + t.Fatal("disabled service should not have started") + } + }) +} + +func TestService_DoubleStart_Ignored(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var startCount atomic.Int32 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := NewService( + WithServiceName("double"), + WithStart(func(ctx context.Context) error { + startCount.Add(1) + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ) + svc.Options().Context = ctx + + go func() { _ = svc.Start() }() + synctest.Wait() + + if err := svc.Start(); err != nil { + t.Fatalf("second Start returned error: %v", err) + } + + if startCount.Load() != 1 { + t.Fatalf("StartFn called %d times; want 1", startCount.Load()) + } + }) +} + +func TestService_StopIdempotent(t *testing.T) { + svc := NewService( + WithServiceName("idempotent-stop"), + WithStart(blockingStart), + WithStop(noopStop), + ) + if err := svc.Stop(); err != nil { + t.Fatalf("Stop on idle service returned error: %v", err) + } +} + +// --- Hooks tests --- + +func TestService_BeforeStartHook_Error(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + hookErr := errors.New("before start failed") + svc := NewService( + WithServiceName("hook-err"), + WithStart(blockingStart), + WithStop(noopStop), + WithServiceBeforeStart(func() error { return hookErr }), + ) + svc.Options().Context = context.Background() + + err := svc.Start() + if !errors.Is(err, hookErr) { + t.Fatalf("Start error = %v; want %v", err, hookErr) + } + if svc.State() != ServiceStateFailed { + t.Fatalf("state = %v; want failed", svc.State()) + } + + }) +} + +func TestService_AfterStartHook_Error(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + hookErr := errors.New("after start failed") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := NewService( + 
WithServiceName("after-hook-err"), + WithStart(blockingStart), + WithStop(noopStop), + WithServiceAfterStart(func() error { return hookErr }), + ) + svc.Options().Context = ctx + + err := svc.Start() + if !errors.Is(err, hookErr) { + t.Fatalf("Start error = %v; want %v", err, hookErr) + } + if svc.State() != ServiceStateFailed { + t.Fatalf("state = %v; want failed", svc.State()) + } + }) +} + +func TestService_BeforeStopAfterStop_Hooks(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var order []string + + ctx, cancel := context.WithCancel(context.Background()) + + svc := NewService( + WithServiceName("stop-hooks"), + WithStart(blockingStart), + WithStop(noopStop), + WithServiceBeforeStop(func() error { + order = append(order, "before") + return nil + }), + WithServiceAfterStop(func() error { + order = append(order, "after") + return nil + }), + ) + svc.Options().Context = ctx + + go func() { _ = svc.Start() }() + synctest.Wait() + + cancel() + synctest.Wait() + + if err := svc.Stop(); err != nil { + t.Fatalf("Stop error: %v", err) + } + + if len(order) != 2 || order[0] != "before" || order[1] != "after" { + t.Fatalf("hook order = %v; want [before after]", order) + } + }) +} + +func TestService_AfterStartFinished_Hook(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + called := false + svc := NewService( + WithServiceName("after-finished"), + WithStart(func(_ context.Context) error { return nil }), + WithStop(noopStop), + WithServiceAfterStartFinished(func() error { + called = true + return nil + }), + ) + svc.Options().Context = context.Background() + + if err := svc.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + if !called { + t.Fatal("AfterStartFinished hook was not called") + } + }) +} + +// --- Startup timeout --- + +func TestService_StartupTimeout(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := NewService( + 
WithServiceName("slow-start"), + WithStart(func(ctx context.Context) error { + // Never finishes on its own — only stops via context cancel + <-ctx.Done() + return nil + }), + WithStop(noopStop), + WithStartupTimeout(5*time.Second), + ) + svc.Options().Context = ctx + + errCh := make(chan error, 1) + go func() { errCh <- svc.Start() }() + + synctest.Wait() + + err := <-errCh + if err == nil { + t.Fatal("expected startup timeout error") + } + if svc.State() != ServiceStateFailed { + t.Fatalf("state = %v; want failed", svc.State()) + } + }) +} + +// --- Shutdown timeout --- + +func TestService_ShutdownTimeout(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + svc := NewService( + WithServiceName("slow-stop"), + WithStart(blockingStart), + WithStop(func(ctx context.Context) error { + <-ctx.Done() + return nil + }), + WithShutdownTimeout(3*time.Second), + ) + + ctx, cancel := context.WithCancel(context.Background()) + svc.Options().Context = ctx + + go func() { _ = svc.Start() }() + synctest.Wait() + + cancel() + synctest.Wait() + + if err := svc.Stop(); err != nil { + t.Fatalf("Stop error: %v", err) + } + if svc.State() != ServiceStateStopped { + t.Fatalf("state = %v; want stopped", svc.State()) + } + }) +} + +// --- Restart policy --- + +func TestService_RestartOnFailure(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var attempts atomic.Int32 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := NewService( + WithServiceName("restart-fail"), + WithStart(func(ctx context.Context) error { + n := attempts.Add(1) + if n <= 2 { + return errors.New("transient error") + } + <-ctx.Done() + return nil + }), + WithStop(noopStop), + WithRestartPolicy(RestartPolicy{ + Mode: RestartOnFailure, + Delay: time.Second, + MaxDelay: 4 * time.Second, + }), + ) + svc.Options().Context = ctx + + errCh := make(chan error, 1) + go func() { errCh <- svc.Start() }() + + // Advance fake time past all backoff delays (1s + 2s = 3s) + 
time.Sleep(10 * time.Second) + synctest.Wait() + + if svc.State() != ServiceStateRunning { + t.Fatalf("state = %v; want running", svc.State()) + } + if attempts.Load() != 3 { + t.Fatalf("attempts = %d; want 3", attempts.Load()) + } + + cancel() + synctest.Wait() + + if err := <-errCh; err != nil { + t.Fatalf("Start returned error: %v", err) + } + }) +} + +func TestService_RestartOnFailure_CleanExit_NoRestart(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var attempts atomic.Int32 + + svc := NewService( + WithServiceName("restart-clean"), + WithStart(func(_ context.Context) error { + attempts.Add(1) + return nil + }), + WithStop(noopStop), + WithRestartPolicy(RestartPolicy{ + Mode: RestartOnFailure, + Delay: time.Second, + }), + ) + svc.Options().Context = context.Background() + + if err := svc.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + if attempts.Load() != 1 { + t.Fatalf("attempts = %d; want 1 (no restart on clean exit)", attempts.Load()) + } + }) +} + +func TestService_RestartAlways(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var attempts atomic.Int32 + + ctx, cancel := context.WithCancel(context.Background()) + + svc := NewService( + WithServiceName("restart-always"), + WithStart(func(ctx context.Context) error { + n := attempts.Add(1) + if n < 3 { + return nil + } + <-ctx.Done() + return nil + }), + WithStop(noopStop), + WithRestartPolicy(RestartPolicy{ + Mode: RestartAlways, + Delay: time.Second, + }), + ) + svc.Options().Context = ctx + + errCh := make(chan error, 1) + go func() { errCh <- svc.Start() }() + + // Advance fake time past backoff delays (1s + 2s = 3s) + time.Sleep(10 * time.Second) + synctest.Wait() + + if attempts.Load() != 3 { + t.Fatalf("attempts = %d; want 3", attempts.Load()) + } + + cancel() + synctest.Wait() + <-errCh + }) +} + +func TestService_RestartMaxRetries(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var attempts atomic.Int32 + failErr := errors.New("always fail") + + svc := 
NewService( + WithServiceName("max-retries"), + WithStart(func(_ context.Context) error { + attempts.Add(1) + return failErr + }), + WithStop(noopStop), + WithRestartPolicy(RestartPolicy{ + Mode: RestartOnFailure, + MaxRetries: 3, + Delay: time.Second, + }), + ) + svc.Options().Context = context.Background() + + err := svc.Start() + if err == nil { + t.Fatal("expected error after max retries") + } + // 1 initial + 3 retries = 4 total attempts + if attempts.Load() != 4 { + t.Fatalf("attempts = %d; want 4", attempts.Load()) + } + if svc.State() != ServiceStateFailed { + t.Fatalf("state = %v; want failed", svc.State()) + } + }) +} + +func TestService_RestartBackoff_ExponentialDelay(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var timestamps []time.Time + + ctx, cancel := context.WithCancel(context.Background()) + + svc := NewService( + WithServiceName("backoff"), + WithStart(func(ctx context.Context) error { + timestamps = append(timestamps, time.Now()) + if len(timestamps) < 4 { + return errors.New("fail") + } + <-ctx.Done() + return nil + }), + WithStop(noopStop), + WithRestartPolicy(RestartPolicy{ + Mode: RestartOnFailure, + Delay: time.Second, + MaxDelay: 10 * time.Second, + }), + ) + svc.Options().Context = ctx + + errCh := make(chan error, 1) + go func() { errCh <- svc.Start() }() + + // Advance fake time past all backoff delays (1s + 2s + 4s = 7s) + time.Sleep(10 * time.Second) + synctest.Wait() + + cancel() + synctest.Wait() + <-errCh + + if len(timestamps) != 4 { + t.Fatalf("got %d attempts; want 4", len(timestamps)) + } + + // Delays: attempt 0→1: 1s, attempt 1→2: 2s, attempt 2→3: 4s + d1 := timestamps[1].Sub(timestamps[0]) + d2 := timestamps[2].Sub(timestamps[1]) + d3 := timestamps[3].Sub(timestamps[2]) + + if d1 != time.Second { + t.Fatalf("delay 1 = %v; want 1s", d1) + } + if d2 != 2*time.Second { + t.Fatalf("delay 2 = %v; want 2s", d2) + } + if d3 != 4*time.Second { + t.Fatalf("delay 3 = %v; want 4s", d3) + } + }) +} + +// --- Launcher tests 
--- + +func TestLauncher_RunStop(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ln := newTestLauncher() + + svc := NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ) + ln.ServicesRunner().Register(svc) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + + synctest.Wait() + + if svc.State() != ServiceStateRunning { + t.Fatalf("state = %v; want running", svc.State()) + } + + ln.Stop() + synctest.Wait() + + if err := <-errCh; err != nil { + t.Fatalf("Run returned error: %v", err) + } + }) +} + +func TestLauncher_NoServices(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ln := newTestLauncher() + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + + synctest.Wait() + + ln.Stop() + synctest.Wait() + + if err := <-errCh; err != nil { + t.Fatalf("Run with no services returned error: %v", err) + } + }) +} + +func TestLauncher_ServiceError_Propagates(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + svcErr := errors.New("service crashed") + ln := newTestLauncher() + + ln.ServicesRunner().Register( + NewService( + WithServiceName("crasher"), + WithStart(func(_ context.Context) error { return svcErr }), + WithStop(noopStop), + ), + ) + + err := ln.Run() + if err == nil { + t.Fatal("expected error from Run") + } + if !errors.Is(err, svcErr) { + t.Fatalf("Run error = %v; want wrapping %v", err, svcErr) + } + }) +} + +func TestLauncher_BeforeStartHook_Error(t *testing.T) { + hookErr := errors.New("before start hook failed") + ln := newTestLauncher( + WithBeforeStart(func() error { return hookErr }), + ) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + err := ln.Run() + if !errors.Is(err, hookErr) { + t.Fatalf("Run error = %v; want %v", err, hookErr) + } +} + +func TestLauncher_AfterStartHook_Error(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + hookErr := errors.New("after start 
hook failed") + ln := newTestLauncher( + WithAfterStart(func() error { return hookErr }), + ) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + err := ln.Run() + if !errors.Is(err, hookErr) { + t.Fatalf("Run error = %v; want %v", err, hookErr) + } + }) +} + +func TestLauncher_BeforeStopAfterStop_Hooks(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var order []string + var mu sync.Mutex + + appendOrder := func(s string) { + mu.Lock() + order = append(order, s) + mu.Unlock() + } + + ln := newTestLauncher( + WithBeforeStop(func() error { appendOrder("before-stop"); return nil }), + WithAfterStop(func() error { appendOrder("after-stop"); return nil }), + ) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + + if err := <-errCh; err != nil { + t.Fatalf("Run error: %v", err) + } + + mu.Lock() + defer mu.Unlock() + if len(order) != 2 || order[0] != "before-stop" || order[1] != "after-stop" { + t.Fatalf("hook order = %v; want [before-stop after-stop]", order) + } + }) +} + +func TestLauncher_AddHooksAfterCreation(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var called []string + + ln := newTestLauncher() + ln.AddBeforeStartHooks(func() error { called = append(called, "before"); return nil }) + ln.AddAfterStartHooks(func() error { called = append(called, "after-start"); return nil }) + ln.AddBeforeStopHooks(func() error { called = append(called, "before-stop"); return nil }) + ln.AddAfterStopHooks(func() error { called = append(called, "after-stop"); return nil }) + + // Nil hooks should be ignored + ln.AddBeforeStartHooks(nil) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + 
errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + <-errCh + + want := []string{"before", "after-start", "before-stop", "after-stop"} + if len(called) != len(want) { + t.Fatalf("hooks = %v; want %v", called, want) + } + for i := range want { + if called[i] != want[i] { + t.Fatalf("hooks[%d] = %q; want %q", i, called[i], want[i]) + } + } + }) +} + +func TestLauncher_ContextCancel_StopsLauncher(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ln := newTestLauncher(WithContext(ctx)) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + cancel() + synctest.Wait() + + if err := <-errCh; err != nil { + t.Fatalf("Run error: %v", err) + } + }) +} + +// --- Stop sequence tests --- + +func TestLauncher_StopSequence_Fifo(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var order []string + var mu sync.Mutex + + ln := newTestLauncher( + WithRunnerServicesSequence(RunnerServicesSequenceFifo), + ) + + for _, name := range []string{"a", "b", "c"} { + n := name + ln.ServicesRunner().Register( + NewService( + WithServiceName(n), + WithStart(blockingStart), + WithStop(func(_ context.Context) error { + mu.Lock() + order = append(order, n) + mu.Unlock() + return nil + }), + ), + ) + } + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + <-errCh + + mu.Lock() + defer mu.Unlock() + want := []string{"a", "b", "c"} + if len(order) != 3 || order[0] != "a" || order[1] != "b" || order[2] != "c" { + t.Fatalf("stop order = %v; want %v", order, want) + } + }) +} + +func TestLauncher_StopSequence_Lifo(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var order []string + var mu sync.Mutex + + ln := newTestLauncher( 
+ WithRunnerServicesSequence(RunnerServicesSequenceLifo), + ) + + for _, name := range []string{"a", "b", "c"} { + n := name + ln.ServicesRunner().Register( + NewService( + WithServiceName(n), + WithStart(blockingStart), + WithStop(func(_ context.Context) error { + mu.Lock() + order = append(order, n) + mu.Unlock() + return nil + }), + ), + ) + } + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + <-errCh + + mu.Lock() + defer mu.Unlock() + want := []string{"c", "b", "a"} + if len(order) != 3 || order[0] != "c" || order[1] != "b" || order[2] != "a" { + t.Fatalf("stop order = %v; want %v", order, want) + } + }) +} + +func TestLauncher_StopSequence_None_Parallel(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var stopCount atomic.Int32 + + ln := newTestLauncher( + WithRunnerServicesSequence(RunnerServicesSequenceNone), + ) + + for _, name := range []string{"a", "b", "c"} { + ln.ServicesRunner().Register( + NewService( + WithServiceName(name), + WithStart(blockingStart), + WithStop(func(_ context.Context) error { + stopCount.Add(1) + return nil + }), + ), + ) + } + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + <-errCh + + if stopCount.Load() != 3 { + t.Fatalf("stopped %d services; want 3", stopCount.Load()) + } + }) +} + +// --- Startup priority tests --- + +func TestLauncher_StartupPriority_GroupOrder(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var order []string + var mu sync.Mutex + + appendOrder := func(s string) { + mu.Lock() + order = append(order, s) + mu.Unlock() + } + + ln := newTestLauncher() + + // Priority 2 + ln.ServicesRunner().Register( + NewService( + WithServiceName("broker"), + WithStartupPriority(2), + WithStart(func(ctx context.Context) error { + appendOrder("broker-started") + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ), + ) + + // Priority 1 + 
ln.ServicesRunner().Register( + NewService( + WithServiceName("db"), + WithStartupPriority(1), + WithStart(func(ctx context.Context) error { + appendOrder("db-started") + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ), + ) + + // Priority 0 (default) + ln.ServicesRunner().Register( + NewService( + WithServiceName("http"), + WithStart(func(ctx context.Context) error { + appendOrder("http-started") + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + + // Advance fake time past the inter-group yield sleeps + time.Sleep(time.Millisecond) + synctest.Wait() + + ln.Stop() + synctest.Wait() + <-errCh + + mu.Lock() + defer mu.Unlock() + + if len(order) != 3 { + t.Fatalf("order = %v; want 3 entries", order) + } + if order[0] != "db-started" { + t.Fatalf("order[0] = %q; want db-started", order[0]) + } + if order[1] != "broker-started" { + t.Fatalf("order[1] = %q; want broker-started", order[1]) + } + if order[2] != "http-started" { + t.Fatalf("order[2] = %q; want http-started", order[2]) + } + }) +} + +func TestLauncher_StartupPriority_SameGroup_Concurrent(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var readyCount atomic.Int32 + bothReady := make(chan struct{}) + + ln := newTestLauncher() + + makeSvc := func(name string) *Service { + return NewService( + WithServiceName(name), + WithStartupPriority(1), + WithStart(func(ctx context.Context) error { + if readyCount.Add(1) == 2 { + close(bothReady) + } + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ) + } + + ln.ServicesRunner().Register(makeSvc("pg"), makeSvc("redis")) + + var p0Started atomic.Bool + ln.ServicesRunner().Register( + NewService( + WithServiceName("app"), + WithStart(func(ctx context.Context) error { + p0Started.Store(true) + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + + // Advance fake time past inter-group 
yield sleeps + time.Sleep(time.Millisecond) + synctest.Wait() + + select { + case <-bothReady: + default: + t.Fatal("both priority-1 services should have started concurrently") + } + + if !p0Started.Load() { + t.Fatal("priority-0 service should have started after group 1") + } + + ln.Stop() + synctest.Wait() + <-errCh + }) +} + +func TestLauncher_StartupPriority_GroupFailure_AbortsNext(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + svcErr := errors.New("db failed") + var p2Started atomic.Bool + + ln := newTestLauncher() + + ln.ServicesRunner().Register( + NewService( + WithServiceName("db"), + WithStartupPriority(1), + WithStart(func(_ context.Context) error { return svcErr }), + WithStop(noopStop), + ), + ) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("broker"), + WithStartupPriority(2), + WithStart(func(ctx context.Context) error { + p2Started.Store(true) + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ), + ) + + err := ln.Run() + if err == nil { + t.Fatal("expected error from Run") + } + if !errors.Is(err, svcErr) { + t.Fatalf("Run error = %v; want wrapping %v", err, svcErr) + } + if p2Started.Load() { + t.Fatal("priority-2 service should not have started after priority-1 failure") + } + }) +} + +func TestLauncher_StartupPriority_AllDefault_Concurrent(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var startCount atomic.Int32 + allStarted := make(chan struct{}) + const n = 5 + + ln := newTestLauncher() + + for i := range n { + ln.ServicesRunner().Register( + NewService( + WithServiceName(fmt.Sprintf("svc-%d", i)), + WithStart(func(ctx context.Context) error { + if startCount.Add(1) == n { + close(allStarted) + } + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ), + ) + } + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + select { + case <-allStarted: + default: + t.Fatalf("expected all %d services to start concurrently, got %d", n, startCount.Load()) + } + + 
ln.Stop() + synctest.Wait() + <-errCh + }) +} + +func TestLauncher_StartupPriority_MultipleGroups(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var seq atomic.Int64 + var g1Seq, g2Seq, g3Seq int64 + + ln := newTestLauncher() + + makeGroupSvc := func(name string, priority int, seqDst *int64) *Service { + return NewService( + WithServiceName(name), + WithStartupPriority(priority), + WithStart(func(ctx context.Context) error { + *seqDst = seq.Add(1) + <-ctx.Done() + return nil + }), + WithStop(noopStop), + ) + } + + ln.ServicesRunner().Register( + makeGroupSvc("g3-svc", 3, &g3Seq), + makeGroupSvc("g1-svc", 1, &g1Seq), + makeGroupSvc("g2-svc", 2, &g2Seq), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + + time.Sleep(time.Millisecond) + synctest.Wait() + + if g1Seq >= g2Seq || g2Seq >= g3Seq { + t.Fatalf("groups out of order: g1=%d, g2=%d, g3=%d", g1Seq, g2Seq, g3Seq) + } + + ln.Stop() + synctest.Wait() + <-errCh + }) +} + +// --- Services runner tests --- + +func TestServicesRunner_Get(t *testing.T) { + ln := newTestLauncher() + + svc := NewService( + WithServiceName("findme"), + WithStart(blockingStart), + WithStop(noopStop), + ) + ln.ServicesRunner().Register(svc) + + found, ok := ln.ServicesRunner().Get("findme") + if !ok { + t.Fatal("service not found") + } + if found.Name() != "findme" { + t.Fatalf("found service name = %q; want findme", found.Name()) + } + + _, ok = ln.ServicesRunner().Get("nonexistent") + if ok { + t.Fatal("nonexistent service should not be found") + } +} + +func TestServicesRunner_RegisterNil(t *testing.T) { + ln := newTestLauncher() + ln.ServicesRunner().Register(nil) + + if len(ln.ServicesRunner().Services()) != 0 { + t.Fatal("nil service should not be registered") + } +} + +func TestServicesRunner_DisabledService_Skipped(t *testing.T) { + ln := newTestLauncher() + + ln.ServicesRunner().Register( + NewService( + WithServiceName("disabled"), + WithStart(blockingStart), + WithStop(noopStop), + 
WithEnabled(false), + ), + ) + + if len(ln.ServicesRunner().Services()) != 0 { + t.Fatal("disabled service should not be registered") + } +} + +// --- Multiple services with errors --- + +func TestLauncher_MultipleServices_OneFailsAfterStart(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + svcErr := errors.New("service-b crashed") + + ln := newTestLauncher() + + ln.ServicesRunner().Register( + NewService( + WithServiceName("a"), + WithStart(blockingStart), + WithStop(noopStop), + ), + NewService( + WithServiceName("b"), + WithStart(func(ctx context.Context) error { + time.Sleep(time.Second) + return svcErr + }), + WithStop(noopStop), + ), + ) + + err := ln.Run() + if err == nil { + t.Fatal("expected error from Run") + } + if !errors.Is(err, svcErr) { + t.Fatalf("Run error = %v; want wrapping %v", err, svcErr) + } + }) +} + +// --- Stop error from hooks --- + +func TestLauncher_BeforeStopHook_Error(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + hookErr := errors.New("before stop failed") + ln := newTestLauncher( + WithBeforeStop(func() error { return hookErr }), + ) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + + err := <-errCh + if !errors.Is(err, hookErr) { + t.Fatalf("Run error = %v; want wrapping %v", err, hookErr) + } + }) +} + +func TestLauncher_AfterStopHook_Error(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + hookErr := errors.New("after stop failed") + ln := newTestLauncher( + WithAfterStop(func() error { return hookErr }), + ) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + + err := <-errCh + if !errors.Is(err, hookErr) 
{ + t.Fatalf("Run error = %v; want wrapping %v", err, hookErr) + } + }) +} + +// --- Service stop error --- + +func TestService_StopError_Propagates(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + stopErr := errors.New("stop failed") + ctx, cancel := context.WithCancel(context.Background()) + + svc := NewService( + WithServiceName("stop-err"), + WithStart(blockingStart), + WithStop(func(_ context.Context) error { return stopErr }), + ) + svc.Options().Context = ctx + + go func() { _ = svc.Start() }() + synctest.Wait() + + cancel() + synctest.Wait() + + err := svc.Stop() + if !errors.Is(err, stopErr) { + t.Fatalf("Stop error = %v; want %v", err, stopErr) + } + if svc.State() != ServiceStateFailed { + t.Fatalf("state = %v; want failed", svc.State()) + } + }) +} + +// --- Ready channel edge cases --- + +func TestService_ReadyCh_ClosedOnStartError(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + svc := NewService( + WithServiceName("fail-start"), + WithStart(func(_ context.Context) error { return errors.New("boom") }), + WithStop(noopStop), + ) + svc.Options().Context = context.Background() + + _ = svc.Start() + + select { + case <-svc.Ready(): + default: + t.Fatal("Ready() should be closed even after StartFn error") + } + }) +} + +// --- WithService wrapper --- + +type testIService struct { + name string + started atomic.Bool + stopped atomic.Bool +} + +func (s *testIService) Name() string { return s.name } +func (s *testIService) Start(ctx context.Context) error { s.started.Store(true); <-ctx.Done(); return nil } +func (s *testIService) Stop(_ context.Context) error { s.stopped.Store(true); return nil } +func (s *testIService) Enabled() bool { return true } +func (s *testIService) Interval() time.Duration { return time.Second } +func (s *testIService) Healthy(_ context.Context) error { return nil } + +func TestService_WithServiceWrapper(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + impl := &testIService{name: "wrapped"} + + svc := 
NewService(WithService(impl)) + + if svc.Name() != "wrapped" { + t.Fatalf("name = %q; want wrapped", svc.Name()) + } + + ctx, cancel := context.WithCancel(context.Background()) + svc.Options().Context = ctx + + go func() { _ = svc.Start() }() + synctest.Wait() + + if !impl.started.Load() { + t.Fatal("Start was not called on underlying service") + } + + cancel() + synctest.Wait() + + _ = svc.Stop() + if !impl.stopped.Load() { + t.Fatal("Stop was not called on underlying service") + } + }) +} + +// --- Launcher context --- + +func TestLauncher_Context(t *testing.T) { + ln := newTestLauncher() + ctx := ln.Context() + if ctx == nil { + t.Fatal("Context() returned nil") + } +} + +// --- AppStartStopLog --- + +func TestLauncher_AppStartStopLog(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ln := newTestLauncher( + WithName("test-app"), + WithAppStartStopLog(true), + ) + + ln.ServicesRunner().Register( + NewService( + WithServiceName("svc"), + WithStart(blockingStart), + WithStop(noopStop), + ), + ) + + errCh := make(chan error, 1) + go func() { errCh <- ln.Run() }() + synctest.Wait() + + ln.Stop() + synctest.Wait() + + if err := <-errCh; err != nil { + t.Fatalf("Run error: %v", err) + } + }) +} diff --git a/launcher/service.go b/launcher/service.go index b304d0b..9db36d8 100644 --- a/launcher/service.go +++ b/launcher/service.go @@ -23,17 +23,22 @@ const ( // Service wraps a lifecycle-managed unit with Start/Stop functions and hooks. type Service struct { - opts ServiceOptions - state ServiceState + opts ServiceOptions + state ServiceState + readyCh chan struct{} } // NewService creates a new Service. func NewService(opts ...ServiceOption) *Service { return &Service{ - opts: newServiceOptions(opts...), + opts: newServiceOptions(opts...), + readyCh: make(chan struct{}), } } +// Ready returns a channel that is closed once the service reaches the Running state, or when Start returns without reaching it (for example, when StartFn is nil or fails).
+func (s *Service) Ready() <-chan struct{} { return s.readyCh } + func (s Service) Name() string { return s.opts.Name } // State returns the current lifecycle state of the service. @@ -44,6 +49,14 @@ func (s *Service) Options() *ServiceOptions { return &s.opts } func (s Service) String() string { return "mx" } func (s *Service) Start() error { + defer func() { + select { + case <-s.readyCh: + default: + close(s.readyCh) + } + }() + if s.opts.StartFn == nil { return nil } @@ -99,6 +112,12 @@ func (s *Service) runWithRestarts() error { s.state = ServiceStateRunning + select { + case <-s.readyCh: + default: + close(s.readyCh) + } + if !afterStartDone { afterStartDone = true for _, fn := range s.opts.AfterStart { @@ -122,7 +141,6 @@ func (s *Service) runWithRestarts() error { case <-time.After(s.opts.StartupTimeout): exitErr = fmt.Errorf("service [%s] startup timeout exceeded (%s)", s.Name(), s.opts.StartupTimeout) case <-s.opts.Context.Done(): - // graceful shutdown: wait for StartFn goroutine select { case <-time.After(s.opts.ShutdownTimeout): s.opts.Logger.Infof("service [%s] was stopped by timeout", s.Name()) @@ -138,7 +156,6 @@ func (s *Service) runWithRestarts() error { case <-doneChan: cleanExit = true case <-s.opts.Context.Done(): - // graceful shutdown: wait for StartFn goroutine select { case <-time.After(s.opts.ShutdownTimeout): s.opts.Logger.Infof("service [%s] was stopped by timeout", s.Name()) diff --git a/launcher/service_options.go b/launcher/service_options.go index 851f782..ff56775 100644 --- a/launcher/service_options.go +++ b/launcher/service_options.go @@ -40,6 +40,13 @@ type ServiceOptions struct { // RestartPolicy defines how the service behaves after an unexpected exit. RestartPolicy RestartPolicy + + // StartupPriority controls service startup ordering. + // Services are grouped by priority and started group-by-group in ascending order. + // Services within the same priority group start concurrently. 
+ // All services in a group must reach Running state before the next group starts. + // Priority 0 (default): start concurrently after all prioritized groups are ready. + StartupPriority int } func (s *ServiceOptions) Validate() error { @@ -136,6 +143,14 @@ func WithRestartPolicy(p RestartPolicy) ServiceOption { return func(o *ServiceOptions) { o.RestartPolicy = p } } +// WithStartupPriority sets the startup priority for the service. +// Services with the same priority start concurrently within a group. +// Groups are started sequentially in ascending priority order. +// Priority 0 (default) services start last, concurrently. +func WithStartupPriority(p int) ServiceOption { + return func(o *ServiceOptions) { o.StartupPriority = p } +} + // WithService wraps any value that implements Name/Start/Stop/Enabled/HealthChecker. func WithService(svc any) ServiceOption { return func(o *ServiceOptions) { diff --git a/skills/mx/SKILL.md b/skills/mx/SKILL.md index 15c8629..b4f068b 100644 --- a/skills/mx/SKILL.md +++ b/skills/mx/SKILL.md @@ -50,6 +50,7 @@ user-invocable: false - **Ops are separate**: Health checks, metrics, and profiler run on a dedicated HTTP server (default port 10000), not on the application transport. - **Context flows down**: The Launcher creates a root context that is passed to all services. Services should respect `<-ctx.Done()` in their Start function. - **Restart policies are per-service**: Configure `RestartOnFailure` or `RestartAlways` with exponential backoff on individual services, not globally. +- **Startup priority groups**: Services with `StartupPriority > 0` start in ascending group order (same priority = concurrent within group). Every service in a group must be ready before the next group starts. Services with priority 0 (the default) start last, concurrently, after all prioritized groups are ready. ## Related files diff --git a/skills/mx/architecture.md b/skills/mx/architecture.md index fca18d1..1bb895c 100644 --- a/skills/mx/architecture.md +++ b/skills/mx/architecture.md @@ -79,7 +79,7 @@ Launcher 1.
Register ops services (if `OpsConfig.Enabled`) 2. Run `BeforeStart` hooks sequentially -3. Start all services in parallel goroutines +3. Start services by priority groups: each group starts concurrently, groups run sequentially in ascending priority order. Priority 0 (default) starts last. 4. Run `AfterStart` hooks sequentially 5. Wait for: service error, OS signal, or context cancellation 6. On first signal: cancel context, log graceful shutdown message @@ -103,6 +103,7 @@ Launcher - `ShutdownTimeout` (default 10s) — max time for Stop to complete - `StartupTimeout` — max time for Start to signal (0 = no timeout) - `RestartPolicy` — automatic restart on failure or always +- `StartupPriority` — group-based startup ordering (same priority = concurrent, groups sequential) ### Restart Policies diff --git a/skills/mx/service-checklist.md b/skills/mx/service-checklist.md index 4df7433..efcc1a1 100644 --- a/skills/mx/service-checklist.md +++ b/skills/mx/service-checklist.md @@ -98,7 +98,29 @@ ln.ServicesRunner().Register( ) ``` -## 6. (Optional) Add lifecycle hooks +## 6. (Optional) Set startup priority + +For infrastructure services that must be ready before other services start: + +```go +ln.ServicesRunner().Register( + // Priority 1 services start first (same priority = concurrent within group) + launcher.NewService( + launcher.WithService(dbSvc), + launcher.WithStartupPriority(1), + ), + launcher.NewService( + launcher.WithService(redisSvc), + launcher.WithStartupPriority(1), + ), + // Priority 0 (default) starts last, after all prioritized groups are ready + launcher.NewService( + launcher.WithService(appSvc), + ), +) +``` + +## 7. (Optional) Add lifecycle hooks ```go ln.ServicesRunner().Register( @@ -116,7 +138,7 @@ ln.ServicesRunner().Register( ) ``` -## 7. Verify +## 8. 
Verify - [ ] Service implements `types.IService` (Name, Start, Stop) - [ ] Start function blocks until `ctx.Done()` or work completes diff --git a/skills/mx/templates/launcher/launcher-setup.md b/skills/mx/templates/launcher/launcher-setup.md index 8bdc949..6d7a8d8 100644 --- a/skills/mx/templates/launcher/launcher-setup.md +++ b/skills/mx/templates/launcher/launcher-setup.md @@ -122,6 +122,20 @@ func main() { | `WithBeforeStop(func() error)` | Hook before services stop | | `WithAfterStop(func() error)` | Hook after services stop | +### Service Options Reference + +| Option | Description | +| ------------------------------------ | ---------------------------------------------------------------------------------- | +| `WithServiceName(string)` | Service name used in logs and health checks | +| `WithStart(func(ctx) error)` | Start function (must block) | +| `WithStop(func(ctx) error)` | Stop function | +| `WithEnabled(bool)` | Enable/disable service | +| `WithService(any)` | Wrap struct implementing Name/Start/Stop | +| `WithStartupPriority(int)` | Startup priority (0 = concurrent last, >0 = grouped sequential in ascending order) | +| `WithShutdownTimeout(time.Duration)` | Max time for Stop to complete (default 10s) | +| `WithStartupTimeout(time.Duration)` | Max time for Start to signal ready (0 = no limit) | +| `WithRestartPolicy(RestartPolicy)` | Automatic restart on failure or always | + ## Programmatic Stop You can stop the launcher from code: