Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple metrics endpoint from the aggregated server #829

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/metrics-server/app/options/options.go
Expand Up @@ -42,6 +42,7 @@ type Options struct {
KubeletClient *KubeletClientOptions

MetricResolution time.Duration
MetricsAddress string
ShowVersion bool
Kubeconfig string

Expand All @@ -62,6 +63,7 @@ func (o *Options) Flags() (fs flag.NamedFlagSets) {
msfs.DurationVar(&o.MetricResolution, "metric-resolution", o.MetricResolution, "The resolution at which metrics-server will retain metrics, must set value at least 10s.")
msfs.BoolVar(&o.ShowVersion, "version", false, "Show version")
msfs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "The path to the kubeconfig used to connect to the Kubernetes API server and the Kubelets (defaults to in-cluster config)")
msfs.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address used for exposing prometheus metrics (optional)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
msfs.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address used for exposing prometheus metrics (optional)")
msfs.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "Separate insecure address used for exposing prometheus metrics (optional)")


o.KubeletClient.AddFlags(fs.FlagSet("kubelet client"))
o.SecureServing.AddFlags(fs.FlagSet("apiserver secure serving"))
Expand All @@ -84,6 +86,7 @@ func NewOptions() *Options {
KubeletClient: NewKubeletClientOptions(),

MetricResolution: 60 * time.Second,
MetricsAddress: "",
}
}

Expand All @@ -101,6 +104,7 @@ func (o Options) ServerConfig() (*server.Config, error) {
Rest: restConfig,
Kubelet: o.KubeletClient.Config(restConfig),
MetricResolution: o.MetricResolution,
MetricsAddress: o.MetricsAddress,
ScrapeTimeout: time.Duration(float64(o.MetricResolution) * 0.90), // scrape timeout is 90% of the scrape interval
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/google/addlicense v1.0.0
github.com/google/go-cmp v0.5.5
github.com/oklog/run v1.0.0
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
github.com/prometheus/common v0.26.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -341,6 +341,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/config.go
Expand Up @@ -40,6 +40,7 @@ type Config struct {
Rest *rest.Config
Kubelet *client.KubeletClientConfig
MetricResolution time.Duration
MetricsAddress string
ScrapeTimeout time.Duration
}

Expand Down Expand Up @@ -81,6 +82,8 @@ func (c Config) Complete() (*server, error) {
nodes.Informer(),
podInformer.Informer(),
genericServer,
metricsHandler,
c.MetricsAddress,
store,
scrape,
c.MetricResolution,
Expand Down
66 changes: 61 additions & 5 deletions pkg/server/server.go
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/oklog/run"

genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -57,12 +59,18 @@ func RegisterServerMetrics(registrationFunc func(metrics.Registerable) error, re
func NewServer(
nodes cache.Controller,
pods cache.Controller,
apiserver *genericapiserver.GenericAPIServer, storage storage.Storage,
scraper scraper.Scraper, resolution time.Duration) *server {
apiserver *genericapiserver.GenericAPIServer,
metricsHandler http.HandlerFunc,
metricsAddress string,
storage storage.Storage,
scraper scraper.Scraper, resolution time.Duration,
) *server {
return &server{
nodes: nodes,
pods: pods,
GenericAPIServer: apiserver,
metricsHandler: metricsHandler,
metricsAddress: metricsAddress,
storage: storage,
scraper: scraper,
resolution: resolution,
Expand All @@ -72,6 +80,8 @@ func NewServer(
// server scrapes metrics and serves then using k8s api.
type server struct {
*genericapiserver.GenericAPIServer
metricsHandler http.HandlerFunc
metricsAddress string

pods cache.Controller
nodes cache.Controller
Expand Down Expand Up @@ -105,9 +115,23 @@ func (s *server) RunUntil(stopCh <-chan struct{}) error {
return nil
}

// Start serving API and scrape loop
go s.runScrape(ctx)
return s.GenericAPIServer.PrepareRun().Run(stopCh)
g := run.Group{}
s.addRunScrape(ctx, &g)
s.addGenericAPIServer(stopCh, cancel, &g)
if s.metricsAddress != "" {
s.addMetricsServer(ctx, &g)
}

return g.Run()
}

func (s *server) newMetricsServer() *http.Server {
mux := http.NewServeMux()
mux.Handle("/metrics", s.metricsHandler)
return &http.Server{
Addr: s.metricsAddress,
Handler: mux,
}
}

func (s *server) runScrape(ctx context.Context) {
Expand Down Expand Up @@ -144,6 +168,38 @@ func (s *server) tick(ctx context.Context, startTime time.Time) {
klog.V(6).InfoS("Scraping cycle complete")
}

func (s *server) addRunScrape(ctx context.Context, g *run.Group) {
ctx, cancel := context.WithCancel(ctx)
g.Add(func() error {
s.runScrape(ctx)
return nil
}, func(err error) {
cancel()
})
}

func (s *server) addMetricsServer(ctx context.Context, g *run.Group) {
ctx, cancel := context.WithCancel(ctx)
metricsServer := s.newMetricsServer()
g.Add(func() error {
return metricsServer.ListenAndServe()
}, func(err error) {
klog.InfoS("Shutting down metrics handler")
if err := metricsServer.Shutdown(ctx); err != nil {
klog.ErrorS(err, "Could not gracefully shut down metrics handler")
}
cancel()
})
}

func (s *server) addGenericAPIServer(stopCh <-chan struct{}, cancel context.CancelFunc, g *run.Group) {
g.Add(func() error {
return s.GenericAPIServer.PrepareRun().Run(stopCh)
}, func(err error) {
cancel()
})
}

func (s *server) RegisterProbes(waiter cacheSyncWaiter) error {
err := s.AddReadyzChecks(s.probeMetricStorageReady("metric-storage-ready"))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
Expand Up @@ -58,7 +58,7 @@ var _ = Describe("Server", func() {
},
}
store = &storageMock{}
server = NewServer(nil, nil, nil, store, scraper, resolution)
server = NewServer(nil, nil, nil, nil, "", store, scraper, resolution)
})

It("metric-collection-timely probe should pass before first scrape tick finishes", func() {
Expand Down