Skip to content

Commit 71991a9

Browse files
committed
fix: correctly handle stopping services with reverse dependencies
This bug showed up with extension services: say we have a service `ext-foo` which depends on service `cri`. Service `ext-foo` will be started correctly only once `cri` is up. But we should also stop `ext-foo` before `cri` is stopped, as otherwise the dependency chain is broken. This PR fixes exactly that: once `cri` is stopped, anything which depends on it should be stopped. We should stop as well anything which depends on `ext-foo` (transitive dependency). In practical terms we use dependency on `cri` in extension service to correctly stop/start extension services with `/var` filesystem mount/unmount. Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com> (cherry picked from commit 992e230)
1 parent f881f2f commit 71991a9

File tree

4 files changed

+88
-20
lines changed

4 files changed

+88
-20
lines changed

internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,8 @@ func StartAllServices(seq runtime.Sequence, data interface{}) (runtime.TaskExecu
714714
// StopServicesForUpgrade represents the StopServicesForUpgrade task.
715715
func StopServicesForUpgrade(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
716716
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
717-
return system.Services(nil).StopWithRevDepenencies(ctx, "cri", "etcd", "kubelet", "udevd")
717+
// stopping 'cri' service stops everything which depends on it (kubelet, etcd, ...)
718+
return system.Services(nil).StopWithRevDepenencies(ctx, "cri", "udevd")
718719
}, "stopServicesForUpgrade"
719720
}
720721

internal/app/machined/pkg/system/services/extension.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
specs "github.com/opencontainers/runtime-spec/specs-go"
1515

