Merge pull request #905 from rax-maas/discovery-throttling
add discovery throttling
iWebi committed Aug 13, 2022
2 parents 7967908 + c9b43af commit 72bd95c
Showing 3 changed files with 283 additions and 19 deletions.
@@ -17,23 +17,28 @@
package com.rackspacecloud.blueflood.inputs.processors;

import com.codahale.metrics.Meter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.rackspacecloud.blueflood.cache.LocatorCache;
import com.rackspacecloud.blueflood.concurrent.FunctionWithThreadPool;
import com.rackspacecloud.blueflood.io.DiscoveryIO;
import com.rackspacecloud.blueflood.service.Configuration;
import com.rackspacecloud.blueflood.service.CoreConfig;
import com.rackspacecloud.blueflood.types.IMetric;
import com.rackspacecloud.blueflood.types.Locator;
import com.rackspacecloud.blueflood.utils.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class DiscoveryWriter extends FunctionWithThreadPool<List<List<IMetric>>, Void> {

@@ -42,12 +47,45 @@ public class DiscoveryWriter extends FunctionWithThreadPool<List<List<IMetric>>, Void> {
private static final Meter locatorsWritten =
Metrics.meter(DiscoveryWriter.class, "Locators Written to Discovery");
private static final Logger log = LoggerFactory.getLogger(DiscoveryWriter.class);
private int maxNewLocatorsPerMinute;
private int maxNewLocatorsPerMinutePerTenant;
private final Cache<Object, Object> newLocatorsThrottle;
private final LoadingCache<String, Cache<Locator, Locator>> newLocatorsThrottlePerTenant;
private final boolean canIndex;

public DiscoveryWriter(ThreadPoolExecutor threadPool) {
super(threadPool);
registerIOModules();
this.canIndex = discoveryIOs.size() > 0;
maxNewLocatorsPerMinute = Configuration.getInstance().getIntegerProperty(
CoreConfig.DISCOVERY_MAX_NEW_LOCATORS_PER_MINUTE);
maxNewLocatorsPerMinutePerTenant = Configuration.getInstance().getIntegerProperty(
CoreConfig.DISCOVERY_MAX_NEW_LOCATORS_PER_MINUTE_PER_TENANT);
newLocatorsThrottle = CacheBuilder.newBuilder()
.concurrencyLevel(16).expireAfterWrite(1, TimeUnit.MINUTES).build();
// This throttle is a cache of caches, one per tenant. It's convenient to have it automatically build the nested
// cache on demand, rather than having to do it manually.
newLocatorsThrottlePerTenant = CacheBuilder.newBuilder()
.concurrencyLevel(16)
.expireAfterAccess(1, TimeUnit.MINUTES)
.build(new CacheLoader<String, Cache<Locator, Locator>>() {
@Override
public Cache<Locator, Locator> load(String key) {
return CacheBuilder.newBuilder()
.concurrencyLevel(16)
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();
}
}
);
}

public void setMaxNewLocatorsPerMinute(int maxNewLocatorsPerMinute) {
this.maxNewLocatorsPerMinute = maxNewLocatorsPerMinute;
}

public void setMaxNewLocatorsPerMinutePerTenant(int maxNewLocatorsPerMinutePerTenant) {
this.maxNewLocatorsPerMinutePerTenant = maxNewLocatorsPerMinutePerTenant;
}

public void registerIO(DiscoveryIO io) {
@@ -82,7 +120,16 @@ public void registerIOModules() {
}
}

private static List<IMetric> condense(List<List<IMetric>> input) {
private List<IMetric> condense(List<List<IMetric>> input) throws ExecutionException {
newLocatorsThrottle.cleanUp();
Set<String> tenants = input.stream()
.flatMap(Collection::stream)
.map(IMetric::getLocator)
.map(Locator::getTenantId)
.collect(Collectors.toSet());
for (String tenant : tenants) {
newLocatorsThrottlePerTenant.get(tenant).cleanUp();
}
List<IMetric> willIndex = new ArrayList<IMetric>();
for (List<IMetric> list : input) {
// make mockito happy.
@@ -91,7 +138,11 @@ private static List<IMetric> condense(List<List<IMetric>> input) {
}

for (IMetric m : list) {
if (!LocatorCache.getInstance().isLocatorCurrentInDiscoveryLayer(m.getLocator())) {
boolean isAlreadySeen = LocatorCache.getInstance().isLocatorCurrentInDiscoveryLayer(m.getLocator());
boolean isThrottlingGlobally = newLocatorsThrottle.size() >= maxNewLocatorsPerMinute;
Cache<Locator, Locator> tenantThrottle = newLocatorsThrottlePerTenant.get(m.getLocator().getTenantId());
boolean isTenantThrottled = tenantThrottle.size() >= maxNewLocatorsPerMinutePerTenant;
if (!isAlreadySeen && !isThrottlingGlobally && !isTenantThrottled) {
willIndex.add(m);
}
}
@@ -113,23 +164,28 @@ public ListenableFuture<Boolean> processMetrics(final List<List<IMetric>> input)
public Boolean call() throws Exception {
boolean success = true;
// filter out the metrics that are current.
final List<IMetric> willIndex = DiscoveryWriter.condense(input);

locatorsWritten.mark(willIndex.size());
for (DiscoveryIO io : discoveryIOs) {
try {
io.insertDiscovery(willIndex);
} catch (Exception ex) {
getLogger().error(ex.getMessage(), ex);
writeErrorMeters.get(io.getClass()).mark();
success = false;
final List<IMetric> willIndex = condense(input);

if (willIndex.size() > 0) {
locatorsWritten.mark(willIndex.size());
for (DiscoveryIO io : discoveryIOs) {
try {
io.insertDiscovery(willIndex);
} catch (Exception ex) {
getLogger().error(ex.getMessage(), ex);
writeErrorMeters.get(io.getClass()).mark();
success = false;
}
}
}

if(success) {
//when all metrics have been written successfully, mark them as current.
for(IMetric indexedMetric: willIndex) {
LocatorCache.getInstance().setLocatorCurrentInDiscoveryLayer(indexedMetric.getLocator());
Locator locator = indexedMetric.getLocator();
LocatorCache.getInstance().setLocatorCurrentInDiscoveryLayer(locator);
newLocatorsThrottle.put(locator, locator);
newLocatorsThrottlePerTenant.get(locator.getTenantId()).put(locator, locator);
}
}

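The throttling added above leans on Guava caches as rough one-minute sliding windows: each locator admitted to discovery is put into a cache whose entries expire a minute after being written, so the cache's size() approximates "new locators seen in the last minute", and once it reaches the configured limit, further new locators are skipped until entries expire. The per-tenant limit works the same way, with a LoadingCache building one nested cache per tenant on demand. Below is a minimal, self-contained sketch of that pattern; the class and method names (NewLocatorThrottle, tryAcquire) are illustrative only and not part of the commit.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the cache-as-throttle pattern used in DiscoveryWriter.
// The class and method names are hypothetical; only the Guava calls mirror the commit.
public class NewLocatorThrottle {

    private final int globalLimitPerMinute;
    private final int perTenantLimitPerMinute;

    // Entries expire one minute after being written, so size() approximates
    // the number of new locators admitted during the last minute.
    private final Cache<String, String> global = CacheBuilder.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .<String, String>build();

    // One nested counter cache per tenant, built lazily; the outer cache drops
    // a tenant's counter after a minute without access.
    private final LoadingCache<String, Cache<String, String>> perTenant = CacheBuilder.newBuilder()
            .expireAfterAccess(1, TimeUnit.MINUTES)
            .build(new CacheLoader<String, Cache<String, String>>() {
                @Override
                public Cache<String, String> load(String tenantId) {
                    return CacheBuilder.newBuilder()
                            .expireAfterWrite(1, TimeUnit.MINUTES)
                            .<String, String>build();
                }
            });

    public NewLocatorThrottle(int globalLimitPerMinute, int perTenantLimitPerMinute) {
        this.globalLimitPerMinute = globalLimitPerMinute;
        this.perTenantLimitPerMinute = perTenantLimitPerMinute;
    }

    // Returns true if this locator may be written to discovery right now.
    public boolean tryAcquire(String tenantId, String locator) throws ExecutionException {
        // cleanUp() evicts expired entries so size() doesn't over-count.
        global.cleanUp();
        Cache<String, String> tenantCounter = perTenant.get(tenantId);
        tenantCounter.cleanUp();

        if (global.size() >= globalLimitPerMinute
                || tenantCounter.size() >= perTenantLimitPerMinute) {
            return false;
        }
        global.put(locator, locator);
        tenantCounter.put(locator, locator);
        return true;
    }
}

One difference from the commit itself: here the check and the recording happen in a single call, whereas DiscoveryWriter checks the caches in condense() but only records a locator in processMetrics() after every discovery backend has accepted the write, so a failed write doesn't consume throttle budget.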
@@ -59,6 +59,40 @@ public enum CoreConfig implements ConfigDefaults {
MAX_ROLLUP_READ_THREADS("20"),
MAX_ROLLUP_WRITE_THREADS("5"),

// Discovery refers to indexing the details of locators so that they're easily searchable. Discovery modules
// implement specific discovery mechanisms. Elasticsearch is an example of a discovery backend. Locators are indexed
// in Elasticsearch and retrieved later to fulfill query requests.
//
// To avoid overwhelming a backend service like Elasticsearch, in-memory caches keep track of details that have
// already been written. For any given time series identified by a unique locator, that time series only needs
// discovery data written one time, so after writing a locator, the fact that it's written is cached, and we don't
// write the same locator again. See the LOCATOR_CACHE_* and TOKEN_CACHE_* settings for configuring the
// discovery-related caches.
//
// Blueflood startup is a special case that has to be dealt with. At startup, the caches are empty, so Blueflood
// treats every locator it sees as new. This can easily lead to an excessive number of calls to a discovery backend
// like Elasticsearch. In addition to the caches, Blueflood has discovery throttling to help avoid this case. This
// throttling can be used to slow down discovery writes on ingestion. Slowing it down reduces overall impact on
// Elasticsearch performance. The tradeoff is that it slows the rate at which new locators are discovered, so they
// can't be queried immediately.

// Sets the maximum number of new locators to write to discovery backends per minute. This is a global limit that
// covers all tenants. It's not a fair throttle, meaning that if one tenant floods the system and maxes out the
// throttle, all other tenants will be throttled as well. As such, set this as high as you can without causing
// instability in your discovery backends. The configured number isn't precise. Ingestion requests are always fully
// processed, and throttling happens between requests.
//
// When setting the global throttle, be sure to take into account how many Blueflood nodes you have, as well as how
// many of them typically restart at the same time. This corresponds directly to the load Blueflood will put on
// discovery backends while the caches fill at startup. Effectively, this setting should be something like:
// (max acceptable backend writes) / (number of Blueflood nodes that can start/restart simultaneously)
DISCOVERY_MAX_NEW_LOCATORS_PER_MINUTE("50000"),
// Sets the maximum number of new locators to write to discovery backends per tenant per minute. This is a fair
// throttle that will throttle each tenant independently. Normally you'll set this significantly lower than
// DISCOVERY_MAX_NEW_LOCATORS_PER_MINUTE to give all tenants a fair chance of their metrics being processed for
// discovery. The global throttle overrides the per-tenant throttle. Even if a tenant hasn't reached its per-tenant
// limit, the global throttle can halt the tenant's discovery writes.
DISCOVERY_MAX_NEW_LOCATORS_PER_MINUTE_PER_TENANT("500"),
DISCOVERY_WRITER_MIN_THREADS("5"),
DISCOVERY_WRITER_MAX_THREADS("50"),

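As a worked example of the sizing guidance in the comments above: if the discovery backend can comfortably absorb on the order of 500,000 new-locator writes per minute (a number you would have to measure for your own cluster) and up to 10 Blueflood nodes might restart at the same time, the global throttle works out to roughly 500,000 / 10 = 50,000 new locators per node per minute, which matches the default. The per-tenant limit is then set much lower, 500 by default, so no single tenant can burn the whole per-node budget; with these defaults it takes at least 100 tenants, each at its per-tenant cap, to reach the global limit within a single minute. Assuming the usual Blueflood key=value properties configuration, the overrides would look like:

DISCOVERY_MAX_NEW_LOCATORS_PER_MINUTE=50000
DISCOVERY_MAX_NEW_LOCATORS_PER_MINUTE_PER_TENANT=500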