Skip to content

Commit

Permalink
consolidate metrics code in a single package, add metrics prefix conf…
Browse files Browse the repository at this point in the history
…ig option (#55)
  • Loading branch information
Evedel committed Mar 17, 2023
1 parent 8af50d7 commit 9aa12cf
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 65 deletions.
3 changes: 2 additions & 1 deletion deploy/01-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ metadata:
namespace: monitoring
data:
config.yaml: |
logLevel: error
logLevel: warn
logFormat: json
metricsNamePrefix: event_exporter_
route:
routes:
- match:
Expand Down
28 changes: 6 additions & 22 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ import (
"context"
"flag"
"io/ioutil"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/resmoio/kubernetes-event-exporter/pkg/exporter"
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -74,7 +71,10 @@ func main() {
kubeconfig.QPS = cfg.KubeQPS
kubeconfig.Burst = cfg.KubeBurst

engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{})
metrics.Init(*addr)
metricsStore := metrics.NewMetricsStore(cfg.MetricsNamePrefix)

engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{MetricsStore: metricsStore})
onEvent := engine.OnEvent
if len(cfg.ClusterName) != 0 {
onEvent = func(event *kube.EnhancedEvent) {
Expand All @@ -84,7 +84,7 @@ func main() {
engine.OnEvent(event)
}
}
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.MaxEventAgeSeconds, onEvent)
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent)

ctx, cancel := context.WithCancel(context.Background())
leaderLost := make(chan bool)
Expand All @@ -107,22 +107,6 @@ func main() {
w.Start()
}

// Setup the prometheus metrics machinery
// Add Go module build info.
prometheus.MustRegister(collectors.NewBuildInfoCollector())

// Expose the registered metrics via HTTP.
http.Handle("/metrics", promhttp.HandlerFor(
prometheus.DefaultGatherer,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
},
))

// start up the http listener to expose the metrics
go http.ListenAndServe(*addr, nil)

c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

Expand Down
13 changes: 3 additions & 10 deletions pkg/exporter/channel_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,11 @@ import (
"sync"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog/log"
)

var (
sendErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "send_event_errors",
Help: "The total number of send event errors",
})
)

// ChannelBasedReceiverRegistry creates two channels for each receiver. One is for receiving events and other one is
// for breaking out of the infinite loop. Each message is passed to receivers
// This might not be the best way to implement such feature. A ring buffer can be better
Expand All @@ -27,6 +19,7 @@ type ChannelBasedReceiverRegistry struct {
ch map[string]chan kube.EnhancedEvent
exitCh map[string]chan interface{}
wg *sync.WaitGroup
MetricsStore *metrics.Store
}

func (r *ChannelBasedReceiverRegistry) SendEvent(name string, event *kube.EnhancedEvent) {
Expand Down Expand Up @@ -65,7 +58,7 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink
log.Debug().Str("sink", name).Str("event", ev.Message).Msg("sending event to sink")
err := receiver.Send(context.Background(), &ev)
if err != nil {
sendErrors.Inc()
r.MetricsStore.SendErrors.Inc()
log.Debug().Err(err).Str("sink", name).Str("event", ev.Message).Msg("Cannot send event")
}
case <-exitCh:
Expand Down
24 changes: 24 additions & 0 deletions pkg/exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package exporter

import (
"errors"
"regexp"
"strconv"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
Expand All @@ -25,12 +26,16 @@ type Config struct {
Receivers []sinks.ReceiverConfig `yaml:"receivers"`
KubeQPS float32 `yaml:"kubeQPS,omitempty"`
KubeBurst int `yaml:"kubeBurst,omitempty"`
MetricsNamePrefix string `yaml:"metricsNamePrefix,omitempty"`
}

func (c *Config) Validate() error {
if err := c.validateDefaults(); err != nil {
return err
}
if err := c.validateMetricsNamePrefix(); err != nil {
return err
}

// No duplicate receivers
// Receivers individually
Expand Down Expand Up @@ -63,3 +68,22 @@ func (c *Config) validateMaxEventAgeSeconds() error {
}
return nil
}

func (c *Config) validateMetricsNamePrefix() error {
if c.MetricsNamePrefix != "" {
// https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
checkResult, err := regexp.MatchString("^[a-zA-Z][a-zA-Z0-9_:]*_$", c.MetricsNamePrefix)
if err != nil {
return err
}
if checkResult {
log.Info().Msg("config.metricsNamePrefix='" + c.MetricsNamePrefix + "'")
} else {
log.Error().Msg("config.metricsNamePrefix should match the regex: ^[a-zA-Z][a-zA-Z0-9_:]*_$")
return errors.New("validateMetricsNamePrefix failed")
}
} else {
log.Warn().Msg("metrics name prefix is empty, setting config.metricsNamePrefix='event_exporter_' is recommended")
}
return nil
}
59 changes: 58 additions & 1 deletion pkg/exporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,61 @@ func TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsAndThrottle
err := config.Validate()
assert.Error(t, err)
assert.Contains(t, output.String(), "cannot set both throttlePeriod (depricated) and MaxEventAgeSeconds")
}
}

