Skip to content

Commit

Permalink
fix: close Kubernetes API client
Browse files Browse the repository at this point in the history
The problem is that there's no official way to close the Kubernetes client's
underlying TCP/HTTP connections. So each time Talos initializes a
connection to the control plane endpoint, a new client is built, but this
client is never closed, so the connection stays active on the load
balancers, on the API server level, etc. It also eats some resources out
of Talos itself.

We add a way to close the underlying connections by using a helper from the
Kubernetes client libraries to force-close all TCP connections, which
should shut down all HTTP/2 connections as well.

An alternative approach might be to cache a client for some time, but many
of the clients are created with temporary PKI, so even a cached client
still needs to be closed once it gets stale, and it's not clear how to
recreate a client in case the existing one is broken for one reason or
another (and we need to force a re-connection).

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Jul 5, 2021
1 parent aaa36f3 commit 6d13d2c
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 10 deletions.
Expand Up @@ -28,6 +28,8 @@ func (s *Server) HealthCheck(in *clusterapi.HealthCheckRequest, srv clusterapi.C
ClientProvider: clientProvider,
ForceEndpoint: in.GetClusterInfo().GetForceEndpoint(),
}
defer k8sProvider.K8sClose() //nolint:errcheck

clusterState := clusterState{
controlPlaneNodes: in.GetClusterInfo().GetControlPlaneNodes(),
workerNodes: in.GetClusterInfo().GetWorkerNodes(),
Expand Down
17 changes: 10 additions & 7 deletions internal/app/machined/pkg/controllers/k8s/endpoint.go
Expand Up @@ -89,18 +89,21 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
return err
}

client, err := kubernetes.NewClientFromKubeletKubeconfig()
if err != nil {
return fmt.Errorf("error building Kubernetes client: %w", err)
}

if err = ctrl.watchEndpoints(ctx, r, logger, client); err != nil {
if err = ctrl.watchEndpoints(ctx, r, logger); err != nil {
return err
}
}
}

func (ctrl *EndpointController) watchEndpoints(ctx context.Context, r controller.Runtime, logger *zap.Logger, client *kubernetes.Client) error {
//nolint:gocyclo
func (ctrl *EndpointController) watchEndpoints(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
client, err := kubernetes.NewClientFromKubeletKubeconfig()
if err != nil {
return fmt.Errorf("error building Kubernetes client: %w", err)
}

defer client.Close() //nolint:errcheck

ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

Expand Down
Expand Up @@ -1110,6 +1110,8 @@ func CordonAndDrainNode(seq runtime.Sequence, data interface{}) (runtime.TaskExe
return err
}

defer kubeHelper.Close() //nolint:errcheck

return kubeHelper.CordonAndDrain(ctx, nodename)
}, "cordonAndDrainNode"
}
Expand All @@ -1136,6 +1138,8 @@ func UncordonNode(seq runtime.Sequence, data interface{}) (runtime.TaskExecution
return err
}

defer kubeHelper.Close() //nolint:errcheck

if err = kubeHelper.WaitUntilReady(ctx, nodename); err != nil {
return err
}
Expand Down Expand Up @@ -1347,6 +1351,8 @@ func LabelNodeAsMaster(seq runtime.Sequence, data interface{}) (runtime.TaskExec
return err
}

defer h.Close() //nolint:errcheck

var nodename string

