Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ skills repo add tkcrm/mx
| Startup timeout | `WithStartupTimeout(d)` | Fail the service if `StartFn` does not signal ready within `d` |
| Shutdown timeout (per service) | `WithShutdownTimeout(d)` | Max time to wait for a service to stop |
| Global shutdown timeout | `WithGlobalShutdownTimeout(d)` | Hard deadline for the entire graceful shutdown phase |
| Startup priority | `WithStartupPriority(n)` | Group-based startup ordering: services sharing a priority start concurrently; groups with priority > 0 run sequentially in ascending order, and priority 0 (the default) starts last |
| Stop sequence | `WithRunnerServicesSequence(...)` | `None` (parallel) / `Fifo` / `Lifo` |
| Service lookup | `ServicesRunner().Get(name)` | Retrieve a registered service by name at runtime |
| Health checker | `types.HealthChecker` interface | Periodic per-service health check, polled on a configurable interval |
Expand Down Expand Up @@ -164,6 +165,42 @@ func main() {
}
```

### Startup priority

Services can be assigned a startup priority to control initialization order. Services with the same priority start concurrently within a group. Groups with priority greater than 0 are started sequentially in ascending priority order. Priority 0 (the default) is special: those services start last, concurrently, only after every prioritized group has signaled ready.

```go
ln.ServicesRunner().Register(
// Priority 1: DB layer — start concurrently, both must be ready before next group
launcher.NewService(
launcher.WithServiceName("postgres"),
launcher.WithStartupPriority(1),
launcher.WithService(pgService),
),
launcher.NewService(
launcher.WithServiceName("redis"),
launcher.WithStartupPriority(1),
launcher.WithService(redisService),
),
// Priority 2: message broker — waits for DB layer to be ready
launcher.NewService(
launcher.WithServiceName("rabbitmq"),
launcher.WithStartupPriority(2),
launcher.WithService(rabbitService),
),
// Priority 0 (default): application services — start concurrently after all groups
launcher.NewService(
launcher.WithServiceName("http-server"),
launcher.WithService(httpService),
),
launcher.NewService(
launcher.WithServiceName("grpc-server"),
launcher.WithService(grpcService),
),
)
// Start order: (postgres + redis) → rabbitmq → (http + grpc concurrently)
```

### Graceful shutdown

The first signal (SIGTERM / SIGINT / SIGQUIT) starts a graceful shutdown. A second signal forces immediate exit.
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ require (
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.42.0 // indirect
go.opentelemetry.io/otel/metric v1.42.0 // indirect
go.opentelemetry.io/otel/trace v1.42.0 // indirect
go.opentelemetry.io/otel v1.43.0 // indirect
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/otel/trace v1.43.0 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/sys v0.43.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)

require (
github.com/prometheus/client_golang v1.23.2
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0
go.uber.org/multierr v1.11.0 // indirect
)
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 h1:OyrsyzuttWTSur2qN/Lm0m2a8yqyIjUVBZcxFPuXq2o=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0/go.mod h1:C2NGBr+kAB4bk3xtMXfZ94gqFDtg/GkI7e9zqGh5Beg=
go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho=
go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc=
go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4=
go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI=
go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo=
go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts=
go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA=
go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc=
go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY=
go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0/go.mod h1:BuhAPThV8PBHBvg8ZzZ/Ok3idOdhWIodywz2xEcRbJo=
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand All @@ -59,8 +59,8 @@ go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
73 changes: 66 additions & 7 deletions launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"
"slices"
"sync"
"time"

"github.com/tkcrm/mx/launcher/ops"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -80,18 +81,74 @@ func (l *launcher) Run() error { //nolint:cyclop
}
}

// start service
// group services by startup priority
groups := make(map[int][]*Service)
for _, svc := range l.servicesRunner.Services() {
p := svc.Options().StartupPriority
groups[p] = append(groups[p], svc)
}

// collect and sort unique priorities (excluding 0)
var priorities []int
for p := range groups {
if p > 0 {
priorities = append(priorities, p)
}
}
slices.Sort(priorities)

errChan := make(chan error, len(l.servicesRunner.Services()))
graceWait := new(sync.WaitGroup)
graceWait.Add(len(l.servicesRunner.Services()))
for i := range l.servicesRunner.Services() {
go func(svc *Service) {

startSvc := func(svc *Service) {
graceWait.Add(1)
go func() {
defer graceWait.Done()
if err := svc.Start(); err != nil {
err := fmt.Errorf("failed to start service [%s]: %w", svc.Name(), err)
errChan <- err
errChan <- fmt.Errorf("failed to start service [%s]: %w", svc.Name(), err)
}
}()
}

// start priority groups sequentially; within each group — concurrently
for _, p := range priorities {
group := groups[p]

for _, svc := range group {
startSvc(svc)
}

// wait for ALL services in this group to become ready
for _, svc := range group {
select {
case <-svc.Ready():
case err := <-errChan:
l.cancelFn()
graceWait.Wait()
return err
case <-l.opts.Context.Done():
graceWait.Wait()
return l.opts.Context.Err()
}
}(l.servicesRunner.Services()[i])
}

// Yield to let any immediate service failures propagate.
// A service that fails synchronously closes readyCh (via deferred Start)
// before the error reaches errChan. This sleep gives startSvc goroutines
// time to deliver the error after Start() returns.
time.Sleep(time.Nanosecond)
select {
case err := <-errChan:
l.cancelFn()
graceWait.Wait()
return err
default:
}
}

// start priority-0 services — all concurrently
for _, svc := range groups[0] {
startSvc(svc)
}

if l.opts.AppStartStopLog {
Expand All @@ -101,6 +158,8 @@ func (l *launcher) Run() error { //nolint:cyclop
// after start
for _, fn := range l.opts.AfterStart {
if err := fn(); err != nil {
l.cancelFn()
graceWait.Wait()
return err
}
}
Expand Down
Loading