Skip to content

Commit

Permalink
fix: mount kubelet secrets from system instead of ephemeral
Browse files Browse the repository at this point in the history
Launch goroutine that copies kubelet pki folder contents into
`/system/secrets/kubelet` every minute before starting apid container
when running on the worker node.

Mounting kubelet secrets directly from `/var/lib/kubelet/pki` breaks
upgrade flow, because we are not able to unmount ephemeral partition,
which is being used by apid, which is not stopped during the upgrade.

Signed-off-by: Artem Chernyshev <artem.0xD2@gmail.com>
  • Loading branch information
Unix4ever authored and talos-bot committed Feb 9, 2021
1 parent 4734fe7 commit a07cfbd
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 17 deletions.
73 changes: 70 additions & 3 deletions internal/app/machined/pkg/system/services/apid.go
Expand Up @@ -9,12 +9,15 @@ import (
"bytes"
"context"
"fmt"
"log"
"net"
"os"
"path/filepath"
"strings"
"sync"

"github.com/containerd/containerd/oci"
"github.com/fsnotify/fsnotify"
specs "github.com/opencontainers/runtime-spec/specs-go"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
Expand All @@ -25,13 +28,18 @@ import (
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/containers/image"
"github.com/talos-systems/talos/pkg/conditions"
"github.com/talos-systems/talos/pkg/copy"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
)

// APID implements the Service interface. It serves as the concrete type with
// the required methods.
type APID struct{}
type APID struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

// ID implements the Service interface.
func (o *APID) ID(r runtime.Runtime) string {
Expand All @@ -40,11 +48,18 @@ func (o *APID) ID(r runtime.Runtime) string {

// PreFunc implements the Service interface.
func (o *APID) PreFunc(ctx context.Context, r runtime.Runtime) error {
if r.Config().Machine().Type() == machine.TypeJoin {
o.syncKubeletPKI()
}

return image.Import(ctx, "/usr/images/apid.tar", "talos/apid")
}

// PostFunc implements the Service interface.
func (o *APID) PostFunc(r runtime.Runtime, state events.ServiceState) (err error) {
o.cancel()
o.wg.Wait()

return nil
}

Expand Down Expand Up @@ -101,8 +116,8 @@ func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) {
if isWorker {
// worker requires kubelet config to refresh the certs via Kubernetes
mounts = append(mounts,
specs.Mount{Type: "bind", Destination: filepath.Dir(constants.KubeletKubeconfig), Source: filepath.Dir(constants.KubeletKubeconfig), Options: []string{"rbind", "ro"}},
specs.Mount{Type: "bind", Destination: constants.KubeletPKIDir, Source: constants.KubeletPKIDir, Options: []string{"rbind", "ro"}},
specs.Mount{Type: "bind", Destination: filepath.Dir(constants.KubeletKubeconfig), Source: constants.SystemKubeletPKIDir, Options: []string{"rbind", "ro"}},
specs.Mount{Type: "bind", Destination: constants.KubeletPKIDir, Source: constants.SystemKubeletPKIDir, Options: []string{"rbind", "ro"}},
)
}

Expand Down Expand Up @@ -164,3 +179,55 @@ func (o *APID) HealthFunc(runtime.Runtime) health.Check {
func (o *APID) HealthSettings(runtime.Runtime) *health.Settings {
return &health.DefaultSettings
}

func (o *APID) syncKubeletPKI() {
copyAll := func() {
if err := copy.Dir(constants.KubeletPKIDir, constants.SystemKubeletPKIDir, copy.WithMode(0o700)); err != nil {
log.Printf("failed to sync %s dir contents into %s: %s", constants.KubeletPKIDir, constants.SystemKubeletPKIDir, err)

return
}

if err := copy.File(constants.KubeletKubeconfig, filepath.Join(constants.SystemKubeletPKIDir, filepath.Base(constants.KubeletKubeconfig)), copy.WithMode(0o700)); err != nil {
log.Printf("failed to sync %s into %s: %s", constants.KubeletKubeconfig, constants.SystemKubeletPKIDir, err)

return
}
}

copyAll()

o.ctx, o.cancel = context.WithCancel(context.Background())
o.wg.Add(1)

go func() {
defer o.wg.Done()

watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("failed to create directory watcher %s", err)

return
}

defer watcher.Close() //nolint:errcheck

err = watcher.Add(constants.KubeletPKIDir)
if err != nil {
log.Printf("failed to watch dir %s %s", constants.KubeletPKIDir, err)

return
}

for {
select {
case <-o.ctx.Done():
return
case <-watcher.Events:
copyAll()
case err = <-watcher.Errors:
log.Printf("directory watch error %s", err)
}
}
}()
}
62 changes: 49 additions & 13 deletions pkg/copy/copy.go
Expand Up @@ -12,14 +12,19 @@ import (
)

// File copies the `src` file to the `dst` file.
func File(src, dst string) error {
func File(src, dst string, setters ...Option) error {
var (
err error
s *os.File
d *os.File
info os.FileInfo
err error
s *os.File
d *os.File
info os.FileInfo
options Options
)

for _, setter := range setters {
setter(&options)
}

if s, err = os.Open(src); err != nil {
return err
}
Expand All @@ -45,22 +50,38 @@ func File(src, dst string) error {
return err
}

return os.Chmod(dst, info.Mode())
mode := info.Mode()
if options.Mode != 0 {
mode = options.Mode
}

return os.Chmod(dst, mode)
}

// Dir copies the `src` directory to the `dst` directory.
func Dir(src, dst string) error {
func Dir(src, dst string, setters ...Option) error {
var (
err error
files []os.FileInfo
info os.FileInfo
err error
files []os.FileInfo
info os.FileInfo
options Options
)

for _, setter := range setters {
setter(&options)
}

if info, err = os.Stat(src); err != nil {
return err
}

if err = os.MkdirAll(dst, info.Mode()); err != nil {
mode := info.Mode()

if options.Mode != 0 {
mode = options.Mode
}

if err = os.MkdirAll(dst, mode); err != nil {
return err
}

Expand All @@ -73,15 +94,30 @@ func Dir(src, dst string) error {
d := path.Join(dst, file.Name())

if file.IsDir() {
if err = Dir(s, d); err != nil {
if err = Dir(s, d, setters...); err != nil {
return err
}
} else {
if err = File(s, d); err != nil {
if err = File(s, d, setters...); err != nil {
return err
}
}
}

return nil
}

// Option represents copy option.
type Option func(o *Options)

// Options represents copy options.
type Options struct {
Mode os.FileMode
}

// WithMode sets destination files filemode.
func WithMode(m os.FileMode) Option {
return func(o *Options) {
o.Mode = m
}
}
5 changes: 4 additions & 1 deletion pkg/machinery/constants/constants.go
Expand Up @@ -181,9 +181,12 @@ const (
// KubeletPort is the kubelet port for secure API.
KubeletPort = 10250

// KubeletPKIDir is the path to the directory where kubelet stores issues certificates and keys.
// KubeletPKIDir is the path to the directory where kubelet stores issued certificates and keys.
KubeletPKIDir = "/var/lib/kubelet/pki"

// SystemKubeletPKIDir is the path to the directory where Talos copies kubelet issued certificates and keys.
SystemKubeletPKIDir = "/system/secrets/kubelet"

// DefaultKubernetesVersion is the default target version of the control plane.
DefaultKubernetesVersion = "1.20.2"

Expand Down

0 comments on commit a07cfbd

Please sign in to comment.