func TestValidate_MetricsNamePrefix_WhenEmpty(t *testing.T) {
output := &bytes.Buffer{}
log.Logger = log.Logger.Output(output)

config := Config{}
err := config.Validate()
assert.NoError(t, err)
assert.Equal(t, "", config.MetricsNamePrefix)
assert.Contains(t, output.String(), "metrics name prefix is empty, setting config.metricsNamePrefix='event_exporter_' is recommended")
}

func TestValidate_MetricsNamePrefix_WhenValid(t *testing.T) {
output := &bytes.Buffer{}
log.Logger = log.Logger.Output(output)

validCases := []string{
"kubernetes_event_exporter_",
"test_",
"test_test_",
"test::test_test_",
"TEST::test_test_",
"test_test::1234_test_",
}

for _, testPrefix := range validCases {
config := Config{
MetricsNamePrefix: testPrefix,
}
err := config.Validate()
assert.NoError(t, err)
assert.Equal(t, testPrefix, config.MetricsNamePrefix)
assert.Contains(t, output.String(), "config.metricsNamePrefix='" + testPrefix + "'")
}
}

func TestValidate_MetricsNamePrefix_WhenInvalid(t *testing.T) {
output := &bytes.Buffer{}
log.Logger = log.Logger.Output(output)

invalidCases := []string{
"no_tracing_underscore",
"__reserved_",
"::wrong_",
"13245_test_",
}

for _, testPrefix := range invalidCases {
config := Config{
MetricsNamePrefix: testPrefix,
}
err := config.Validate()
assert.Error(t, err)
assert.Equal(t, testPrefix, config.MetricsNamePrefix)
assert.Contains(t, output.String(), "config.metricsNamePrefix should match the regex: ^[a-zA-Z][a-zA-Z0-9_:]*_$")
}
}
3 changes: 2 additions & 1 deletion pkg/exporter/engine.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package exporter

import (
"reflect"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog/log"
"reflect"
)

// Engine is responsible for initializing the receivers from sinks
Expand Down
35 changes: 11 additions & 24 deletions pkg/kube/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package kube
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
Expand All @@ -13,22 +12,7 @@ import (
"k8s.io/client-go/tools/cache"
)

var (
eventsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "events_sent",
Help: "The total number of events sent",
})
eventsDiscarded = promauto.NewCounter(prometheus.CounterOpts{
Name: "events_discarded",
Help: "The total number of events discarded because of being older than the maxEventAgeSeconds specified",
})
watchErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "watch_errors",
Help: "The total number of errors received from the informer",
})

startUpTime = time.Now()
)
var startUpTime = time.Now()

type EventHandler func(event *EnhancedEvent)

Expand All @@ -39,9 +23,10 @@ type EventWatcher struct {
annotationCache *AnnotationCache
fn EventHandler
maxEventAgeSeconds time.Duration
metricsStore *metrics.Store
}

func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, fn EventHandler) *EventWatcher {
func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, metricsStore *metrics.Store, fn EventHandler) *EventWatcher {
clientset := kubernetes.NewForConfigOrDie(config)
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace))
informer := factory.Core().V1().Events().Informer()
Expand All @@ -53,11 +38,12 @@ func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds i
annotationCache: NewAnnotationCache(config),
fn: fn,
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
metricsStore: metricsStore,
}

informer.AddEventHandler(watcher)
informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
watchErrors.Inc()
watcher.metricsStore.WatchErrors.Inc()
})

return watcher
Expand Down Expand Up @@ -88,7 +74,7 @@ func (e *EventWatcher) isEventDiscarded(event *corev1.Event) bool {
Str("event namespace", event.Namespace).
Str("event name", event.Name).
Msg("Event discarded as being older then maxEventAgeSeconds")
eventsDiscarded.Inc()
e.metricsStore.EventsDiscarded.Inc()
}
return true
}
Expand All @@ -107,7 +93,7 @@ func (e *EventWatcher) onEvent(event *corev1.Event) {
Str("involvedObject", event.InvolvedObject.Name).
Msg("Received event")

eventsProcessed.Inc()
e.metricsStore.EventsProcessed.Inc()

ev := &EnhancedEvent{
Event: *event.DeepCopy(),
Expand Down Expand Up @@ -155,16 +141,17 @@ func (e *EventWatcher) Stop() {
close(e.stopper)
}

func NewMockEventWatcher(MaxEventAgeSeconds int64) *EventWatcher {
func NewMockEventWatcher(MaxEventAgeSeconds int64, metricsStore *metrics.Store) *EventWatcher {
watcher := &EventWatcher{
labelCache: NewMockLabelCache(),
annotationCache: NewMockAnnotationCache(),
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
fn: func(event *EnhancedEvent) {},
metricsStore: metricsStore,
}
return watcher
}

func (e *EventWatcher) SetStartUpTime(time time.Time) {
func (e *EventWatcher) setStartUpTime(time time.Time) {
startUpTime = time
}
Loading

0 comments on commit 9aa12cf

Please sign in to comment.