Skip to content

Commit

Permalink
Merge pull request #313 from mfojtik/metrics-observe-controller
Browse files Browse the repository at this point in the history
Bug 1827585: Add controller that watches leader changes and captures disk metrics
  • Loading branch information
openshift-merge-robot committed Apr 24, 2020
2 parents b58a378 + c578cb2 commit 3d9d95a
Show file tree
Hide file tree
Showing 7 changed files with 1,208 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (
github.com/openshift/build-machinery-go v0.0.0-20200211121458-5e3d6e570160
github.com/openshift/client-go v0.0.0-20200326155132-2a6cd50aedd0
github.com/openshift/library-go v0.0.0-20200422120251-a5cb46356745
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.6.0
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/vincent-petithory/dataurl v0.0.0-20191104211930-d1553a71de50
Expand Down
67 changes: 67 additions & 0 deletions pkg/operator/metriccontroller/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metriccontroller

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"
"time"

prometheusapi "github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/transport"
)

// getPrometheusClient builds a Prometheus v1 API client that talks to the
// thanos-querier service in the openshift-monitoring namespace.
//
// It lists the secrets in openshift-monitoring and uses, as bearer token, the
// token of the first service account token secret whose name starts with
// "prometheus-k8s" and whose token is non-empty. TLS is verified against the
// service CA bundle read from
// /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt.
//
// An error is returned when the secrets cannot be listed, no usable token is
// found, the CA bundle cannot be read or contains no certificate, or the
// Prometheus client cannot be constructed. Errors from listing secrets and
// from reading the CA file are returned unwrapped so callers can keep
// inspecting them directly.
func getPrometheusClient(ctx context.Context, secretClient coreclientv1.SecretsGetter) (prometheusv1.API, error) {
	secrets, err := secretClient.Secrets("openshift-monitoring").List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	bearerToken := ""
	for _, s := range secrets.Items {
		if s.Type != corev1.SecretTypeServiceAccountToken ||
			!strings.HasPrefix(s.Name, "prometheus-k8s") {
			continue
		}
		// Keep looking when a matching secret carries no token instead of
		// giving up on the first match.
		if token := string(s.Data[corev1.ServiceAccountTokenKey]); len(token) > 0 {
			bearerToken = token
			break
		}
	}
	if len(bearerToken) == 0 {
		return nil, fmt.Errorf("unable to retrieve prometheus-k8s bearer token")
	}

	serviceCABytes, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt")
	if err != nil {
		return nil, err
	}

	roots := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was parsed; an empty
	// pool would make every TLS handshake fail later with a less obvious error.
	if !roots.AppendCertsFromPEM(serviceCABytes) {
		return nil, fmt.Errorf("no certificates found in service CA bundle")
	}

	client, err := prometheusapi.NewClient(prometheusapi.Config{
		Address: "https://" + net.JoinHostPort("thanos-querier.openshift-monitoring.svc", "9091"),
		RoundTripper: transport.NewBearerAuthRoundTripper(
			bearerToken,
			&http.Transport{
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   30 * time.Second,
					KeepAlive: 30 * time.Second,
				}).DialContext,
				TLSHandshakeTimeout: 10 * time.Second,
				TLSClientConfig: &tls.Config{
					RootCAs: roots,
				},
			},
		),
	})
	// Previously this error was dropped and a client was returned regardless.
	if err != nil {
		return nil, fmt.Errorf("creating prometheus client: %w", err)
	}

	return prometheusv1.NewAPI(client), nil
}
100 changes: 100 additions & 0 deletions pkg/operator/metriccontroller/fsync_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package metriccontroller

import (
"context"
"fmt"
"strings"
"time"

prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog"

configclientv1 "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
operatorv1helpers "github.com/openshift/library-go/pkg/operator/v1helpers"
)

type FSyncController struct {
secretClient corev1client.SecretsGetter
infraClient configclientv1.InfrastructuresGetter
}

// NewFSyncController wires an FSyncController into a factory controller named
// "FSyncController" that runs its sync function on a one minute resync
// interval and records events through recorder.
//
// operatorClient is accepted but not used by this constructor.
func NewFSyncController(operatorClient operatorv1helpers.OperatorClient, infraClient configclientv1.InfrastructuresGetter, secretGetter corev1client.SecretsGetter, recorder events.Recorder) factory.Controller {
	controller := &FSyncController{infraClient: infraClient, secretClient: secretGetter}

	builder := factory.New().
		ResyncEvery(time.Minute).
		WithSync(controller.sync)

	return builder.ToController("FSyncController", recorder)
}

