Skip to content

Commit

Permalink
Allow HTTP pushes directly to ingesters, remove old billing code. (co…
Browse files Browse the repository at this point in the history
…rtexproject#1491)

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
  • Loading branch information
tomwilkie committed Feb 26, 2020
1 parent fa5b104 commit fc47e09
Show file tree
Hide file tree
Showing 51 changed files with 51 additions and 8,907 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -24,6 +24,7 @@
* [CHANGE] Removed unused /validate_expr endpoint. #2152
* [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
* [CHANGE] Remove fluentd-based billing infrastructure and flags such as `-distributor.enable-billing`. #1491
* [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
Expand All @@ -32,6 +33,7 @@
* [FEATURE] Add ability to override YAML config file settings using environment variables. #2147
* `-config.expand-env`
* [FEATURE] Add /config HTTP endpoint which exposes the current Cortex configuration as YAML. #2165
* [FEATURE] Allow Prometheus remote write directly to ingesters. #1491
* [ENHANCEMENT] Add `status` label to `cortex_alertmanager_configs` metric to gauge the number of valid and invalid configs. #2125
* [ENHANCEMENT] Cassandra Authentication: added the `custom_authenticators` config option that allows users to authenticate with cassandra clusters using password authenticators that are not approved by default in [gocql](https://github.com/gocql/gocql/blob/81b8263d9fe526782a588ef94d3fa5c6148e5d67/conn.go#L27) #2093
* [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023
Expand Down
6 changes: 4 additions & 2 deletions docs/apis.md
Expand Up @@ -24,8 +24,6 @@ The API for reads also accepts HTTP/protobuf/snappy, and the path is `/api/prom/

See the Prometheus documentation for [more information on the Prometheus remote write format](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations).



## Alerts & Rules API

Cortex supports the Prometheus' [alerts](https://prometheus.io/docs/prometheus/latest/querying/api/#alerts) and [rules](https://prometheus.io/docs/prometheus/latest/querying/api/#rules) api endpoints. This is supported in the Ruler service and can be enabled using the `experimental.ruler.enable-api` flag.
Expand Down Expand Up @@ -166,3 +164,7 @@ Note that setting a new config will effectively "re-enable" the Rules and Alertm

- Normal Response Codes: NoContent(204)
- Error Response Codes: Unauthorized(401)

#### Testing APIs

`POST /push` - Push samples directly to ingesters. Accepts requests in Prometheus remote write format. Indended for performance testing and debugging.
17 changes: 0 additions & 17 deletions docs/configuration/config-file-reference.md
Expand Up @@ -229,23 +229,6 @@ The `server_config` configures the HTTP and gRPC server of the launched service(
The `distributor_config` configures the Cortex distributor.

```yaml
# Report number of ingested samples to billing system.
# CLI flag: -distributor.enable-billing
[enable_billing: <boolean> | default = false]

billing:
# Maximum number of billing events to buffer in memory
# CLI flag: -billing.max-buffered-events
[maxbufferedevents: <int> | default = 1024]

# How often to retry sending events to the billing ingester.
# CLI flag: -billing.retry-delay
[retrydelay: <duration> | default = 500ms]

# points to the billing ingester sidecar (should be on localhost)
# CLI flag: -billing.ingester
[ingesterhostport: <string> | default = "localhost:24225"]

pool:
# How frequently to clean up clients for ingesters that have gone away.
# CLI flag: -distributor.client-cleanup-period
Expand Down
4 changes: 0 additions & 4 deletions go.mod
Expand Up @@ -20,7 +20,6 @@ require (
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fluent/fluent-logger-golang v1.2.1 // indirect
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.9.0
github.com/gocql/gocql v0.0.0-20200121121104-95d072f1b5bb
Expand All @@ -47,7 +46,6 @@ require (
github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
github.com/opentracing/opentracing-go v1.1.1-0.20200124165624-2876d2018785
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/alertmanager v0.19.0
github.com/prometheus/client_golang v1.2.1
Expand All @@ -59,10 +57,8 @@ require (
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.4.0
github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Expand Up @@ -218,8 +218,6 @@ github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1S
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/structtag v1.1.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/fluent/fluent-logger-golang v1.2.1 h1:CMA+mw2zMiOGEOarZtaqM3GBWT1IVLNncNi0nKELtmU=
github.com/fluent/fluent-logger-golang v1.2.1/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
Expand Down Expand Up @@ -635,8 +633,6 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8 h1:jkUFVqrKRttbdDqkTrvOmHxfqIsJK0Oe2WGi1ACAE+M=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down Expand Up @@ -763,8 +759,6 @@ github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d/go.mod h1:usT/T
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d h1:Ninez2SUm08xpmnw7kVxCeOc3DahF6IuMuRMCdM4wTQ=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand All @@ -776,8 +770,6 @@ github.com/uber/jaeger-lib v1.5.1-0.20181102163054-1fc5c315e03c/go.mod h1:ComeND
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1 h1:qi+YkNiB7T3Ikw1DoDIFhdAPbDU7fUPDsKrUoZdupnQ=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1/go.mod h1:7gGdEUJaCrSlWi/mjd68CZv0sfqektYPDcro9cE+M9k=
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a h1:4Sm4LnEnP1yQ2NeNgGqLTuN2xrTvcBOU+EsljpB8Ed0=
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a/go.mod h1:6enWAqfQBFrE8X/XdJwZr8IKgh1chStuFR0mjU/UOUw=
github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M=
Expand Down
4 changes: 3 additions & 1 deletion pkg/cortex/modules.go
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/push"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -248,7 +249,7 @@ func (t *Cortex) initDistributor(cfg *Config) (err error) {
}

t.server.HTTP.HandleFunc("/all_user_stats", t.distributor.AllUserStatsHandler)
t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.distributor.PushHandler)))
t.server.HTTP.Handle("/api/prom/push", t.httpAuthMiddleware.Wrap(push.Handler(cfg.Distributor, t.distributor.Push)))
t.server.HTTP.Handle("/ha-tracker", t.distributor.Replicas)
return
}
Expand Down Expand Up @@ -361,6 +362,7 @@ func (t *Cortex) initIngester(cfg *Config) (err error) {
t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler))
t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.server.HTTP.Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
t.server.HTTP.Handle("/push", t.httpAuthMiddleware.Wrap(push.Handler(cfg.Distributor, t.ingester.Push)))
return
}

Expand Down
37 changes: 0 additions & 37 deletions pkg/distributor/billing.go

This file was deleted.

18 changes: 1 addition & 17 deletions pkg/distributor/distributor.go
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
billing "github.com/weaveworks/billing-client"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -99,7 +98,6 @@ type Distributor struct {
ingestersRing ring.ReadRing
ingesterPool *ingester_client.Pool
limits *validation.Overrides
billingClient *billing.Client

// The global rate limiter requires a distributors ring to count
// the number of healthy instances
Expand All @@ -115,9 +113,7 @@ type Distributor struct {
// Config contains the configuration require to
// create a Distributor
type Config struct {
EnableBilling bool `yaml:"enable_billing,omitempty"`
BillingConfig billing.Config `yaml:"billing,omitempty"`
PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"`
PoolConfig ingester_client.PoolConfig `yaml:"pool,omitempty"`

HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`

Expand All @@ -136,12 +132,10 @@ type Config struct {

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.BillingConfig.RegisterFlags(f)
cfg.PoolConfig.RegisterFlags(f)
cfg.HATrackerConfig.RegisterFlags(f)
cfg.DistributorRing.RegisterFlags(f)

f.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
Expand All @@ -162,15 +156,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
}
}

var billingClient *billing.Client
if cfg.EnableBilling {
var err error
billingClient, err = billing.NewClient(cfg.BillingConfig)
if err != nil {
return nil, err
}
}

replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

Expand Down Expand Up @@ -204,7 +189,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
cfg: cfg,
ingestersRing: ingestersRing,
ingesterPool: ingester_client.NewPool(cfg.PoolConfig, ingestersRing, cfg.ingesterClientFactory, util.Logger),
billingClient: billingClient,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
Expand Down
40 changes: 0 additions & 40 deletions pkg/distributor/http_server.go
Expand Up @@ -3,49 +3,9 @@ package distributor
import (
"net/http"

"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

// PushHandler is a http.Handler which accepts WriteRequests.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
req.Source = client.API
buf, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), d.cfg.MaxRecvMsgSize, &req, compressionType)
logger := util.WithContext(r.Context(), util.Logger)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if d.cfg.EnableBilling {
var samples int64
for _, ts := range req.Timeseries {
samples += int64(len(ts.Samples))
}
if err := d.emitBillingRecord(r.Context(), buf, samples); err != nil {
level.Error(logger).Log("msg", "error emitting billing record", "err", err)
}
}

if _, err := d.Push(r.Context(), &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.GetCode() != 202 {
level.Error(logger).Log("msg", "push error", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
}
}

// UserStats models ingestion statistics for one user.
type UserStats struct {
IngestionRate float64 `json:"ingestionRate"`
Expand Down
41 changes: 41 additions & 0 deletions pkg/util/push/push.go
@@ -0,0 +1,41 @@
package push

import (
"context"
"net/http"

"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(cfg distributor.Config, push func(context.Context, *client.WriteRequest) (*client.WriteResponse, error)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
var req client.PreallocWriteRequest
req.Source = client.API
_, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
logger := util.WithContext(r.Context(), util.Logger)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if _, err := push(r.Context(), &req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.GetCode() != 202 {
level.Error(logger).Log("msg", "push error", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
}
})
}

0 comments on commit fc47e09

Please sign in to comment.