Skip to content

Commit 4f142cc

Browse files
committed
feat: implement D-Bus systemd-compatible shutdown for kubelet
Add a mock D-Bus daemon and a mock logind implementation over D-Bus. Kubelet gets a handle to the D-Bus socket, connects over it to our logind mock and negotiates shutdown activities. Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com> (cherry picked from commit caf800f)
1 parent 3345cde commit 4f142cc

File tree

17 files changed

+858
-8
lines changed

17 files changed

+858
-8
lines changed

hack/release.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,19 @@ with a single `--mode` flag that can take the following values:
4545
title = "Kubelet"
4646
description="""\
4747
Kubelet configuration can now be overridden with the `.machine.kubelet.extraConfig` machine configuration field.
48-
As most of the kubelet command line arguments are being depreacted, it is recommended to migrate to `extraConfig`
48+
As most of the kubelet command line arguments are being deprecated, it is recommended to migrate to `extraConfig`
4949
instead of using `extraArgs`.
5050
5151
A number of conformance tweaks have been made to the `kubelet` to allow it to run without
5252
`protectKernelDefaults`.
5353
This includes both kubelet configuration options and sysctls.
5454
Of particular note is that Talos now sets the `kernel.panic` reboot interval to 10s instead of 1s.
5555
If your kubelet fails to start after the upgrade, please check the `kubelet` logs to determine the problem.
56+
57+
Talos now performs graceful kubelet shutdown by default on node reboot/shutdown.
58+
Default shutdown timeouts: 20s for regular priority pods and 10s for critical priority pods.
59+
Timeouts can be overridden with the `.machine.kubelet.extraConfig` machine configuration key:
60+
`shutdownGracePeriod` and `shutdownGracePeriodCriticalPods`.
5661
"""
5762