// sync queries Prometheus for the increase in etcd leader changes over the
// last 5 minutes and, when any are seen, emits an "EtcdLeaderChangeMetrics"
// warning event. The event carries the 99th percentile WAL fsync duration per
// etcd pod and the infrastructure platform type.
//
// It returns nil without emitting an event when building the Prometheus client
// or fetching the "cluster" Infrastructure reports NotFound, when the leader
// change query returns no samples, or when no leader changes were observed.
// Any other failure, including an unexpected Prometheus result type, is
// returned as an error.
func (c *FSyncController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
	client, err := getPrometheusClient(ctx, c.secretClient)
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// First check how many leader changes we see in the last 5 minutes.
	etcdLeaderChangesResult, _, err := client.Query(ctx, "increase((max by (job) (etcd_server_leader_changes_seen_total) or 0*absent(etcd_server_leader_changes_seen_total))[5m:1m])", time.Now())
	if err != nil {
		return err
	}
	// Guard the type assertion and the index: an unexpected result type or an
	// empty vector must not panic the controller.
	vector, ok := etcdLeaderChangesResult.(model.Vector)
	if !ok {
		return fmt.Errorf("unexpected type, expected Vector, got %T", etcdLeaderChangesResult)
	}
	if len(vector) == 0 {
		klog.V(4).Infof("Etcd leader changes query returned no samples")
		return nil
	}
	leaderChanges := vector[0].Value
	klog.Infof("Etcd leader changes increase in last 5m: %s", leaderChanges)

	// Do nothing if there are no leader changes
	if leaderChanges == 0.0 {
		return nil
	}

	// Capture etcd disk metrics as we detected etcd leader changes. Use a
	// single timestamp so the range covers exactly one hour.
	now := time.Now()
	etcdWalFsyncResult, _, err := client.QueryRange(ctx, "histogram_quantile(0.99, rate(etcd_disk_wal_fsync_duration_seconds_bucket[5m]))", prometheusv1.Range{
		Start: now.Add(-1 * time.Hour),
		End:   now,
		Step:  1 * time.Second,
	})
	if err != nil {
		return err
	}

	matrix, ok := etcdWalFsyncResult.(model.Matrix)
	if !ok {
		// Report the type actually received; the failed assertion leaves
		// matrix as a zero-valued model.Matrix.
		return fmt.Errorf("unexpected type, expected Matrix, got %T", etcdWalFsyncResult)
	}

	values := []string{}
	for _, s := range matrix {
		pod, ok := s.Metric["pod"]
		if !ok {
			klog.Warningf("No pod label found in metric: %+v", s.Metric)
			continue
		}
		if len(s.Values) == 0 {
			klog.Warningf("No samples found in metric: %+v", s.Metric)
			continue
		}
		// NOTE(review): this reports the first sample returned for the series;
		// presumably the most recent one would be more relevant, confirm.
		values = append(values, fmt.Sprintf("%s=%s", pod, s.Values[0].Value))
	}

	infra, err := c.infraClient.Infrastructures().Get(ctx, "cluster", metav1.GetOptions{})
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}
	var platformType string
	if infra.Status.PlatformStatus != nil {
		platformType = string(infra.Status.PlatformStatus.Type)
	}

	// Send warning event if leader changes are detected. The event will include fsync disk metrics.
	syncCtx.Recorder().Warningf("EtcdLeaderChangeMetrics", "Detected %s leader changes in last 5 minutes on %q disk metrics are: %s", leaderChanges, platformType, strings.Join(values, ","))

	// TODO: Consider Degraded condition here.

	return nil
}
4 changes: 4 additions & 0 deletions pkg/operator/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/openshift/cluster-etcd-operator/pkg/operator/etcdmemberipmigrator"
"github.com/openshift/cluster-etcd-operator/pkg/operator/etcdmemberscontroller"
"github.com/openshift/cluster-etcd-operator/pkg/operator/hostendpointscontroller2"
"github.com/openshift/cluster-etcd-operator/pkg/operator/metriccontroller"
"github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient"
"github.com/openshift/cluster-etcd-operator/pkg/operator/resourcesynccontroller"
"github.com/openshift/cluster-etcd-operator/pkg/operator/scriptcontroller"
Expand Down Expand Up @@ -158,6 +159,8 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle
return err
}

fsyncMetricController := metriccontroller.NewFSyncController(operatorClient, configClient.ConfigV1(), kubeClient.CoreV1(), controllerContext.EventRecorder)

statusController := status.NewClusterOperatorStatusController(
"etcd",
[]configv1.ObjectReference{
Expand Down Expand Up @@ -234,6 +237,7 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle
configInformers.Start(ctx.Done())
dynamicInformers.Start(ctx.Done())

go fsyncMetricController.Run(ctx, 1)
go staticResourceController.Run(ctx, 1)
go targetConfigReconciler.Run(1, ctx.Done())
go etcdCertSignerController.Run(1, ctx.Done())
Expand Down
156 changes: 156 additions & 0 deletions vendor/github.com/prometheus/client_golang/api/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3d9d95a

Please sign in to comment.