diff --git a/internal/app/apid/main.go b/internal/app/apid/main.go index 84ff5e55fc..f9e2d8278d 100644 --- a/internal/app/apid/main.go +++ b/internal/app/apid/main.go @@ -26,7 +26,10 @@ import ( "github.com/talos-systems/talos/pkg/startup" ) -var endpoints *string +var ( + endpoints *string + useK8sEndpoints *bool +) func init() { // Explicitly disable memory profiling to save around 1.4MiB of memory. @@ -34,7 +37,8 @@ func init() { log.SetFlags(log.Lshortfile | log.Ldate | log.Lmicroseconds | log.Ltime) - endpoints = flag.String("endpoints", "", "the IPs of the control plane nodes") + endpoints = flag.String("endpoints", "", "the static list of IPs of the control plane nodes") + useK8sEndpoints = flag.Bool("use-kubernetes-endpoints", false, "use Kubernetes master node endpoints as control plane endpoints") flag.Parse() } @@ -49,7 +53,17 @@ func main() { log.Fatalf("open config: %v", err) } - tlsConfig, err := provider.NewTLSConfig(config, strings.Split(*endpoints, ",")) + var endpointsProvider provider.Endpoints // chosen from the flags parsed in init + + if *useK8sEndpoints { // Kubernetes-backed lookup takes precedence over the static list + endpointsProvider = &provider.KubernetesEndpoints{} + } else { + endpointsProvider = &provider.StaticEndpoints{ + Endpoints: strings.Split(*endpoints, ","), // comma-separated value of the endpoints flag + } + } + + tlsConfig, err := provider.NewTLSConfig(config, endpointsProvider) if err != nil { log.Fatalf("failed to create remote certificate provider: %+v", err) } diff --git a/internal/app/apid/pkg/provider/endpoints.go b/internal/app/apid/pkg/provider/endpoints.go new file mode 100644 index 0000000000..66dbc20a81 --- /dev/null +++ b/internal/app/apid/pkg/provider/endpoints.go @@ -0,0 +1,57 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ +package provider + +import ( + "context" + "fmt" + "time" + + "github.com/talos-systems/go-retry/retry" + + "github.com/talos-systems/talos/pkg/kubernetes" +) + +// Endpoints interface describes a provider of control plane endpoints. +type Endpoints interface { + GetEndpoints() (endpoints []string, err error) +} + +// StaticEndpoints provides a static list of endpoints. +type StaticEndpoints struct { + Endpoints []string +} + +// GetEndpoints implements the Endpoints interface by returning the configured list as is. +func (e *StaticEndpoints) GetEndpoints() (endpoints []string, err error) { + return e.Endpoints, nil +} + +// KubernetesEndpoints provides a dynamic list of control plane endpoints via Kubernetes Endpoints resource. +type KubernetesEndpoints struct{} + +// GetEndpoints implements the Endpoints interface: inside a retry wrapper, each attempt builds a client from the kubelet kubeconfig and asks it for the master IPs under a 30 second context; failures are handed back to the wrapper as retry.ExpectedError. +func (e *KubernetesEndpoints) GetEndpoints() (endpoints []string, err error) { + err = retry.Constant(8*time.Minute, retry.WithUnits(3*time.Second), retry.WithJitter(time.Second), retry.WithErrorLogging(true)).Retry(func() error { + ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer ctxCancel() + + var client *kubernetes.Client + + client, err = kubernetes.NewClientFromKubeletKubeconfig() + if err != nil { + return retry.ExpectedError(fmt.Errorf("failed to create client: %w", err)) + } + + endpoints, err = client.MasterIPs(ctx) + if err != nil { + return retry.ExpectedError(err) + } + + return nil + }) + + return endpoints, err +} diff --git a/internal/app/apid/pkg/provider/tls.go b/internal/app/apid/pkg/provider/tls.go index 9e88e127f9..4cdf86db5f 100644 --- a/internal/app/apid/pkg/provider/tls.go +++ b/internal/app/apid/pkg/provider/tls.go @@ -2,15 +2,20 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -// Package provider provides TLS config for client & server +// Package provider provides TLS config for client & server.
package provider import ( stdlibtls "crypto/tls" "fmt" + "log" stdlibnet "net" + "reflect" + "sort" + "time" "github.com/talos-systems/crypto/tls" + "github.com/talos-systems/crypto/x509" "github.com/talos-systems/net" "github.com/talos-systems/talos/pkg/grpc/gen" @@ -19,11 +24,14 @@ import ( // TLSConfig provides client & server TLS configs for apid. type TLSConfig struct { + endpoints Endpoints + lastEndpointList []string // sorted list last applied to the generator + generator *gen.RemoteGenerator certificateProvider tls.CertificateProvider } // NewTLSConfig builds provider from configuration and endpoints. -func NewTLSConfig(config config.Provider, endpoints []string) (*TLSConfig, error) { +func NewTLSConfig(config config.Provider, endpoints Endpoints) (*TLSConfig, error) { ips, err := net.IPAddrs() if err != nil { return nil, fmt.Errorf("failed to discover IP addresses: %w", err) @@ -42,18 +50,28 @@ func NewTLSConfig(config config.Provider, endpoints []string) (*TLSConfig, error } } - generator, err := gen.NewRemoteGenerator( + endpointList, err := endpoints.GetEndpoints() + if err != nil { + return nil, fmt.Errorf("failed to fetch initial endpoint list: %w", err) + } + + sort.Strings(endpointList) // sorted so refreshEndpoints can compare lists regardless of order + + tlsConfig := &TLSConfig{ + endpoints: endpoints, + lastEndpointList: endpointList, + } + + tlsConfig.generator, err = gen.NewRemoteGenerator( config.Machine().Security().Token(), - endpoints, + endpointList, ) if err != nil { return nil, fmt.Errorf("failed to create remote certificate genertor: %w", err) } - tlsConfig := &TLSConfig{} - tlsConfig.certificateProvider, err = tls.NewRenewingCertificateProvider( - generator, + tlsConfig.generator, dnsNames, ips, ) @@ -61,6 +79,8 @@ func NewTLSConfig(config config.Provider, endpoints []string) (*TLSConfig, error return nil, err } + go tlsConfig.refreshEndpoints() // background loop; it has no stop signal and never returns + return tlsConfig, nil } @@ -91,3 +111,36 @@ func (tlsConfig *TLSConfig) ClientConfig() (*stdlibtls.Config, error) { tls.WithClientCertificateProvider(tlsConfig.certificateProvider), ) } + +func
(tlsConfig *TLSConfig) refreshEndpoints() { + // refresh endpoints every 1/20 of the default certificate validity time + ticker := time.NewTicker(x509.DefaultCertificateValidityDuration / 20) + defer ticker.Stop() + + for { + <-ticker.C + + endpointList, err := tlsConfig.endpoints.GetEndpoints() + if err != nil { + log.Printf("error refreshing endpoints: %s", err) + + continue + } + + sort.Strings(endpointList) + + if reflect.DeepEqual(tlsConfig.lastEndpointList, endpointList) { // unchanged since the last applied list (both are sorted) + continue + } + + if err = tlsConfig.generator.SetEndpoints(endpointList); err != nil { + log.Printf("error setting new endpoints %v: %s", endpointList, err) + + continue // lastEndpointList is left as is, so the next tick tries again + } + + tlsConfig.lastEndpointList = endpointList + + log.Printf("updated control plane endpoints to %v", endpointList) + } +} diff --git a/internal/app/machined/pkg/system/services/apid.go b/internal/app/machined/pkg/system/services/apid.go index 8ca1cd2a43..32711b64a4 100644 --- a/internal/app/machined/pkg/system/services/apid.go +++ b/internal/app/machined/pkg/system/services/apid.go @@ -13,11 +13,9 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/containerd/containerd/oci" specs "github.com/opencontainers/runtime-spec/specs-go" - "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/internal/app/machined/pkg/system/events" @@ -27,7 +25,6 @@ import ( "github.com/talos-systems/talos/internal/app/machined/pkg/system/runner/restart" "github.com/talos-systems/talos/internal/pkg/containers/image" "github.com/talos-systems/talos/pkg/conditions" - "github.com/talos-systems/talos/pkg/kubernetes" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" ) @@ -70,51 +67,30 @@ func (o *APID) DependsOn(r runtime.Runtime) []string { } // Runner implements the Service interface.
-// -//nolint: gocyclo func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) { image := "talos/apid" - endpoints := []string{"127.0.0.1"} - // Ensure socket dir exists if err := os.MkdirAll(filepath.Dir(constants.APISocketPath), 0o750); err != nil { return nil, err } - if r.Config().Machine().Type() == machine.TypeJoin { - opts := []retry.Option{retry.WithUnits(3 * time.Second), retry.WithJitter(time.Second)} - - err := retry.Constant(8*time.Minute, opts...).Retry(func() error { - ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second) - defer ctxCancel() - - h, err := kubernetes.NewClientFromKubeletKubeconfig() - if err != nil { - return retry.ExpectedError(fmt.Errorf("failed to create client: %w", err)) - } - - endpoints, err = h.MasterIPs(ctx) - if err != nil { - return retry.ExpectedError(err) - } - - return nil - }) - if err != nil { - return nil, err - } - } - // Set the process arguments. args := runner.Args{ ID: o.ID(r), ProcessArgs: []string{ "/apid", - "--endpoints=" + strings.Join(endpoints, ","), }, } + isWorker := r.Config().Machine().Type() == machine.TypeJoin // join-type machines are treated as workers + + if !isWorker { + args.ProcessArgs = append(args.ProcessArgs, "--endpoints="+strings.Join([]string{"127.0.0.1"}, ",")) // non-workers use the loopback address as the only endpoint + } else { + args.ProcessArgs = append(args.ProcessArgs, "--use-kubernetes-endpoints") // apid looks the endpoints up itself instead of receiving a fixed list here + } + // Set the mounts.
mounts := []specs.Mount{ {Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}}, @@ -122,6 +98,14 @@ func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) { {Type: "bind", Destination: filepath.Dir(constants.APISocketPath), Source: filepath.Dir(constants.APISocketPath), Options: []string{"rbind", "rw"}}, } + if isWorker { + // worker needs the kubelet kubeconfig directory and kubelet PKI (both mounted read-only) so that apid can query Kubernetes for the control plane endpoints + mounts = append(mounts, + specs.Mount{Type: "bind", Destination: filepath.Dir(constants.KubeletKubeconfig), Source: filepath.Dir(constants.KubeletKubeconfig), Options: []string{"rbind", "ro"}}, + specs.Mount{Type: "bind", Destination: constants.KubeletPKIDir, Source: constants.KubeletPKIDir, Options: []string{"rbind", "ro"}}, + ) + } + env := []string{} for key, val := range r.Config().Machine().Env() { diff --git a/pkg/grpc/gen/remote.go b/pkg/grpc/gen/remote.go index 16fefdd55c..8ad3879688 100644 --- a/pkg/grpc/gen/remote.go +++ b/pkg/grpc/gen/remote.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "strings" + "sync" "time" "github.com/talos-systems/crypto/x509" @@ -28,9 +29,13 @@ func init() { // RemoteGenerator represents the OS identity generator. type RemoteGenerator struct { - client securityapi.SecurityServiceClient + done chan struct{} + creds basic.Credentials // token credentials, reused by SetEndpoints for every new connection + + // connMu protects conn & client, which SetEndpoints replaces together + connMu sync.Mutex conn *grpc.ClientConn - done chan struct{} + client securityapi.SecurityServiceClient } // NewRemoteGenerator initializes a RemoteGenerator with a preconfigured grpc.ClientConn.
@@ -39,35 +44,46 @@ func NewRemoteGenerator(token string, endpoints []string) (g *RemoteGenerator, e return nil, fmt.Errorf("at least one root of trust endpoint is required") } - creds := basic.NewTokenCredentials(token) - - conn, err := basic.NewConnection(fmt.Sprintf("%s:///%s", trustdResolverScheme, strings.Join(endpoints, ",")), creds) - if err != nil { - return nil, err + g = &RemoteGenerator{ + done: make(chan struct{}), + creds: basic.NewTokenCredentials(token), } - client := securityapi.NewSecurityServiceClient(conn) - - g = &RemoteGenerator{ - client: client, - conn: conn, - done: make(chan struct{}), + if err = g.SetEndpoints(endpoints); err != nil { + return nil, err } return g, nil } -// Certificate implements the securityapi.SecurityClient interface. -func (g *RemoteGenerator) Certificate(in *securityapi.CertificateRequest) (resp *securityapi.CertificateResponse, err error) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - resp, err = g.client.Certificate(ctx, in) +// SetEndpoints updates the list of endpoints to talk to. +// +// An empty list is rejected. Otherwise the new connection is created first, then swapped in +// under connMu, and the connection it replaces (if any) is closed so that it is not leaked. +func (g *RemoteGenerator) SetEndpoints(endpoints []string) error { + if len(endpoints) == 0 { + return fmt.Errorf("at least one root of trust endpoint is required") + } + + conn, err := basic.NewConnection(fmt.Sprintf("%s:///%s", trustdResolverScheme, strings.Join(endpoints, ",")), g.creds) if err != nil { - return nil, err + return err } - return resp, err + g.connMu.Lock() + defer g.connMu.Unlock() + + if g.conn != nil { + // the connection is being discarded, so a close failure is only logged + if closeErr := g.conn.Close(); closeErr != nil { + log.Printf("error closing previous connection: %s", closeErr) + } + } + + g.conn = conn + g.client = securityapi.NewSecurityServiceClient(g.conn) + + return nil } // Identity creates an identity certificate via the security API.
@@ -91,6 +93,16 @@ func (g *RemoteGenerator) Close() error { return g.conn.Close() } +func (g *RemoteGenerator) certificate(in *securityapi.CertificateRequest) (resp *securityapi.CertificateResponse, err error) { + g.connMu.Lock() + defer g.connMu.Unlock() // held across the whole RPC, so SetEndpoints cannot replace the client mid-call + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // each request is bounded to 30 seconds + defer cancel() + + return g.client.Certificate(ctx, in) +} + func (g *RemoteGenerator) poll(in *securityapi.CertificateRequest) (ca, crt []byte, err error) { timeout := time.NewTimer(time.Minute * 5) defer timeout.Stop() @@ -105,7 +117,7 @@ func (g *RemoteGenerator) poll(in *securityapi.CertificateRequest) (ca, crt []by case <-tick.C: var resp *securityapi.CertificateResponse - resp, err = g.Certificate(in) + resp, err = g.certificate(in) if err != nil { log.Println(err)