1616
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
17-
"github.com/talos-systems/talos/internal/app/machined/pkg/system"
1817
"github.com/talos-systems/talos/internal/app/machined/pkg/system/events"
1918
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner"
2019
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner/containerd"
@@ -66,8 +65,6 @@ func (svc *Extension) Condition(r runtime.Runtime) conditions.Condition {
6665

6766
for _, dep := range svc.Spec.Depends {
6867
switch {
69-
case dep.Service != "":
70-
conds = append(conds, system.WaitForService(system.StateEventUp, dep.Service))
7168
case dep.Path != "":
7269
conds = append(conds, conditions.WaitForFileToExist(dep.Path))
7370
case len(dep.Network) > 0:
@@ -86,7 +83,15 @@ func (svc *Extension) Condition(r runtime.Runtime) conditions.Condition {
8683

8784
// DependsOn implements the Service interface.
8885
func (svc *Extension) DependsOn(r runtime.Runtime) []string {
89-
return []string{"containerd"}
86+
deps := []string{"containerd"}
87+
88+
for _, dep := range svc.Spec.Depends {
89+
if dep.Service != "" {
90+
deps = append(deps, dep.Service)
91+
}
92+
}
93+
94+
return deps
9095
}
9196

9297
// Runner implements the Service interface.

internal/app/machined/pkg/system/system.go

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error)
229229

230230
// StopWithRevDepenencies will initiate a shutdown of the specified services waiting for reverse dependencies to finish first.
231231
//
232-
// If reverse dependency is not stopped, this method might block waiting on it being stopped forever.
232+
// If reverse dependency is not stopped, this method might block waiting on it being stopped for up to 30 seconds.
233233
func (s *singleton) StopWithRevDepenencies(ctx context.Context, serviceIDs ...string) (err error) {
234234
if len(serviceIDs) == 0 {
235235
return
@@ -247,35 +247,77 @@ func (s *singleton) StopWithRevDepenencies(ctx context.Context, serviceIDs ...st
247247

248248
//nolint:gocyclo
249249
func (s *singleton) stopServices(ctx context.Context, services []string, waitForRevDependencies bool) error {
250-
stateCopy := make(map[string]*ServiceRunner)
250+
servicesToStop := make(map[string]*ServiceRunner)
251251

252252
if services == nil {
253253
for name, svcrunner := range s.state {
254-
stateCopy[name] = svcrunner
254+
servicesToStop[name] = svcrunner
255255
}
256256
} else {
257257
for _, name := range services {
258258
if _, ok := s.state[name]; !ok {
259259
continue
260260
}
261261

262-
stateCopy[name] = s.state[name]
262+
servicesToStop[name] = s.state[name]
263263
}
264264
}
265265

266-
s.mu.Unlock()
267-
268-
// build reverse dependencies
266+
// build reverse dependencies, and expand the list of services to stop
267+
// with services which depend on the one being stopped
269268
reverseDependencies := make(map[string][]string)
270269

271270
if waitForRevDependencies {
272-
for name, svcrunner := range stateCopy {
271+
// expand the list of services to stop with the list of services which depend
272+
// on the ones being stopped
273+
// the loop is run as long as more dependencies are added to the list
274+
for {
275+
expanded := false
276+
277+
for name, svcrunner := range s.state {
278+
if _, scheduledToStop := servicesToStop[name]; scheduledToStop {
279+
continue
280+
}
281+
282+
dependencies := svcrunner.service.DependsOn(s.runtime)
283+
284+
shouldStopService := false
285+
286+
for _, dependency := range dependencies {
287+
for scheduledService := range servicesToStop {
288+
if scheduledService == dependency {
289+
shouldStopService = true
290+
291+
break
292+
}
293+
}
294+
295+
if shouldStopService {
296+
break
297+
}
298+
}
299+
300+
if shouldStopService {
301+
servicesToStop[name] = svcrunner
302+
expanded = true
303+
}
304+
}
305+
306+
if !expanded {
307+
break
308+
}
309+
}
310+
311+
// build a list of dependencies to wait for before stopping each of the services
312+
for name, svcrunner := range servicesToStop {
273313
for _, dependency := range svcrunner.service.DependsOn(s.runtime) {
274314
reverseDependencies[dependency] = append(reverseDependencies[dependency], name)
275315
}
276316
}
277317
}
278318

319+
s.mu.Unlock()
320+
279321
// shutdown all the services waiting for rev deps
280322
var shutdownWg sync.WaitGroup
281323

@@ -285,7 +327,7 @@ func (s *singleton) stopServices(ctx context.Context, services []string, waitFor
285327

286328
stoppedConds := []conditions.Condition{}
287329

288-
for name, svcrunner := range stateCopy {
330+
for name, svcrunner := range servicesToStop {
289331
shutdownWg.Add(1)
290332

291333
stoppedConds = append(stoppedConds, WaitForService(StateEventDown, name))

internal/app/machined/pkg/system/system_test.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@ func (suite *SystemServicesSuite) TestStartShutdown() {
2727
time.Sleep(10 * time.Millisecond)
2828

2929
suite.Require().NoError(system.Services(nil).Unload(context.Background(), "trustd", "notrunning"))
30-
31-
system.Services(nil).Shutdown(context.TODO())
32-
}
33-
34-
func TestSystemServicesSuite(t *testing.T) {
35-
suite.Run(t, new(SystemServicesSuite))
3630
}
3731

3832
func (suite *SystemServicesSuite) TestStartStop() {
@@ -47,3 +41,29 @@ func (suite *SystemServicesSuite) TestStartStop() {
4741
)
4842
suite.Assert().NoError(err)
4943
}
44+
45+
func (suite *SystemServicesSuite) TestStopWithRevDeps() {
46+
system.Services(nil).LoadAndStart(
47+
&MockService{name: "cri"},
48+
&MockService{name: "networkd", dependencies: []string{"cri"}},
49+
&MockService{name: "vland", dependencies: []string{"networkd"}},
50+
)
51+
time.Sleep(10 * time.Millisecond)
52+
53+
// stopping cri should stop all services
54+
suite.Require().NoError(system.Services(nil).StopWithRevDepenencies(context.Background(), "cri"))
55+
56+
// no services should be running
57+
for _, name := range []string{"cri", "networkd", "vland"} {
58+
svc, running, err := system.Services(nil).IsRunning(name)
59+
suite.Require().NoError(err)
60+
suite.Assert().NotNil(svc)
61+
suite.Assert().False(running)
62+
}
63+
64+
system.Services(nil).Shutdown(context.TODO())
65+
}
66+
67+
func TestSystemServicesSuite(t *testing.T) {
68+
suite.Run(t, new(SystemServicesSuite))
69+
}

0 commit comments

Comments
 (0)