Use BasicLifecycler for distributors and auto-forget (grafana#2154)
Use the BasicLifecycler in distributors for managing their lifecycle so
that we can take advantage of the "auto-forget" delegate feature. This
prevents the ring from filling up with "unhealthy" distributors that are
never removed. This wasn't a bug, but it was confusing for users and
operators.

Fixes grafana#2138

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters authored and mason committed Jul 11, 2022
1 parent ed69f1a commit 4247214
Showing 15 changed files with 323 additions and 159 deletions.
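
At a glance, the change wires dskit's auto-forget delegate into a new distributor lifecycler (see newRingAndLifecycler in pkg/distributor/distributor.go below), so an instance whose heartbeats keep failing is eventually removed from the ring. A minimal sketch of the timing involved, assuming a hypothetical 1-minute heartbeat timeout (the real value comes from the distributor ring's heartbeat-timeout setting, not from this commit):

package main

import (
	"fmt"
	"time"
)

// Mirrors the ringAutoForgetUnhealthyPeriods constant added in this commit:
// an instance whose heartbeat has been failing for this many consecutive
// heartbeat-timeout periods is removed ("forgotten") from the ring.
const ringAutoForgetUnhealthyPeriods = 10

func main() {
	// Hypothetical value for illustration only.
	heartbeatTimeout := time.Minute

	// This is the window passed to ring.NewAutoForgetDelegate further down
	// in the diff: ringAutoForgetUnhealthyPeriods * cfg.HeartbeatTimeout.
	forgetPeriod := ringAutoForgetUnhealthyPeriods * heartbeatTimeout

	fmt.Printf("an unhealthy distributor is auto-forgotten after %s\n", forgetPeriod)
}

Under that assumption the forget window works out to 10 minutes, which matches the CHANGELOG entry below: unhealthy distributors are forgotten after ten failed ring heartbeats.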
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -40,6 +40,7 @@
* [ENHANCEMENT] Compactor: Run sanity check on blocks storage configuration at startup. #2143
* [ENHANCEMENT] Compactor: Add HTTP API for uploading TSDB blocks. Enabled with `-compactor.block-upload-enabled`. #1694 #2126
* [ENHANCEMENT] Ingester: Enable querying overlapping blocks by default. #2187
* [ENHANCEMENT] Distributor: Auto-forget unhealthy distributors after ten failed ring heartbeats. #2154
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
* [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893
* [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933
@@ -3,6 +3,11 @@ multitenancy_enabled: false
distributor:
pool:
health_check_ingesters: true
ring:
kvstore:
store: consul
consul:
host: consul:8500

ingester_client:
grpc_client_config:
2 changes: 1 addition & 1 deletion development/tsdb-blocks-storage-s3/compose-up.sh
@@ -14,5 +14,5 @@ cd $SCRIPT_DIR && make
# -gcflags "all=-N -l" disables optimizations that allow for better run with combination with Delve debugger.
# GOARCH is not changed.
CGO_ENABLED=0 GOOS=linux go build -mod=vendor -gcflags "all=-N -l" -o ${SCRIPT_DIR}/mimir ${SCRIPT_DIR}/../../cmd/mimir
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml build distributor
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml build distributor-1
docker-compose -f ${SCRIPT_DIR}/docker-compose.yml up $@
4 changes: 2 additions & 2 deletions development/tsdb-blocks-storage-s3/config/grafana-agent.yaml
@@ -13,7 +13,7 @@ prometheus:
scrape_configs:
- job_name: tsdb-blocks-storage-s3/distributor
static_configs:
- targets: ['distributor:8001']
- targets: ['distributor-1:8000', 'distributor-2:8001']
labels:
cluster: 'docker-compose'
namespace: 'tsdb-blocks-storage-s3'
@@ -61,4 +61,4 @@ prometheus:
namespace: 'tsdb-blocks-storage-s3'

remote_write:
- url: http://distributor:8001/api/v1/push
- url: http://distributor-1:8000/api/v1/push
1 change: 0 additions & 1 deletion development/tsdb-blocks-storage-s3/config/mimir.yaml
@@ -4,7 +4,6 @@ distributor:
pool:
health_check_ingesters: true
ring:
instance_addr: 127.0.0.1
kvstore:
store: consul
consul:
4 changes: 2 additions & 2 deletions development/tsdb-blocks-storage-s3/config/prometheus.yaml
@@ -6,7 +6,7 @@ global:
scrape_configs:
- job_name: tsdb-blocks-storage-s3/distributor
static_configs:
- targets: ['distributor:8001']
- targets: ['distributor-1:8000', 'distributor-2:8001']
labels:
cluster: 'docker-compose'
namespace: 'tsdb-blocks-storage-s3'
@@ -54,5 +54,5 @@ scrape_configs:
namespace: 'tsdb-blocks-storage-s3'

remote_write:
- url: http://distributor:8001/api/v1/push
- url: http://distributor-1:8000/api/v1/push
send_exemplars: true
7 changes: 6 additions & 1 deletion development/tsdb-blocks-storage-s3/docker-compose.jsonnet
@@ -36,7 +36,12 @@ std.manifestYamlDoc({
{},

distributor:: {
distributor: mimirService({
'distributor-1': mimirService({
target: 'distributor',
httpPort: 8000,
}),

'distributor-2': mimirService({
target: 'distributor',
httpPort: 8001,
}),
25 changes: 24 additions & 1 deletion development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -100,7 +100,30 @@
"image": "consul"
"ports":
- "8500:8500"
"distributor":
"distributor-1":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
"command":
- "sh"
- "-c"
- "sleep 3 && exec ./mimir -config.file=./config/mimir.yaml -target=distributor -server.http-listen-port=8000 -server.grpc-listen-port=9000 -activity-tracker.filepath=/activity/distributor-8000 "
"depends_on":
- "minio"
- "consul"
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=distributor"
"image": "mimir"
"ports":
- "8000:8000"
"volumes":
- "./config:/mimir/config"
- "./activity:/activity"
"distributor-2":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
@@ -3,6 +3,11 @@ multitenancy_enabled: false
distributor:
pool:
health_check_ingesters: true
ring:
kvstore:
store: consul
consul:
host: consul:8500

ingester_client:
grpc_client_config:
141 changes: 98 additions & 43 deletions pkg/distributor/distributor.go
@@ -19,6 +19,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@@ -65,8 +66,12 @@
)

const (
// DistributorRingKey is the key under which we store the distributors ring in the KVStore.
DistributorRingKey = "distributor"
// distributorRingKey is the key under which we store the distributors ring in the KVStore.
distributorRingKey = "distributor"

// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
// in the ring will be automatically removed after.
ringAutoForgetUnhealthyPeriods = 10
)

const (
@@ -87,8 +92,9 @@ type Distributor struct {

// The global rate limiter requires a distributors ring to count
// the number of healthy instances
distributorsLifeCycler *ring.Lifecycler
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32

// For handling HA replicas.
HATracker *haTracker
@@ -206,44 +212,16 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices := []services.Service(nil)
subservices = append(subservices, haTracker)

// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", DistributorRingKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, err
}

distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", DistributorRingKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), distributorsLifeCycler)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), distributorsLifeCycler)
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
requestRateLimiter: limiter.NewRateLimiter(requestRateStrategy, 10*time.Second),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
forwarder: forwarding.NewForwarder(reg, cfg.Forwarding),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
@@ -351,7 +329,31 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
return d.ingestionRate.Rate()
})

d.forwarder = forwarding.NewForwarder(reg, d.cfg.Forwarding)
// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and we can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifecycler *ring.BasicLifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, log, reg)
if err != nil {
return nil, err
}

subservices = append(subservices, distributorsLifecycler, distributorsRing)
requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), d)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), d)
}

d.requestRateLimiter = limiter.NewRateLimiter(requestRateStrategy, 10*time.Second)
d.ingestionRateLimiter = limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second)
d.distributorsLifecycler = distributorsLifecycler
d.distributorsRing = distributorsRing

d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(d.cleanupInactiveUser)
@@ -361,16 +363,60 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if err != nil {
return nil, err
}

d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)

d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
return d, nil
}

// newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
}

lifecyclerCfg, err := cfg.ToBasicLifecyclerConfig(logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to build distributors' lifecycler config")
}

var delegate ring.BasicLifecyclerDelegate
delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, ringNumTokens)
delegate = newHealthyInstanceDelegate(instanceCount, cfg.HeartbeatTimeout, delegate)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)

distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
}

distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}

return distributorsRing, distributorsLifecycler, nil
}

func (d *Distributor) starting(ctx context.Context) error {
// Only report success if all sub-services start properly
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, d.subservices); err != nil {
return errors.Wrap(err, "unable to start distributor subservices")
}

// Distributors get embedded in rulers and queriers to talk to ingesters on the query path. In that
// case they won't have a distributor lifecycler or ring so don't try to join the distributor ring.
if d.distributorsLifecycler != nil && d.distributorsRing != nil {
level.Info(d.log).Log("msg", "waiting until distributor is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, d.distributorsRing, d.distributorsLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
}

return nil
}

func (d *Distributor) running(ctx context.Context) error {
@@ -1420,3 +1466,12 @@ func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
util.WriteHTMLResponse(w, ringNotEnabledPage)
}
}

// HealthyInstancesCount implements the ReadLifecycler interface
//
// We use a ring lifecycler delegate to count the number of members of the
// ring. The count is then used to enforce rate limiting correctly for each
// distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES
func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}