Skip to content

Commit

Permalink
fix: refresh control plane endpoints on worker apids on schedule
Browse files Browse the repository at this point in the history
This moves endpoint refresh from the context of the service `apid` in
`machined` into `apid` service itself for the workers. `apid` does
initial poll for the endpoints when it boots, but also periodically
polls for new endpoints to make sure it has accurate list of `trustd`
endpoints to talk to, this handles cases when control plane endpoints
change (e.g. rolling replace of control plane nodes with new IPs).

Related to #3069

Fixes #3068

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Feb 3, 2021
1 parent 47c260e commit 5855b8d
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 64 deletions.
20 changes: 17 additions & 3 deletions internal/app/apid/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@ import (
"github.com/talos-systems/talos/pkg/startup"
)

var endpoints *string
var (
endpoints *string
useK8sEndpoints *bool
)

func init() {
// Explicitly disable memory profiling to save around 1.4MiB of memory.
runtime.MemProfileRate = 0

log.SetFlags(log.Lshortfile | log.Ldate | log.Lmicroseconds | log.Ltime)

endpoints = flag.String("endpoints", "", "the IPs of the control plane nodes")
endpoints = flag.String("endpoints", "", "the static list of IPs of the control plane nodes")
useK8sEndpoints = flag.Bool("use-kubernetes-endpoints", false, "use Kubernetes master node endpoints as control plane endpoints")

flag.Parse()
}
Expand All @@ -49,7 +53,17 @@ func main() {
log.Fatalf("open config: %v", err)
}

tlsConfig, err := provider.NewTLSConfig(config, strings.Split(*endpoints, ","))
var endpointsProvider provider.Endpoints

if *useK8sEndpoints {
endpointsProvider = &provider.KubernetesEndpoints{}
} else {
endpointsProvider = &provider.StaticEndpoints{
Endpoints: strings.Split(*endpoints, ","),
}
}

tlsConfig, err := provider.NewTLSConfig(config, endpointsProvider)
if err != nil {
log.Fatalf("failed to create remote certificate provider: %+v", err)
}
Expand Down
57 changes: 57 additions & 0 deletions internal/app/apid/pkg/provider/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package provider

import (
"context"
"fmt"
"time"

"github.com/talos-systems/go-retry/retry"

"github.com/talos-systems/talos/pkg/kubernetes"
)

// Endpoints interfaces describes a control plane endpoints provider.
type Endpoints interface {
GetEndpoints() (endpoints []string, err error)
}

// StaticEndpoints provides static list of endpoints.
type StaticEndpoints struct {
Endpoints []string
}

// GetEndpoints implements Endpoints inteface.
func (e *StaticEndpoints) GetEndpoints() (endpoints []string, err error) {
return e.Endpoints, nil
}

// KubernetesEndpoints provides dynamic list of control plane endpoints via Kubernetes Endpoints resource.
type KubernetesEndpoints struct{}

// GetEndpoints implements Endpoints inteface.
func (e *KubernetesEndpoints) GetEndpoints() (endpoints []string, err error) {
err = retry.Constant(8*time.Minute, retry.WithUnits(3*time.Second), retry.WithJitter(time.Second), retry.WithErrorLogging(true)).Retry(func() error {
ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer ctxCancel()

var client *kubernetes.Client

client, err = kubernetes.NewClientFromKubeletKubeconfig()
if err != nil {
return retry.ExpectedError(fmt.Errorf("failed to create client: %w", err))
}

endpoints, err = client.MasterIPs(ctx)
if err != nil {
return retry.ExpectedError(err)
}

return nil
})

return endpoints, err
}
67 changes: 60 additions & 7 deletions internal/app/apid/pkg/provider/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package provider provides TLS config for client & server
// Package provider provides TLS config for client & server.
package provider

