Skip to content

Commit

Permalink
Merge pull request #5778 from timoreimann/support-customizing-etcd-di…
Browse files Browse the repository at this point in the history
…al-timeout

✨ Support customizing the etcd dial timeout, increase default to 10s
  • Loading branch information
k8s-ci-robot committed Dec 6, 2021
2 parents 2d7c00f + ce2700c commit 008f1e7
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 23 deletions.
17 changes: 11 additions & 6 deletions controlplane/kubeadm/controllers/controller.go
Expand Up @@ -61,11 +61,12 @@ import (

// KubeadmControlPlaneReconciler reconciles a KubeadmControlPlane object.
type KubeadmControlPlaneReconciler struct {
Client client.Client
APIReader client.Reader
controller controller.Controller
recorder record.EventRecorder
Tracker *remote.ClusterCacheTracker
Client client.Client
APIReader client.Reader
controller controller.Controller
recorder record.EventRecorder
Tracker *remote.ClusterCacheTracker
EtcdDialTimeout time.Duration

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand Down Expand Up @@ -104,7 +105,11 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
if r.Tracker == nil {
return errors.New("cluster cache tracker is nil, cannot create the internal management cluster resource")
}
r.managementCluster = &internal.Management{Client: r.Client, Tracker: r.Tracker}
r.managementCluster = &internal.Management{
Client: r.Client,
Tracker: r.Tracker,
EtcdDialTimeout: r.EtcdDialTimeout,
}
}

if r.managementClusterUncached == nil {
Expand Down
7 changes: 4 additions & 3 deletions controlplane/kubeadm/internal/cluster.go
Expand Up @@ -49,8 +49,9 @@ type ManagementCluster interface {

// Management holds operations on the management cluster.
type Management struct {
Client client.Reader
Tracker *remote.ClusterCacheTracker
Client client.Reader
Tracker *remote.ClusterCacheTracker
EtcdDialTimeout time.Duration
}

// RemoteClusterConnectionError represents a failure to connect to a remote cluster.
Expand Down Expand Up @@ -149,7 +150,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
return &Workload{
Client: c,
CoreDNSMigrator: &CoreDNSMigrator{},
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig),
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout),
}, nil
}

Expand Down
23 changes: 14 additions & 9 deletions controlplane/kubeadm/internal/etcd/etcd.go
Expand Up @@ -29,9 +29,6 @@ import (
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
)

// etcdTimeout is the maximum time any individual call to the etcd client through the backoff adapter will take.
const etcdTimeout = 2 * time.Second

// GRPCDial is a function that creates a connection to a given endpoint.
type GRPCDial func(ctx context.Context, addr string) (net.Conn, error)

Expand Down Expand Up @@ -125,21 +122,29 @@ func pbMemberToMember(m *etcdserverpb.Member) *Member {
}
}

// NewClient creates a new etcd client with a proxy, and a TLS configuration.
func NewClient(ctx context.Context, endpoints []string, p proxy.Proxy, tlsConfig *tls.Config) (*Client, error) {
dialer, err := proxy.NewDialer(p)
// ClientConfiguration describes the configuration for an etcd client.
type ClientConfiguration struct {
Endpoints []string
Proxy proxy.Proxy
TLSConfig *tls.Config
DialTimeout time.Duration
}

// NewClient creates a new etcd client with the given configuration.
func NewClient(ctx context.Context, config ClientConfiguration) (*Client, error) {
dialer, err := proxy.NewDialer(config.Proxy)
if err != nil {
return nil, errors.Wrap(err, "unable to create a dialer for etcd client")
}

etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: etcdTimeout,
Endpoints: config.Endpoints,
DialTimeout: config.DialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
grpc.WithContextDialer(dialer.DialContextWithAddr),
},
TLS: tlsConfig,
TLS: config.TLSConfig,
})
if err != nil {
return nil, errors.Wrap(err, "unable to create etcd client")
Expand Down
10 changes: 8 additions & 2 deletions controlplane/kubeadm/internal/etcd_client_generator.go
Expand Up @@ -19,6 +19,7 @@ package internal
import (
"context"
"crypto/tls"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -39,7 +40,7 @@ type EtcdClientGenerator struct {
type clientCreator func(ctx context.Context, endpoints []string) (*etcd.Client, error)

// NewEtcdClientGenerator returns a new etcdClientGenerator instance.
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config) *EtcdClientGenerator {
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout time.Duration) *EtcdClientGenerator {
ecg := &EtcdClientGenerator{restConfig: restConfig, tlsConfig: tlsConfig}

ecg.createClient = func(ctx context.Context, endpoints []string) (*etcd.Client, error) {
Expand All @@ -50,7 +51,12 @@ func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config) *Etc
TLSConfig: ecg.tlsConfig,
Port: 2379,
}
return etcd.NewClient(ctx, endpoints, p, ecg.tlsConfig)
return etcd.NewClient(ctx, etcd.ClientConfiguration{
Endpoints: endpoints,
Proxy: p,
TLSConfig: tlsConfig,
DialTimeout: etcdDialTimeout,
})
}

return ecg
Expand Down
6 changes: 3 additions & 3 deletions controlplane/kubeadm/internal/etcd_client_generator_test.go
Expand Up @@ -37,7 +37,7 @@ var (

func TestNewEtcdClientGenerator(t *testing.T) {
g := NewWithT(t)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12})
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
g.Expect(subject.createClient).To(Not(BeNil()))
}

Expand Down Expand Up @@ -89,7 +89,7 @@ func TestFirstAvailableNode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12})
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject.createClient = tt.cc

client, err := subject.forFirstAvailableNode(ctx, tt.nodes)
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestForLeader(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12})
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject.createClient = tt.cc

client, err := subject.forLeader(ctx, tt.nodes)
Expand Down
5 changes: 5 additions & 0 deletions controlplane/kubeadm/main.go
Expand Up @@ -85,6 +85,7 @@ var (
webhookPort int
webhookCertDir string
healthAddr string
etcdDialTimeout time.Duration
)

// InitFlags initializes the flags.
Expand Down Expand Up @@ -128,6 +129,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.StringVar(&healthAddr, "health-addr", ":9440",
"The address the health endpoint binds to.")

fs.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second,
"Duration that the etcd client waits at most to establish a connection with etcd")

feature.MutableGates.AddFlag(fs)
}
func main() {
Expand Down Expand Up @@ -232,6 +236,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
EtcdDialTimeout: etcdDialTimeout,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KubeadmControlPlane")
os.Exit(1)
Expand Down

0 comments on commit 008f1e7

Please sign in to comment.