if nodename, err = r.NodeName(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/etcd/etcd.go
Expand Up @@ -78,6 +78,8 @@ func NewClientFromControlPlaneIPs(ctx context.Context, creds *x509.PEMEncodedCer
return nil, fmt.Errorf("error building kubernetes client from PKI: %w", err)
}

defer h.Close() //nolint:errcheck

var endpoints []string

if endpoints, err = h.MasterIPs(ctx); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/cluster.go
Expand Up @@ -34,6 +34,7 @@ type K8sProvider interface {
K8sRestConfig(ctx context.Context) (*rest.Config, error)
K8sClient(ctx context.Context) (*kubernetes.Clientset, error)
K8sHelper(ctx context.Context) (*k8s.Client, error)
K8sClose() error
}

// CrashDumper captures Talos cluster state to the specified writer for debugging.
Expand Down
9 changes: 9 additions & 0 deletions pkg/cluster/kubelet.go
Expand Up @@ -49,3 +49,12 @@ func (k *KubernetesFromKubeletClient) K8sHelper(ctx context.Context) (*kubernete

return k.KubeHelper, nil
}

// K8sClose closes the Kubernetes helper client held in KubeHelper.
//
// When KubeHelper is nil there is nothing to close and nil is returned;
// otherwise the result of the helper's Close is returned.
func (k *KubernetesFromKubeletClient) K8sClose() error {
	if helper := k.KubeHelper; helper != nil {
		return helper.Close()
	}

	return nil
}
9 changes: 9 additions & 0 deletions pkg/cluster/kubernetes.go
Expand Up @@ -109,3 +109,12 @@ func (k *KubernetesClient) K8sHelper(ctx context.Context) (*k8s.Client, error) {

return k.KubeHelper, nil
}

// K8sClose closes the Kubernetes helper client held in KubeHelper.
//
// When KubeHelper is nil there is nothing to close and nil is returned;
// otherwise the result of the helper's Close is returned.
func (k *KubernetesClient) K8sClose() error {
	if helper := k.KubeHelper; helper != nil {
		return helper.Close()
	}

	return nil
}
43 changes: 40 additions & 3 deletions pkg/kubernetes/kubernetes.go
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"log"
"net"
"net/url"
"time"

Expand All @@ -27,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/connrotation"

"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
Expand All @@ -41,6 +43,12 @@ const (
// Kubernetes API.
type Client struct {
// Clientset is embedded, so its API group accessors are available directly on Client.
*kubernetes.Clientset

// dialer is the connection dialer wired into the rest config by the
// constructors; Close calls CloseAll on it.
dialer *connrotation.Dialer
}

// newDialer wraps a net.Dialer (30-second connect timeout, 30-second
// keep-alive) into a connrotation.Dialer.
func newDialer() *connrotation.Dialer {
	base := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}

	return connrotation.NewDialer(base.DialContext)
}

// NewClientFromKubeletKubeconfig initializes and returns a Client.
Expand All @@ -56,26 +64,42 @@ func NewClientFromKubeletKubeconfig() (client *Client, err error) {
config.Timeout = 30 * time.Second
}

dialer := newDialer()
config.Dial = dialer.DialContext

var clientset *kubernetes.Clientset

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

return &Client{clientset}, nil
return &Client{
Clientset: clientset,
dialer: dialer,
}, nil
}

// NewForConfig initializes and returns a client using the provided config.
//
// It returns an error if config.Dial is already set, because the function
// installs its own dialer into config.Dial.
//
// NOTE(review): config is modified in place (config.Dial is assigned), so
// passing the same config a second time hits the "dialer is already set" error.
func NewForConfig(config *restclient.Config) (client *Client, err error) {
var clientset *kubernetes.Clientset

if config.Dial != nil {
return nil, fmt.Errorf("dialer is already set")
}

// Route connections through a dialer that is retained on the returned Client.
dialer := newDialer()
config.Dial = dialer.DialContext

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

return &Client{clientset}, nil
return &Client{
Clientset: clientset,
dialer: dialer,
}, nil
}

// NewClientFromPKI initializes and returns a Client.
Expand All @@ -94,14 +118,20 @@ func NewClientFromPKI(ca, crt, key []byte, endpoint *url.URL) (client *Client, e
Timeout: 30 * time.Second,
}

dialer := newDialer()
config.Dial = dialer.DialContext

var clientset *kubernetes.Clientset

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

return &Client{clientset}, nil
return &Client{
Clientset: clientset,
dialer: dialer,
}, nil
}

// NewTemporaryClientFromPKI initializes a Kubernetes client using a certificate
Expand Down Expand Up @@ -132,6 +162,13 @@ func NewTemporaryClientFromPKI(ca *x509.PEMEncodedCertificateAndKey, endpoint *u
return h, nil
}

// Close all connections.
//
// Close calls CloseAll on the client's dialer, closing the connections that
// were established through it. It is safe to call on a nil Client or on a
// Client that has no dialer (for example, a zero-value Client); in both
// cases it does nothing. The returned error is always nil.
func (h *Client) Close() error {
	// Guard against clients that were not built by one of the constructors,
	// so that a deferred Close never panics.
	if h == nil || h.dialer == nil {
		return nil
	}

	h.dialer.CloseAll()

	return nil
}

// MasterIPs returns a list of control plane endpoints (IP addresses).
func (h *Client) MasterIPs(ctx context.Context) (addrs []string, err error) {
endpoints, err := h.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
Expand Down

0 comments on commit 6d13d2c

Please sign in to comment.