Skip to content

Commit

Permalink
Refactor kubeletClient
Browse files Browse the repository at this point in the history
Move address resover into client and merge all config setup.
  • Loading branch information
serathius committed Jun 8, 2020
1 parent bd8fa17 commit 0d4e211
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 287 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Expand Up @@ -14,7 +14,7 @@ If your repo has certain guidelines for contribution, put them here ahead of the

- [Contributor License Agreement](https://git.k8s.io/community/CLA.md) Kubernetes projects require that you sign a Contributor License Agreement (CLA) before we can accept your pull requests
- [Kubernetes Contributor Guide](https://git.k8s.io/community/contributors/guide) - Main contributor documentation, or you can just jump directly to the [contributing section](https://git.k8s.io/community/contributors/guide#contributing)
- [Contributor Cheat Sheet](https://git.k8s.io/community/contributors/guide/contributor-cheatsheet) - Common resources for existing developers
- [Contributor Cheat Sheet](https://git.k8s.io/community/contributors/guide/contributor-cheatsheet.md) - Common resources for existing developers

## Mentorship

Expand Down
50 changes: 0 additions & 50 deletions FAQ.md
Expand Up @@ -72,56 +72,6 @@ Default 60 seconds, can be changed using `metrics-resolution` flag. We are not r

## Known issues

#### Incorrectly configured front-proxy certificate

Metrics Server needs to validate requests coming from kube-apiserver. You can recognize problems with front-proxy certificate configuration if you observe line below in your logs:
```
E0524 01:37:36.055326 1 authentication.go:65] Unable to authenticate the request due to an error: x509: certificate signed by unknown authority
```

To fix this problem you need to provide kube-apiserver proxy-client CA to Metrics Server under `--requestheader-client-ca-file` flag. You can read more about this flag in [Authenticating Proxy](https://kubernetes.io/docs/reference/access-authn-authz/authentication/#authenticating-proxy)


For cluster created by kubeadm:

1. Find your front-proxy certificates by checking arguments passed in kube-apiserver config (by default located in /etc/kubernetes/manifests/kube-apiserver.yaml)

```
- --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt
- --proxy-client-key-file=/etc/kubernetes/pki/front-proxy-client.key
```

2. Create configmap including `front-proxy-ca.crt`

```
kubectl -nkube-system create configmap front-proxy-ca --from-file=front-proxy-ca.crt=/etc/kubernetes/pki/front-proxy-ca.crt -o yaml | kubectl -nkube-system replace configmap front-proxy-ca -f -
```

3. Mount configmap in Metrics Server deployment and add `--requestheader-client-ca-file` flag

```
- args:
- --requestheader-client-ca-file=/ca/front-proxy-ca.crt // ADD THIS!
- --cert-dir=/tmp
- --secure-port=4443
- --kubelet-insecure-tls // ignore validate kubelet x509
- --kubelet-preferred-address-types=InternalIP // using InternalIP to connect kubelet
volumeMounts:
- mountPath: /tmp
name: tmp-dir
- mountPath: /ca // ADD THIS!
name: ca-dir
volumes:
- emptyDir: {}
name: tmp-dir
- configMap: // ADD THIS!
defaultMode: 420
name: front-proxy-ca
name: ca-dir
```

#### Network problems

Metrics server needs to contact all nodes in cluster to collect metrics. Problems with network would can be recognized by following symptoms:
Expand Down
31 changes: 23 additions & 8 deletions cmd/metrics-server/app/options/options.go
Expand Up @@ -16,6 +16,7 @@ package options
import (
"fmt"
"net"
"os"
"strings"
"time"

Expand Down Expand Up @@ -112,13 +113,10 @@ func (o Options) MetricsServerConfig() (*metric_server.Config, error) {
if err != nil {
return nil, err
}
kubelet := o.kubeletConfig(restConfig)
addressResolver := o.addressResolverConfig()
return &metric_server.Config{
Apiserver: apiserver,
Rest: restConfig,
Kubelet: kubelet,
AddresResolver: addressResolver,
Kubelet: o.kubeletConfig(restConfig),
MetricResolution: o.MetricResolution,
ScrapeTimeout: time.Duration(float64(o.MetricResolution) * 0.90), // scrape timeout is 90% of the scrape interval
}, nil
Expand Down Expand Up @@ -169,12 +167,29 @@ func (o Options) restConfig() (*rest.Config, error) {
}

func (o Options) kubeletConfig(restConfig *rest.Config) *scraper.KubeletClientConfig {
kubeletRestCfg := rest.CopyConfig(restConfig)
config := &scraper.KubeletClientConfig{
DefaultPort: o.KubeletPort,
AddressTypePriority: o.addressResolverConfig(),
UseNodeStatusPort: o.KubeletUseNodeStatusPort,
}
if len(o.KubeletCAFile) > 0 {
kubeletRestCfg.TLSClientConfig.CAFile = o.KubeletCAFile
kubeletRestCfg.TLSClientConfig.CAData = nil
config.RESTConfig.TLSClientConfig.CAFile = o.KubeletCAFile
config.RESTConfig.TLSClientConfig.CAData = nil
}
if o.DeprecatedCompletelyInsecureKubelet {
config.Scheme = "http"
config.RESTConfig = rest.AnonymousClientConfig(config.RESTConfig) // don't use auth to avoid leaking auth details to insecure endpoints
config.RESTConfig.TLSClientConfig = rest.TLSClientConfig{} // empty TLS config --> no TLS
} else {
config.Scheme = "https"
config.RESTConfig = rest.CopyConfig(restConfig)
}
if o.InsecureKubeletTLS {
config.RESTConfig.TLSClientConfig.Insecure = true
config.RESTConfig.TLSClientConfig.CAData = nil
config.RESTConfig.TLSClientConfig.CAFile = ""
}
return scraper.GetKubeletConfig(kubeletRestCfg, o.KubeletPort, o.KubeletUseNodeStatusPort, o.InsecureKubeletTLS, o.DeprecatedCompletelyInsecureKubelet)
return config
}

func (o Options) addressResolverConfig() []corev1.NodeAddressType {
Expand Down
1 change: 0 additions & 1 deletion manifests/base/deployment.yaml
Expand Up @@ -32,7 +32,6 @@ spec:
- --cert-dir=/tmp
- --secure-port=4443
- --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname
- --kubelet-use-node-status-port
ports:
- name: main-port
containerPort: 4443
Expand Down
24 changes: 3 additions & 21 deletions pkg/metrics-server/config.go
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -27,14 +26,12 @@ import (
"sigs.k8s.io/metrics-server/pkg/api"
"sigs.k8s.io/metrics-server/pkg/scraper"
"sigs.k8s.io/metrics-server/pkg/storage"
"sigs.k8s.io/metrics-server/pkg/utils"
)

type Config struct {
Apiserver *genericapiserver.Config
Rest *rest.Config
Kubelet *scraper.KubeletClientConfig
AddresResolver []corev1.NodeAddressType
MetricResolution time.Duration
ScrapeTimeout time.Duration
}
Expand All @@ -44,14 +41,12 @@ func (c Config) Complete() (*MetricsServer, error) {
if err != nil {
return nil, err
}
kubeletClient, err := c.kubeletClient()
kubeletClient, err := c.Kubelet.Complete()
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
}
addressResolver := c.addressResolver()

nodes := informer.Core().V1().Nodes()
scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, addressResolver, c.ScrapeTimeout)
scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, c.ScrapeTimeout)

scraper.RegisterScraperMetrics(c.ScrapeTimeout)
RegisterServerMetrics(c.MetricResolution)
Expand Down Expand Up @@ -86,16 +81,3 @@ func (c Config) informer() (informers.SharedInformerFactory, error) {
// so set the default resync interval to 0
return informers.NewSharedInformerFactory(kubeClient, 0), nil
}

func (c Config) kubeletClient() (scraper.KubeletInterface, error) {
kubeletClient, err := scraper.KubeletClientFor(c.Kubelet)
if err != nil {
return nil, fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
}
return kubeletClient, nil
}

func (c Config) addressResolver() utils.NodeAddressResolver {
// set up an address resolver according to the user's priorities
return utils.NewPriorityNodeAddressResolver(c.AddresResolver)
}
33 changes: 17 additions & 16 deletions pkg/metrics-server/server.go
Expand Up @@ -78,24 +78,25 @@ func (ms *MetricsServer) RunUntil(stopCh <-chan struct{}) error {
if !shutdown {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go ms.runScrape(ctx)
return ms.GenericAPIServer.PrepareRun().Run(stopCh)
}

go func() {
ticker := time.NewTicker(ms.resolution)
defer ticker.Stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ms.scrape(ctx, time.Now())

for {
select {
case startTime := <-ticker.C:
ms.scrape(ctx, startTime)
case <-stopCh:
return
}
func (ms *MetricsServer) runScrape(ctx context.Context) {
ticker := time.NewTicker(ms.resolution)
defer ticker.Stop()
ms.scrape(ctx, time.Now())

for {
select {
case startTime := <-ticker.C:
ms.scrape(ctx, startTime)
case <-ctx.Done():
return
}
}()
return ms.GenericAPIServer.PrepareRun().Run(stopCh)
}
}

func (ms *MetricsServer) scrape(ctx context.Context, startTime time.Time) {
Expand Down
52 changes: 21 additions & 31 deletions pkg/scraper/client.go
Expand Up @@ -25,22 +25,28 @@ import (

"github.com/mailru/easyjson"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog"

"sigs.k8s.io/metrics-server/pkg/utils"
)

// KubeletInterface knows how to fetch metrics from the Kubelet
type KubeletInterface interface {
// GetSummary fetches summary metrics from the given Kubelet
GetSummary(ctx context.Context, info NodeInfo) (*Summary, error)
GetSummary(ctx context.Context, node *corev1.Node) (*Summary, error)
}

type kubeletClient struct {
defaultPort int
kubeletUseNodeStatusPort bool
deprecatedNoTLS bool
client *http.Client
defaultPort int
useNodeStatusPort bool
client *http.Client
scheme string
addrResolver utils.NodeAddressResolver
}

var _ KubeletInterface = (*kubeletClient)(nil)

type ErrNotFound struct {
endpoint string
}
Expand All @@ -49,11 +55,6 @@ func (err *ErrNotFound) Error() string {
return fmt.Sprintf("%q not found", err.endpoint)
}

func IsNotFoundError(err error) bool {
_, isNotFound := err.(*ErrNotFound)
return isNotFound
}

func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.Request, value easyjson.Unmarshaler) error {
// TODO(directxman12): support validating certs by hostname
response, err := client.Do(req)
Expand Down Expand Up @@ -84,18 +85,19 @@ func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.R
return nil
}

func (kc *kubeletClient) GetSummary(ctx context.Context, info NodeInfo) (*Summary, error) {
scheme := "https"
if kc.deprecatedNoTLS {
scheme = "http"
func (kc *kubeletClient) GetSummary(ctx context.Context, node *corev1.Node) (*Summary, error) {
port := kc.defaultPort
nodeStatusPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if kc.useNodeStatusPort && nodeStatusPort != 0 {
port = nodeStatusPort
}
kubeletPort := kc.defaultPort
if kc.kubeletUseNodeStatusPort && info.KubeletPort != 0 {
kubeletPort = info.KubeletPort
addr, err := kc.addrResolver.NodeAddress(node)
if err != nil {
return nil, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err)
}
url := url.URL{
Scheme: scheme,
Host: net.JoinHostPort(info.ConnectAddress, strconv.Itoa(kubeletPort)),
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(port)),
Path: "/stats/summary",
RawQuery: "only_cpu_and_memory=true",
}
Expand All @@ -112,15 +114,3 @@ func (kc *kubeletClient) GetSummary(ctx context.Context, info NodeInfo) (*Summar
err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary)
return summary, err
}

func NewKubeletClient(transport http.RoundTripper, port int, kubeletUseNodeStatusPort bool, deprecatedNoTLS bool) (KubeletInterface, error) {
c := &http.Client{
Transport: transport,
}
return &kubeletClient{
defaultPort: port,
kubeletUseNodeStatusPort: kubeletUseNodeStatusPort,
client: c,
deprecatedNoTLS: deprecatedNoTLS,
}, nil
}
48 changes: 21 additions & 27 deletions pkg/scraper/configs.go
Expand Up @@ -16,44 +16,38 @@ package scraper

import (
"fmt"
"net/http"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
)

// GetKubeletConfig fetches connection config for connecting to the Kubelet.
func GetKubeletConfig(cfg *rest.Config, port int, kubeletUseNodeStatusPort bool, insecureTLS bool, completelyInsecure bool) *KubeletClientConfig {
if completelyInsecure {
cfg = rest.AnonymousClientConfig(cfg) // don't use auth to avoid leaking auth details to insecure endpoints
cfg.TLSClientConfig = rest.TLSClientConfig{} // empty TLS config --> no TLS
} else if insecureTLS {
cfg.TLSClientConfig.Insecure = true
cfg.TLSClientConfig.CAData = nil
cfg.TLSClientConfig.CAFile = ""
}
kubeletConfig := &KubeletClientConfig{
Port: port,
KubeletUseNodeStatusPort: kubeletUseNodeStatusPort,
RESTConfig: cfg,
DeprecatedCompletelyInsecure: completelyInsecure,
}

return kubeletConfig
}
"sigs.k8s.io/metrics-server/pkg/utils"
)

// KubeletClientConfig represents configuration for connecting to Kubelets.
type KubeletClientConfig struct {
KubeletUseNodeStatusPort bool
Port int
RESTConfig *rest.Config
DeprecatedCompletelyInsecure bool
RESTConfig *rest.Config
AddressTypePriority []corev1.NodeAddressType
Scheme string
DefaultPort int
UseNodeStatusPort bool
}

// KubeletClientFor constructs a new KubeletInterface for the given configuration.
func KubeletClientFor(config *KubeletClientConfig) (KubeletInterface, error) {
// Complete constructs a new kubeletCOnfig for the given configuration.
func (config KubeletClientConfig) Complete() (*kubeletClient, error) {
transport, err := rest.TransportFor(config.RESTConfig)
if err != nil {
return nil, fmt.Errorf("unable to construct transport: %v", err)
}

return NewKubeletClient(transport, config.Port, config.KubeletUseNodeStatusPort, config.DeprecatedCompletelyInsecure)
c := &http.Client{
Transport: transport,
}
return &kubeletClient{
addrResolver: utils.NewPriorityNodeAddressResolver(config.AddressTypePriority),
defaultPort: config.DefaultPort,
client: c,
scheme: config.Scheme,
useNodeStatusPort: config.UseNodeStatusPort,
}, nil
}

0 comments on commit 0d4e211

Please sign in to comment.