Skip to content

Commit

Permalink
*: support Alertmanager API v2
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Pasquier <spasquie@redhat.com>
  • Loading branch information
simonpasquier committed Jan 13, 2020
1 parent 17fd999 commit ffc6d7b
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -50,6 +50,7 @@ Compactor now properly handles partial block uploads for all operation like rete
- [#1904](https://github.com/thanos-io/thanos/pull/1904) Add a skip-chunks option in Store Series API to improve the response time of `/api/v1/series` endpoint.
- [#1910](https://github.com/thanos-io/thanos/pull/1910) Query: `/api/v1/labels` now understands `POST` - useful for sending bigger requests
- [#1939](https://github.com/thanos-io/thanos/pull/1939) Ruler: Add TLS and authentication support for query endpoints with the `--query.config` and `--query.config-file` CLI flags. See [documentation](docs/components/rule.md#configuration) for further information.
- [#1982](https://github.com/thanos-io/thanos/pull/1982) Ruler: Add support for Alertmanager v2 API endpoints.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -71,7 +71,7 @@ ME ?= $(shell whoami)
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0

ALERTMANAGER_VERSION ?= v0.15.2
ALERTMANAGER_VERSION ?= v0.20.0
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)

MINIO_SERVER_VERSION ?= RELEASE.2018-10-06T00-15-16Z
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Expand Up @@ -381,7 +381,7 @@ func runRule(
// Discover and resolve Alertmanager addresses.
addDiscoveryGroups(g, amClient, alertmgrsDNSSDInterval)

alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout)))
alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}