import (
stdlibtls "crypto/tls"
"fmt"
"log"
stdlibnet "net"
"reflect"
"sort"
"time"

"github.com/talos-systems/crypto/tls"
"github.com/talos-systems/crypto/x509"
"github.com/talos-systems/net"

"github.com/talos-systems/talos/pkg/grpc/gen"
Expand All @@ -19,11 +24,14 @@ import (

// TLSConfig provides client & server TLS configs for apid.
type TLSConfig struct {
endpoints Endpoints
lastEndpointList []string
generator *gen.RemoteGenerator
certificateProvider tls.CertificateProvider
}

// NewTLSConfig builds provider from configuration and endpoints.
func NewTLSConfig(config config.Provider, endpoints []string) (*TLSConfig, error) {
func NewTLSConfig(config config.Provider, endpoints Endpoints) (*TLSConfig, error) {
ips, err := net.IPAddrs()
if err != nil {
return nil, fmt.Errorf("failed to discover IP addresses: %w", err)
Expand All @@ -42,25 +50,37 @@ func NewTLSConfig(config config.Provider, endpoints []string) (*TLSConfig, error
}
}

generator, err := gen.NewRemoteGenerator(
endpointList, err := endpoints.GetEndpoints()
if err != nil {
return nil, fmt.Errorf("failed to fetch initial endpoint list: %w", err)
}

sort.Strings(endpointList)

tlsConfig := &TLSConfig{
endpoints: endpoints,
lastEndpointList: endpointList,
}

tlsConfig.generator, err = gen.NewRemoteGenerator(
config.Machine().Security().Token(),
endpoints,
endpointList,
)
if err != nil {
return nil, fmt.Errorf("failed to create remote certificate genertor: %w", err)
}

tlsConfig := &TLSConfig{}

tlsConfig.certificateProvider, err = tls.NewRenewingCertificateProvider(
generator,
tlsConfig.generator,
dnsNames,
ips,
)
if err != nil {
return nil, err
}

go tlsConfig.refreshEndpoints()

return tlsConfig, nil
}

Expand Down Expand Up @@ -91,3 +111,36 @@ func (tlsConfig *TLSConfig) ClientConfig() (*stdlibtls.Config, error) {
tls.WithClientCertificateProvider(tlsConfig.certificateProvider),
)
}

func (tlsConfig *TLSConfig) refreshEndpoints() {
// refresh endpoints 1/20 of the default certificate validity time
ticker := time.NewTicker(x509.DefaultCertificateValidityDuration / 20)
defer ticker.Stop()

for {
<-ticker.C

endpointList, err := tlsConfig.endpoints.GetEndpoints()
if err != nil {
log.Printf("error refreshing endpoints: %s", err)

continue
}

sort.Strings(endpointList)

if reflect.DeepEqual(tlsConfig.lastEndpointList, endpointList) {
continue
}

if err = tlsConfig.generator.SetEndpoints(endpointList); err != nil {
log.Printf("error setting new endpoints %v: %s", endpointList, err)

continue
}

tlsConfig.lastEndpointList = endpointList

log.Printf("updated control plane endpoints to %v", endpointList)
}
}
48 changes: 16 additions & 32 deletions internal/app/machined/pkg/system/services/apid.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (
"os"
"path/filepath"
"strings"
"time"

"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/talos-systems/go-retry/retry"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/events"
Expand All @@ -27,7 +25,6 @@ import (
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/containers/image"
"github.com/talos-systems/talos/pkg/conditions"
"github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
)
Expand Down Expand Up @@ -70,58 +67,45 @@ func (o *APID) DependsOn(r runtime.Runtime) []string {
}

// Runner implements the Service interface.
//
//nolint: gocyclo
func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) {
image := "talos/apid"

endpoints := []string{"127.0.0.1"}

// Ensure socket dir exists
if err := os.MkdirAll(filepath.Dir(constants.APISocketPath), 0o750); err != nil {
return nil, err
}

if r.Config().Machine().Type() == machine.TypeJoin {
opts := []retry.Option{retry.WithUnits(3 * time.Second), retry.WithJitter(time.Second)}

err := retry.Constant(8*time.Minute, opts...).Retry(func() error {
ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer ctxCancel()

h, err := kubernetes.NewClientFromKubeletKubeconfig()
if err != nil {
return retry.ExpectedError(fmt.Errorf("failed to create client: %w", err))
}

endpoints, err = h.MasterIPs(ctx)
if err != nil {
return retry.ExpectedError(err)
}

return nil
})
if err != nil {
return nil, err
}
}

// Set the process arguments.
args := runner.Args{
ID: o.ID(r),
ProcessArgs: []string{
"/apid",
"--endpoints=" + strings.Join(endpoints, ","),
},
}

isWorker := r.Config().Machine().Type() == machine.TypeJoin

if !isWorker {
args.ProcessArgs = append(args.ProcessArgs, "--endpoints="+strings.Join([]string{"127.0.0.1"}, ","))
} else {
args.ProcessArgs = append(args.ProcessArgs, "--use-kubernetes-endpoints")
}

// Set the mounts.
mounts := []specs.Mount{
{Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.RouterdSocketPath), Source: filepath.Dir(constants.RouterdSocketPath), Options: []string{"rbind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.APISocketPath), Source: filepath.Dir(constants.APISocketPath), Options: []string{"rbind", "rw"}},
}

if isWorker {
// worker requires kubelet config to refresh the certs via Kubernetes
mounts = append(mounts,
specs.Mount{Type: "bind", Destination: filepath.Dir(constants.KubeletKubeconfig), Source: filepath.Dir(constants.KubeletKubeconfig), Options: []string{"rbind", "ro"}},
specs.Mount{Type: "bind", Destination: constants.KubeletPKIDir, Source: constants.KubeletPKIDir, Options: []string{"rbind", "ro"}},
)
}

env := []string{}

for key, val := range r.Config().Machine().Env() {
Expand Down
Loading

0 comments on commit 5855b8d

Please sign in to comment.