Consolidate metrics code in a single package, add metrics prefix config option #55

Merged
3 changes: 2 additions & 1 deletion deploy/01-config.yaml
@@ -5,8 +5,9 @@ metadata:
namespace: monitoring
data:
config.yaml: |
logLevel: error
logLevel: warn
logFormat: json
metricsNamePrefix: event_exporter_
route:
routes:
- match:
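The new metricsNamePrefix key is picked up through the yaml tag added to Config in pkg/exporter/config.go further down. A minimal standalone sketch (not part of this PR; the program and its output comment are illustrative only) of how the key maps onto the struct, using the gopkg.in/yaml.v2 parser that main.go already imports:

package main

import (
    "fmt"

    "github.com/resmoio/kubernetes-event-exporter/pkg/exporter"
    "gopkg.in/yaml.v2"
)

func main() {
    raw := []byte("logLevel: warn\nlogFormat: json\nmetricsNamePrefix: event_exporter_\n")

    var cfg exporter.Config
    if err := yaml.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    // The prefix is later handed to metrics.NewMetricsStore in main.go.
    fmt.Println(cfg.MetricsNamePrefix) // event_exporter_
}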
28 changes: 6 additions & 22 deletions main.go
@@ -4,17 +4,14 @@ import (
"context"
"flag"
"io/ioutil"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/resmoio/kubernetes-event-exporter/pkg/exporter"
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v2"
@@ -74,7 +71,10 @@ func main() {
kubeconfig.QPS = cfg.KubeQPS
kubeconfig.Burst = cfg.KubeBurst

engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{})
metrics.Init(*addr)
metricsStore := metrics.NewMetricsStore(cfg.MetricsNamePrefix)

engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{MetricsStore: metricsStore})
onEvent := engine.OnEvent
if len(cfg.ClusterName) != 0 {
onEvent = func(event *kube.EnhancedEvent) {
@@ -84,7 +84,7 @@
engine.OnEvent(event)
}
}
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.MaxEventAgeSeconds, onEvent)
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent)

ctx, cancel := context.WithCancel(context.Background())
leaderLost := make(chan bool)
@@ -107,22 +107,6 @@ func main() {
w.Start()
}

// Setup the prometheus metrics machinery
// Add Go module build info.
prometheus.MustRegister(collectors.NewBuildInfoCollector())

// Expose the registered metrics via HTTP.
http.Handle("/metrics", promhttp.HandlerFor(
prometheus.DefaultGatherer,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
},
))

// start up the http listener to expose the metrics
go http.ListenAndServe(*addr, nil)

c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

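The Prometheus setup deleted above moves into the new pkg/metrics package, which is not shown in this diff view. The sketch below is an assumption reconstructed only from the call sites visible here (metrics.Init, metrics.NewMetricsStore, and the Store fields used in channel_registry.go and watcher.go). With metricsNamePrefix: event_exporter_ from the config example, and assuming the prefix is simply prepended, the counters would be exposed as event_exporter_events_sent, event_exporter_events_discarded, event_exporter_watch_errors and event_exporter_send_event_errors.

// Hypothetical pkg/metrics implementation, inferred from its call sites in this PR.
package metrics

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/collectors"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Store groups the counters that previously lived in pkg/kube and pkg/exporter.
type Store struct {
    EventsProcessed prometheus.Counter
    EventsDiscarded prometheus.Counter
    WatchErrors     prometheus.Counter
    SendErrors      prometheus.Counter
}

// Init registers the build-info collector and serves /metrics, as main.go did before.
func Init(addr string) {
    prometheus.MustRegister(collectors.NewBuildInfoCollector())
    http.Handle("/metrics", promhttp.HandlerFor(
        prometheus.DefaultGatherer,
        promhttp.HandlerOpts{EnableOpenMetrics: true},
    ))
    go func() {
        _ = http.ListenAndServe(addr, nil)
    }()
}

// NewMetricsStore creates the counters, prepending the configured name prefix.
func NewMetricsStore(prefix string) *Store {
    return &Store{
        EventsProcessed: promauto.NewCounter(prometheus.CounterOpts{
            Name: prefix + "events_sent",
            Help: "The total number of events sent",
        }),
        EventsDiscarded: promauto.NewCounter(prometheus.CounterOpts{
            Name: prefix + "events_discarded",
            Help: "The total number of events discarded because of being older than the maxEventAgeSeconds specified",
        }),
        WatchErrors: promauto.NewCounter(prometheus.CounterOpts{
            Name: prefix + "watch_errors",
            Help: "The total number of errors received from the informer",
        }),
        SendErrors: promauto.NewCounter(prometheus.CounterOpts{
            Name: prefix + "send_event_errors",
            Help: "The total number of send event errors",
        }),
    }
}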
13 changes: 3 additions & 10 deletions pkg/exporter/channel_registry.go
@@ -5,19 +5,11 @@ import (
"sync"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog/log"
)

var (
sendErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "send_event_errors",
Help: "The total number of send event errors",
})
)

// ChannelBasedReceiverRegistry creates two channels for each receiver. One is for receiving events and the other is
// for breaking out of the infinite loop. Each message is passed to the receivers.
// This might not be the best way to implement such a feature. A ring buffer could be better
@@ -27,6 +19,7 @@ type ChannelBasedReceiverRegistry struct {
ch map[string]chan kube.EnhancedEvent
exitCh map[string]chan interface{}
wg *sync.WaitGroup
MetricsStore *metrics.Store
}

func (r *ChannelBasedReceiverRegistry) SendEvent(name string, event *kube.EnhancedEvent) {
@@ -65,7 +58,7 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink
log.Debug().Str("sink", name).Str("event", ev.Message).Msg("sending event to sink")
err := receiver.Send(context.Background(), &ev)
if err != nil {
sendErrors.Inc()
r.MetricsStore.SendErrors.Inc()
log.Debug().Err(err).Str("sink", name).Str("event", ev.Message).Msg("Cannot send event")
}
case <-exitCh:
24 changes: 24 additions & 0 deletions pkg/exporter/config.go
@@ -2,6 +2,7 @@ package exporter

import (
"errors"
"regexp"
"strconv"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
@@ -25,12 +26,16 @@ type Config struct {
Receivers []sinks.ReceiverConfig `yaml:"receivers"`
KubeQPS float32 `yaml:"kubeQPS,omitempty"`
KubeBurst int `yaml:"kubeBurst,omitempty"`
MetricsNamePrefix string `yaml:"metricsNamePrefix,omitempty"`
}

func (c *Config) Validate() error {
if err := c.validateDefaults(); err != nil {
return err
}
if err := c.validateMetricsNamePrefix(); err != nil {
return err
}

// No duplicate receivers
// Receivers individually
@@ -63,3 +68,22 @@ func (c *Config) validateMaxEventAgeSeconds() error {
}
return nil
}

func (c *Config) validateMetricsNamePrefix() error {
if c.MetricsNamePrefix != "" {
// https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
checkResult, err := regexp.MatchString("^[a-zA-Z][a-zA-Z0-9_:]*_$", c.MetricsNamePrefix)
if err != nil {
return err
}
if checkResult {
log.Info().Msg("config.metricsNamePrefix='" + c.MetricsNamePrefix + "'")
} else {
log.Error().Msg("config.metricsNamePrefix should match the regex: ^[a-zA-Z][a-zA-Z0-9_:]*_$")
return errors.New("validateMetricsNamePrefix failed")
}
} else {
log.Warn().Msg("metrics name prefix is empty, setting config.metricsNamePrefix='event_exporter_' is recommended")
}
return nil
}
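To make the accepted shape of the prefix concrete, here is a small standalone illustration (not part of this PR) of the same pattern, using the cases exercised by the tests below:

package main

import (
    "fmt"
    "regexp"
)

func main() {
    // Same pattern as validateMetricsNamePrefix: a letter first, then letters,
    // digits, underscores or colons, and a mandatory trailing underscore.
    re := regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9_:]*_$`)

    for _, prefix := range []string{
        "event_exporter_",        // valid
        "test::test_test_",       // valid: colons are allowed after the first character
        "no_trailing_underscore", // invalid: does not end with "_"
        "__reserved_",            // invalid: must start with a letter
        "13245_test_",            // invalid: must not start with a digit
    } {
        fmt.Printf("%-26q %v\n", prefix, re.MatchString(prefix))
    }
}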
59 changes: 58 additions & 1 deletion pkg/exporter/config_test.go
@@ -91,4 +91,61 @@ func TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsAndThrottle
err := config.Validate()
assert.Error(t, err)
assert.Contains(t, output.String(), "cannot set both throttlePeriod (depricated) and MaxEventAgeSeconds")
}
}

func TestValidate_MetricsNamePrefix_WhenEmpty(t *testing.T) {
output := &bytes.Buffer{}
log.Logger = log.Logger.Output(output)

config := Config{}
err := config.Validate()
assert.NoError(t, err)
assert.Equal(t, "", config.MetricsNamePrefix)
assert.Contains(t, output.String(), "metrics name prefix is empty, setting config.metricsNamePrefix='event_exporter_' is recommended")
}

func TestValidate_MetricsNamePrefix_WhenValid(t *testing.T) {
output := &bytes.Buffer{}
log.Logger = log.Logger.Output(output)

validCases := []string{
"kubernetes_event_exporter_",
"test_",
"test_test_",
"test::test_test_",
"TEST::test_test_",
"test_test::1234_test_",
}

for _, testPrefix := range validCases {
config := Config{
MetricsNamePrefix: testPrefix,
}
err := config.Validate()
assert.NoError(t, err)
assert.Equal(t, testPrefix, config.MetricsNamePrefix)
assert.Contains(t, output.String(), "config.metricsNamePrefix='" + testPrefix + "'")
}
}

func TestValidate_MetricsNamePrefix_WhenInvalid(t *testing.T) {
output := &bytes.Buffer{}
log.Logger = log.Logger.Output(output)

invalidCases := []string{
"no_tracing_underscore",
"__reserved_",
"::wrong_",
"13245_test_",
}

for _, testPrefix := range invalidCases {
config := Config{
MetricsNamePrefix: testPrefix,
}
err := config.Validate()
assert.Error(t, err)
assert.Equal(t, testPrefix, config.MetricsNamePrefix)
assert.Contains(t, output.String(), "config.metricsNamePrefix should match the regex: ^[a-zA-Z][a-zA-Z0-9_:]*_$")
}
}
3 changes: 2 additions & 1 deletion pkg/exporter/engine.go
@@ -1,9 +1,10 @@
package exporter

import (
"reflect"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog/log"
"reflect"
)

// Engine is responsible for initializing the receivers from sinks
35 changes: 11 additions & 24 deletions pkg/kube/watcher.go
@@ -3,8 +3,7 @@ package kube
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
@@ -13,22 +12,7 @@ import (
"k8s.io/client-go/tools/cache"
)

var (
eventsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "events_sent",
Help: "The total number of events sent",
})
eventsDiscarded = promauto.NewCounter(prometheus.CounterOpts{
Name: "events_discarded",
Help: "The total number of events discarded because of being older than the maxEventAgeSeconds specified",
})
watchErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "watch_errors",
Help: "The total number of errors received from the informer",
})

startUpTime = time.Now()
)
var startUpTime = time.Now()

type EventHandler func(event *EnhancedEvent)

@@ -39,9 +23,10 @@ type EventWatcher struct {
annotationCache *AnnotationCache
fn EventHandler
maxEventAgeSeconds time.Duration
metricsStore *metrics.Store
}

func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, fn EventHandler) *EventWatcher {
func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds int64, metricsStore *metrics.Store, fn EventHandler) *EventWatcher {
clientset := kubernetes.NewForConfigOrDie(config)
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace))
informer := factory.Core().V1().Events().Informer()
@@ -53,11 +38,12 @@ NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds i
annotationCache: NewAnnotationCache(config),
fn: fn,
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
metricsStore: metricsStore,
}

informer.AddEventHandler(watcher)
informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
watchErrors.Inc()
watcher.metricsStore.WatchErrors.Inc()
})

return watcher
@@ -88,7 +74,7 @@ func (e *EventWatcher) isEventDiscarded(event *corev1.Event) bool {
Str("event namespace", event.Namespace).
Str("event name", event.Name).
Msg("Event discarded as being older then maxEventAgeSeconds")
eventsDiscarded.Inc()
e.metricsStore.EventsDiscarded.Inc()
}
return true
}
@@ -107,7 +93,7 @@ func (e *EventWatcher) onEvent(event *corev1.Event) {
Str("involvedObject", event.InvolvedObject.Name).
Msg("Received event")

eventsProcessed.Inc()
e.metricsStore.EventsProcessed.Inc()

ev := &EnhancedEvent{
Event: *event.DeepCopy(),
@@ -155,16 +141,17 @@ func (e *EventWatcher) Stop() {
close(e.stopper)
}

func NewMockEventWatcher(MaxEventAgeSeconds int64) *EventWatcher {
func NewMockEventWatcher(MaxEventAgeSeconds int64, metricsStore *metrics.Store) *EventWatcher {
watcher := &EventWatcher{
labelCache: NewMockLabelCache(),
annotationCache: NewMockAnnotationCache(),
maxEventAgeSeconds: time.Second * time.Duration(MaxEventAgeSeconds),
fn: func(event *EnhancedEvent) {},
metricsStore: metricsStore,
}
return watcher
}

func (e *EventWatcher) SetStartUpTime(time time.Time) {
func (e *EventWatcher) setStartUpTime(time time.Time) {
startUpTime = time
}
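Because NewMockEventWatcher now also takes the store, test code that builds a mock watcher has to pass one in. A hypothetical example (the test name and external test package are illustrative; no such test appears in this diff):

package kube_test

import (
    "testing"

    "github.com/resmoio/kubernetes-event-exporter/pkg/kube"
    "github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
)

func TestNewMockEventWatcher(t *testing.T) {
    // An empty prefix keeps the original, unprefixed metric names.
    metricsStore := metrics.NewMetricsStore("")
    watcher := kube.NewMockEventWatcher(300, metricsStore)
    if watcher == nil {
        t.Fatal("expected a mock watcher")
    }
}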