5863
[notes.auditlog]
@@ -144,7 +149,7 @@ Old behavior can be achieved by specifiying empty flag value: `--kubernetes-vers
144149
[notes.admission]
145150
title = "Admission Plugin Configuration"
146151
description="""\
147-
Talos now supports Kubernetes API server admission plugin configuration via the `.cluster.apiServer.admissonControl` machine configuration field.
152+
Talos now supports Kubernetes API server admission plugin configuration via the `.cluster.apiServer.admissionControl` machine configuration field.
148153
149154
This configuration can be used to enable [Pod Security Admission](https://kubernetes.io/docs/concepts/security/pod-security-admission/) plugin and
150155
define cluster-wide default [Pod Security Standards](https://kubernetes.io/docs/concepts/security/pod-security-standards/).

internal/app/machined/pkg/controllers/config/k8s_control_plane.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (ctrl *K8sControlPlaneController) Run(ctx context.Context, r controller.Run
107107

108108
for _, f := range []func(context.Context, controller.Runtime, *zap.Logger, talosconfig.Provider) error{
109109
ctrl.manageAPIServerConfig,
110-
ctrl.manageAdmissonControlConfig,
110+
ctrl.manageAdmissionControlConfig,
111111
ctrl.manageControllerManagerConfig,
112112
ctrl.manageSchedulerConfig,
113113
ctrl.manageManifestsConfig,
@@ -159,7 +159,7 @@ func (ctrl *K8sControlPlaneController) manageAPIServerConfig(ctx context.Context
159159
})
160160
}
161161

162-
func (ctrl *K8sControlPlaneController) manageAdmissonControlConfig(ctx context.Context, r controller.Runtime, logger *zap.Logger, cfgProvider talosconfig.Provider) error {
162+
func (ctrl *K8sControlPlaneController) manageAdmissionControlConfig(ctx context.Context, r controller.Runtime, logger *zap.Logger, cfgProvider talosconfig.Provider) error {
163163
spec := config.K8sAdmissionControlSpec{}
164164

165165
for _, cfg := range cfgProvider.Cluster().APIServer().AdmissionControl() {

internal/app/machined/pkg/controllers/k8s/kubelet_spec.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,14 @@ func NewKubeletConfiguration(clusterDNS []string, dnsDomain string, extraConfig
294294
config.Logging.Format = "json"
295295
}
296296

297+
if config.ShutdownGracePeriod.Duration == 0 {
298+
config.ShutdownGracePeriod = metav1.Duration{Duration: constants.KubeletShutdownGracePeriod}
299+
}
300+
301+
if config.ShutdownGracePeriodCriticalPods.Duration == 0 {
302+
config.ShutdownGracePeriodCriticalPods = metav1.Duration{Duration: constants.KubeletShutdownGracePeriodCriticalPods}
303+
}
304+
297305
if config.StreamingConnectionIdleTimeout.Duration == 0 {
298306
config.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 5 * time.Minute}
299307
}

internal/app/machined/pkg/controllers/k8s/kubelet_spec_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,10 +333,11 @@ func TestNewKubeletConfigurationSuccess(t *testing.T) {
333333
Logging: v1alpha1.LoggingConfiguration{
334334
Format: "json",
335335
},
336-
337-
StreamingConnectionIdleTimeout: metav1.Duration{Duration: 5 * time.Minute},
338-
TLSMinVersion: "VersionTLS13",
339-
EnableDebuggingHandlers: pointer.ToBool(true),
336+
ShutdownGracePeriod: metav1.Duration{Duration: constants.KubeletShutdownGracePeriod},
337+
ShutdownGracePeriodCriticalPods: metav1.Duration{Duration: constants.KubeletShutdownGracePeriodCriticalPods},
338+
StreamingConnectionIdleTimeout: metav1.Duration{Duration: 5 * time.Minute},
339+
TLSMinVersion: "VersionTLS13",
340+
EnableDebuggingHandlers: pointer.ToBool(true),
340341
},
341342
config)
342343
}

internal/app/machined/pkg/runtime/state.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package runtime
66

77
import (
8+
"context"
9+
810
"github.com/cosi-project/runtime/pkg/state"
911
"github.com/cosi-project/runtime/pkg/state/registry"
1012
"github.com/talos-systems/go-blockdevice/blockdevice/probe"
@@ -37,6 +39,7 @@ type MachineState interface {
3739
StagedInstallOptions() []byte
3840
KexecPrepared(bool)
3941
IsKexecPrepared() bool
42+
DBus() DBusState
4043
}
4144

4245
// ClusterState defines the cluster state.
@@ -51,3 +54,10 @@ type V1Alpha2State interface {
5154

5255
SetConfig(config.Provider) error
5356
}
57+
58+
// DBusState defines the D-Bus logind mock.
59+
type DBusState interface {
60+
Start() error
61+
Stop() error
62+
WaitShutdown(ctx context.Context) error
63+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
package v1alpha1
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"os"
11+
"path/filepath"
12+
"time"
13+
14+
"github.com/talos-systems/talos/internal/pkg/logind"
15+
"github.com/talos-systems/talos/pkg/machinery/constants"
16+
)
17+
18+
// DBusState implements the logind mock.
19+
type DBusState struct {
20+
broker *logind.DBusBroker
21+
logindMock *logind.ServiceMock
22+
errCh chan error
23+
cancel context.CancelFunc
24+
}
25+
26+
// Start the D-Bus broker and logind mock.
27+
func (dbus *DBusState) Start() error {
28+
for _, path := range []string{constants.DBusServiceSocketPath, constants.DBusClientSocketPath} {
29+
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
30+
return err
31+
}
32+
}
33+
34+
var err error
35+
36+
dbus.broker, err = logind.NewBroker(constants.DBusServiceSocketPath, constants.DBusClientSocketPath)
37+
if err != nil {
38+
return err
39+
}
40+
41+
var ctx context.Context
42+
43+
ctx, dbus.cancel = context.WithCancel(context.Background())
44+
45+
dbus.errCh = make(chan error)
46+
47+
go func() {
48+
dbus.errCh <- dbus.broker.Run(ctx)
49+
}()
50+
51+
dbus.logindMock, err = logind.NewServiceMock(constants.DBusServiceSocketPath)
52+
53+
return err
54+
}
55+
56+
// Stop the D-Bus broker and logind mock.
57+
func (dbus *DBusState) Stop() error {
58+
dbus.cancel()
59+
60+
if err := dbus.logindMock.Close(); err != nil {
61+
return err
62+
}
63+
64+
if err := dbus.broker.Close(); err != nil {
65+
return err
66+
}
67+
68+
select {
69+
case <-time.After(time.Second):
70+
return fmt.Errorf("timed out stopping D-Bus broker")
71+
case err := <-dbus.errCh:
72+
return err
73+
}
74+
}
75+
76+
// WaitShutdown signals the shutdown over the D-Bus and waits for the inhibit lock to be released.
77+
func (dbus *DBusState) WaitShutdown(ctx context.Context) error {
78+
if err := dbus.logindMock.EmitShutdown(); err != nil {
79+
return err
80+
}
81+
82+
return dbus.logindMock.WaitLockRelease(ctx)
83+
}

internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ func (*Sequencer) Boot(r runtime.Runtime) []runtime.Phase {
192192
).Append(
193193
"containerd",
194194
StartContainerd,
195+
).Append(
196+
"dbus",
197+
StartDBus,
195198
).AppendWhen(
196199
r.State().Platform().Mode() == runtime.ModeContainer,
197200
"sharedFilesystems",
@@ -263,6 +266,9 @@ func (*Sequencer) Reboot(r runtime.Runtime) []runtime.Phase {
263266
phases := PhaseList{}.Append(
264267
"cleanup",
265268
StopAllPods,
269+
).Append(
270+
"dbus",
271+
StopDBus,
266272
).
267273
AppendList(stopAllPhaselist(r, true)).
268274
Append("reboot", Reboot)
@@ -294,6 +300,9 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph
294300
!in.GetGraceful(),
295301
"cleanup",
296302
StopAllPods,
303+
).Append(
304+
"dbus",
305+
StopDBus,
297306
).AppendWhen(
298307
in.GetGraceful() && (r.Config().Machine().Type() != machine.TypeWorker),
299308
"leave",
@@ -331,6 +340,9 @@ func (*Sequencer) Shutdown(r runtime.Runtime, in *machineapi.ShutdownRequest) []
331340
).Append(
332341
"cleanup",
333342
StopAllPods,
343+
).Append(
344+
"dbus",
345+
StopDBus,
334346
).
335347
AppendList(stopAllPhaselist(r, false)).
336348
Append("shutdown", Shutdown)
@@ -349,6 +361,9 @@ func (*Sequencer) StageUpgrade(r runtime.Runtime, in *machineapi.UpgradeRequest)
349361
phases = phases.Append(
350362
"cleanup",
351363
StopAllPods,
364+
).Append(
365+
"dbus",
366+
StopDBus,
352367
).AppendWhen(
353368
!in.GetPreserve() && (r.Config().Machine().Type() != machine.TypeWorker),
354369
"leave",
@@ -383,6 +398,9 @@ func (*Sequencer) Upgrade(r runtime.Runtime, in *machineapi.UpgradeRequest) []ru
383398
in.GetPreserve(),
384399
"cleanup",
385400
StopAllPods,
401+
).Append(
402+
"dbus",
403+
StopDBus,
386404
).AppendWhen(
387405
!in.GetPreserve() && (r.Config().Machine().Type() != machine.TypeWorker),
388406
"leave",

internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,6 +1237,15 @@ func StopAllPods(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionF
12371237

12381238
func stopAndRemoveAllPods(stopAction cri.StopAction) runtime.TaskExecutionFunc {
12391239
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
1240+
logger.Printf("shutting down kubelet gracefully")
1241+
1242+
shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, constants.KubeletShutdownGracePeriod*2)
1243+
defer shutdownCtxCancel()
1244+
1245+
if err = r.State().Machine().DBus().WaitShutdown(shutdownCtx); err != nil {
1246+
logger.Printf("failed waiting for inhibit shutdown lock: %s", err)
1247+
}
1248+
12401249
if err = system.Services(nil).Stop(ctx, "kubelet"); err != nil {
12411250
return err
12421251
}
@@ -1821,3 +1830,21 @@ func KexecPrepare(seq runtime.Sequence, data interface{}) (runtime.TaskExecution
18211830
return nil
18221831
}, "kexecPrepare"
18231832
}
1833+
1834+
// StartDBus starts the D-Bus mock.
1835+
func StartDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
1836+
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
1837+
return r.State().Machine().DBus().Start()
1838+
}, "startDBus"
1839+
}
1840+
1841+
// StopDBus stops the D-Bus mock.
1842+
func StopDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
1843+
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
1844+
if err := r.State().Machine().DBus().Stop(); err != nil {
1845+
logger.Printf("error stopping D-Bus: %s, ignored", err)
1846+
}
1847+
1848+
return nil
1849+
}, "stopDBus"
1850+
}

internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_state.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ type MachineState struct {
3939
stagedInstallOptions []byte
4040

4141
kexecPrepared bool
42+
43+
dbus DBusState
4244
}
4345

4446
// ClusterState represents the cluster's state.
@@ -229,3 +231,8 @@ func (s *MachineState) KexecPrepared(prepared bool) {
229231
func (s *MachineState) IsKexecPrepared() bool {
230232
return s.kexecPrepared
231233
}
234+
235+
// DBus implements the machine state interface.
236+
func (s *MachineState) DBus() runtime.DBusState {
237+
return &s.dbus
238+
}

internal/app/machined/pkg/system/services/kubelet.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func (k *Kubelet) Runner(r runtime.Runtime) (runner.Runner, error) {
113113
{Type: "bind", Destination: "/etc/cni", Source: "/etc/cni", Options: []string{"rbind", "rshared", "rw"}},
114114
{Type: "bind", Destination: "/usr/libexec/kubernetes", Source: "/usr/libexec/kubernetes", Options: []string{"rbind", "rshared", "rw"}},
115115
{Type: "bind", Destination: "/var/run", Source: "/run", Options: []string{"rbind", "rshared", "rw"}},
116+
{Type: "bind", Destination: "/var/run/dbus/system_bus_socket", Source: constants.DBusClientSocketPath, Options: []string{"bind", "rw"}},
116117
{Type: "bind", Destination: "/var/lib/containerd", Source: "/var/lib/containerd", Options: []string{"rbind", "rshared", "rw"}},
117118
{Type: "bind", Destination: "/var/lib/kubelet", Source: "/var/lib/kubelet", Options: []string{"rbind", "rshared", "rw"}},
118119
{Type: "bind", Destination: "/var/log/containers", Source: "/var/log/containers", Options: []string{"rbind", "rshared", "rw"}},

0 commit comments

Comments
 (0)