// Run rule evaluation and alert notifications.
Expand Down
1 change: 1 addition & 0 deletions docs/components/rule.md
Expand Up @@ -416,6 +416,7 @@ alertmanagers:
scheme: http
path_prefix: ""
timeout: 10s
api_version: v1
```

### Query API
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Expand Up @@ -29,6 +29,7 @@ require (
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.9.0
github.com/go-openapi/strfmt v0.19.2
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
github.com/golang/snappy v0.0.1
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/alertmanager v0.20.0
github.com/prometheus/client_golang v1.2.1
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.7.0
Expand Down
66 changes: 66 additions & 0 deletions go.sum

Large diffs are not rendered by default.

76 changes: 64 additions & 12 deletions pkg/alert/alert.go
Expand Up @@ -16,7 +16,9 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"

Expand All @@ -26,7 +28,6 @@ import (

const (
defaultAlertmanagerPort = 9093
alertPushEndpoint = "/api/v1/alerts"
contentTypeJSON = "application/json"
)

Expand Down Expand Up @@ -255,6 +256,7 @@ func (q *Queue) Push(alerts []*Alert) {
type Sender struct {
logger log.Logger
alertmanagers []*Alertmanager
versions []APIVersion

sent *prometheus.CounterVec
errs *prometheus.CounterVec
Expand All @@ -272,9 +274,20 @@ func NewSender(
if logger == nil {
logger = log.NewNopLogger()
}
// Collect the distinct API versions across all configured Alertmanagers so
// that the payload is marshaled at most once per version in Send().
var (
	versions       []APIVersion
	versionPresent = map[APIVersion]struct{}{}
)
for _, am := range alertmanagers {
	if _, found := versionPresent[am.version]; found {
		continue
	}
	// BUG FIX: record the version as seen; previously the map was never
	// written, so the de-duplication guard above could never trigger and
	// versions could contain duplicates.
	versionPresent[am.version] = struct{}{}
	versions = append(versions, am.version)
}
s := &Sender{
logger: logger,
alertmanagers: alertmanagers,
versions: versions,

sent: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "thanos_alert_sender_alerts_sent_total",
Expand Down Expand Up @@ -302,6 +315,15 @@ func NewSender(
return s
}

// toAPILabels converts a Prometheus label set into the LabelSet model
// expected by the Alertmanager v2 API.
func toAPILabels(labels labels.Labels) models.LabelSet {
	converted := make(models.LabelSet, len(labels))
	for _, l := range labels {
		converted[l.Name] = l.Value
	}
	return converted
}

// Send an alert batch to all given Alertmanager clients.
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
Expand All @@ -310,10 +332,38 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
if len(alerts) == 0 {
return
}
b, err := json.Marshal(alerts)
if err != nil {
level.Warn(s.logger).Log("msg", "sending alerts failed", "err", err)
return

payload := make(map[APIVersion][]byte)
for _, version := range s.versions {
var (
b []byte
err error
)
switch version {
case APIv1:
if b, err = json.Marshal(alerts); err != nil {
level.Warn(s.logger).Log("msg", "encoding alerts for v1 API failed", "err", err)
return
}
case APIv2:
apiAlerts := make(models.PostableAlerts, 0, len(alerts))
for _, a := range alerts {
apiAlerts = append(apiAlerts, &models.PostableAlert{
Annotations: toAPILabels(a.Annotations),
EndsAt: strfmt.DateTime(a.EndsAt),
StartsAt: strfmt.DateTime(a.StartsAt),
Alert: models.Alert{
GeneratorURL: strfmt.URI(a.GeneratorURL),
Labels: toAPILabels(a.Labels),
},
})
}
if b, err = json.Marshal(apiAlerts); err != nil {
level.Warn(s.logger).Log("msg", "encoding alerts for v2 API failed", "err", err)
return
}
}
payload[version] = b
}

var (
Expand All @@ -323,18 +373,19 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
for _, am := range s.alertmanagers {
for _, u := range am.dispatcher.Endpoints() {
wg.Add(1)
go func(am *Alertmanager, u *url.URL) {
go func(am *Alertmanager, u url.URL) {
defer wg.Done()

level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts))
start := time.Now()
u.Path = path.Join(u.Path, fmt.Sprintf("/api/%s/alerts", string(am.version)))
span, ctx := tracing.StartSpan(ctx, "post_alerts HTTP[client]")
defer span.Finish()
if err := am.postAlerts(ctx, *u, bytes.NewReader(b)); err != nil {
if err := am.postAlerts(ctx, u, bytes.NewReader(payload[am.version])); err != nil {
level.Warn(s.logger).Log(
"msg", "sending alerts failed",
"alertmanager", u.Host,
"numAlerts", len(alerts),
"alerts", string(payload[am.version]),
"err", err,
)
s.errs.WithLabelValues(u.Host).Inc()
Expand All @@ -344,7 +395,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
s.sent.WithLabelValues(u.Host).Add(float64(len(alerts)))

atomic.AddUint64(&numSuccess, 1)
}(am, u)
}(am, *u)
}
}
wg.Wait()
Expand All @@ -354,7 +405,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
}

s.dropped.Add(float64(len(alerts)))
level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "alerts", string(b))
level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers")
}

type Dispatcher interface {
Expand All @@ -369,10 +420,11 @@ type Alertmanager struct {
logger log.Logger
dispatcher Dispatcher
timeout time.Duration
version APIVersion
}

// NewAlertmanager returns a new Alertmanager client.
func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Duration) *Alertmanager {
func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Duration, version APIVersion) *Alertmanager {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -381,11 +433,11 @@ func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Dura
logger: logger,
dispatcher: dispatcher,
timeout: timeout,
version: version,
}
}

func (a *Alertmanager) postAlerts(ctx context.Context, u url.URL, r io.Reader) error {
u.Path = path.Join(u.Path, alertPushEndpoint)
req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/alert/alert_test.go
Expand Up @@ -77,7 +77,7 @@ func TestSenderSendsOk(t *testing.T) {
poster := &fakeClient{
urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute)})
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})

Expand All @@ -104,7 +104,7 @@ func TestSenderSendsOneFails(t *testing.T) {
return rec.Result(), nil
},
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute)})
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})

Expand All @@ -125,7 +125,7 @@ func TestSenderSendsAllFail(t *testing.T) {
return nil, errors.New("no such host")
},
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute)})
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})

Expand Down
34 changes: 32 additions & 2 deletions pkg/alert/config.go
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"

Expand All @@ -19,11 +20,39 @@ type AlertingConfig struct {
}

// AlertmanagerConfig represents a client to a cluster of Alertmanager endpoints.
type AlertmanagerConfig struct {
	// HTTPClientConfig holds the HTTP client settings used to reach the endpoints.
	HTTPClientConfig http_util.ClientConfig `yaml:"http_config"`
	// EndpointsConfig describes how the Alertmanager endpoints are discovered
	// (static addresses and/or file-based service discovery).
	EndpointsConfig http_util.EndpointsConfig `yaml:",inline"`
	// Timeout for requests sent to the Alertmanager endpoints.
	Timeout model.Duration `yaml:"timeout"`
	// APIVersion selects which Alertmanager API version to use ("v1" or "v2").
	APIVersion APIVersion `yaml:"api_version"`
}

// APIVersion represents the API version of the Alertmanager endpoint.
type APIVersion string

const (
	// APIv1 targets the v1 HTTP API (alerts are posted to /api/v1/alerts).
	APIv1 APIVersion = "v1"
	// APIv2 targets the v2 HTTP API (alerts are posted to /api/v2/alerts).
	APIv2 APIVersion = "v2"
)

// supportedAPIVersions lists the API versions accepted in the configuration.
var supportedAPIVersions = []APIVersion{
	APIv1, APIv2,
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
// Only values listed in supportedAPIVersions decode successfully.
func (v *APIVersion) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var raw string
	if err := unmarshal(&raw); err != nil {
		return errors.Wrap(err, "invalid Alertmanager API version")
	}

	candidate := APIVersion(raw)
	for _, supported := range supportedAPIVersions {
		if candidate == supported {
			*v = supported
			return nil
		}
	}
	return errors.Errorf("expected Alertmanager API version to be one of %v but got %q", supportedAPIVersions, raw)
}

func DefaultAlertmanagerConfig() AlertmanagerConfig {
Expand All @@ -33,7 +62,8 @@ func DefaultAlertmanagerConfig() AlertmanagerConfig {
StaticAddresses: []string{},
FileSDConfigs: []http_util.FileSDConfig{},
},
Timeout: model.Duration(time.Second * 10),
Timeout: model.Duration(time.Second * 10),
APIVersion: APIv1,
}
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/alert/config_test.go
Expand Up @@ -4,10 +4,43 @@ import (
"testing"
"time"

"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/http"
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestUnmarshalAPIVersion checks that APIVersion accepts exactly the
// supported Alertmanager API versions ("v1", "v2") when decoded from YAML.
func TestUnmarshalAPIVersion(t *testing.T) {
	for _, tc := range []struct {
		v string

		err      bool
		expected APIVersion
	}{
		{
			v:        "v1",
			expected: APIv1,
		},
		{
			// v2 is supported too (coverage previously missing).
			v:        "v2",
			expected: APIv2,
		},
		{
			// Unknown version.
			v:   "v3",
			err: true,
		},
		{
			// Not a YAML string scalar.
			v:   "{}",
			err: true,
		},
	} {
		var got APIVersion
		err := yaml.Unmarshal([]byte(tc.v), &got)
		if tc.err {
			testutil.NotOk(t, err)
			continue
		}
		testutil.Ok(t, err)
		testutil.Equals(t, tc.expected, got)
	}
}

func TestBuildAlertmanagerConfiguration(t *testing.T) {
for _, tc := range []struct {
address string
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutil/prometheus.go
Expand Up @@ -28,7 +28,7 @@ import (

const (
defaultPrometheusVersion = "v2.13.0"
defaultAlertmanagerVersion = "v0.15.2"
defaultAlertmanagerVersion = "v0.20.0"
defaultMinioVersion = "RELEASE.2018-10-06T00-15-16Z"

// Space delimited list of versions.
Expand Down

0 comments on commit ffc6d7b

Please sign in to comment.