Merge pull request #787 from ChandraAddala/es-token-search
Metric Name search
ChandraAddala committed Mar 1, 2017
2 parents 7cfbef6 + a2cae03 commit 37f2666
Showing 33 changed files with 1,473 additions and 84 deletions.
@@ -10,6 +10,20 @@

import java.util.concurrent.TimeUnit;

/**
* This class is used to cache locators that were written recently to our persistence layers by the available writers.
* All the writers check the cache to see if a locator was written recently (isCurrent) before writing it again.
*
* Different writers that we have:
*
* {@link com.rackspacecloud.blueflood.inputs.processors.BatchWriter} This writes to cassandra
* {@link com.rackspacecloud.blueflood.inputs.processors.DiscoveryWriter} This supports metric discovery (/metric/search)
* {@link com.rackspacecloud.blueflood.inputs.processors.TokenDiscoveryWriter} This supports metric token discovery (metric_name/search)
*
* Each writer maintains its own indicator in {@link LocatorCacheEntry} to indicate whether a locator is current. This
* is useful in cases where persisting a locator succeeds with one writer but not with others.
*/
public class LocatorCache {

// this collection is used to reduce the number of locators that get written.
@@ -102,6 +116,17 @@ public synchronized boolean isLocatorCurrentInDiscoveryLayer(Locator loc) {
return entry != null && entry.isDiscoveryCurrent();
}

/**
* Checks whether the locator was recently inserted in the token discovery layer.
*
* @param loc the locator to check
* @return true if the locator is current in the token discovery layer
*/
public synchronized boolean isLocatorCurrentInTokenDiscoveryLayer(Locator loc) {
LocatorCacheEntry entry = insertedLocators.getIfPresent(loc.toString());
return entry != null && entry.isTokenDiscoveryCurrent();
}

/**
* Check if the delayed locator is recently inserted for a given slot
*
@@ -144,6 +169,14 @@ public synchronized void setLocatorCurrentInDiscoveryLayer(Locator loc) {
getOrCreateInsertedLocatorEntry(loc).setDiscoveryCurrent();
}

/**
* Marks the locator as recently inserted in the token discovery layer.
*
* @param loc the locator to mark as current
*/
public synchronized void setLocatorCurrentInTokenDiscoveryLayer(Locator loc) {
getOrCreateInsertedLocatorEntry(loc).setTokenDiscoveryCurrent();
}

/**
* Marks the delayed locator as recently inserted for a given slot
* @param slot
@@ -170,6 +203,7 @@ public synchronized void resetInsertedLocatorsCache() {
private class LocatorCacheEntry {
private boolean discoveryCurrent = false;
private boolean batchCurrent = false;
private boolean tokenDiscoveryCurrent = false;

void setDiscoveryCurrent() {
this.discoveryCurrent = true;
@@ -179,13 +213,21 @@ void setBatchCurrent() {
this.batchCurrent = true;
}

void setTokenDiscoveryCurrent() {
this.tokenDiscoveryCurrent = true;
}

boolean isDiscoveryCurrent() {
return discoveryCurrent;
}

boolean isBatchCurrent() {
return batchCurrent;
}

boolean isTokenDiscoveryCurrent() {
return tokenDiscoveryCurrent;
}
}

}
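
The Javadoc above implies a common pattern for every writer: consult only your own layer's flag, and set it only after a successful write. A minimal sketch of that pattern (not code from this commit; persistToTokenDiscovery() is a hypothetical stand-in for the writer's actual I/O):

    // Each layer checks and sets only its own flag, so a failed write in one
    // layer does not suppress retries in that layer.
    void writeIfNotCurrent(Locator locator) throws Exception {
        LocatorCache cache = LocatorCache.getInstance();

        if (cache.isLocatorCurrentInTokenDiscoveryLayer(locator)) {
            return; // written recently by this layer; skip
        }

        persistToTokenDiscovery(locator); // hypothetical persistence call
        cache.setLocatorCurrentInTokenDiscoveryLayer(locator);
    }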
@@ -0,0 +1,75 @@
package com.rackspacecloud.blueflood.cache;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.rackspacecloud.blueflood.types.Token;
import com.rackspacecloud.blueflood.utils.Metrics;

import java.util.concurrent.TimeUnit;

public class TokenCache {

// this collection is used to reduce the number of tokens that get written.
// Simply, if a token has been seen within the last 10 minutes, don't bother.
private final Cache<String, Boolean> insertedTokens;

private static TokenCache instance = new TokenCache(10, TimeUnit.MINUTES,
3, TimeUnit.DAYS);


static {
Metrics.getRegistry().register(MetricRegistry.name(TokenCache.class, "Current Tokens Count"),
(Gauge<Long>) instance::getCurrentTokenCount);
}

public static TokenCache getInstance() {
return instance;
}

protected TokenCache(long expireAfterAccessDuration, TimeUnit expireAfterAccessTimeUnit,
long expireAfterWriteDuration, TimeUnit expireAfterWriteTimeUnit) {

insertedTokens =
CacheBuilder.newBuilder()
.expireAfterAccess(expireAfterAccessDuration, expireAfterAccessTimeUnit)
.expireAfterWrite(expireAfterWriteDuration, expireAfterWriteTimeUnit)
.concurrencyLevel(16)
.build();
}

@VisibleForTesting
public static TokenCache getInstance(long expireAfterAccessDuration, TimeUnit expireAfterAccessTimeUnit,
long expireAfterWriteDuration, TimeUnit expireAfterWriteTimeUnit) {

return new TokenCache(expireAfterAccessDuration, expireAfterAccessTimeUnit,
expireAfterWriteDuration, expireAfterWriteTimeUnit);
}

public long getCurrentTokenCount() {
return insertedTokens.size();
}

/**
* Checks whether the token was recently inserted.
*
* @param token the token to check
* @return true if the token is current
*/
public synchronized boolean isTokenCurrent(Token token) {
return insertedTokens.getIfPresent(token.getId()) != null;
}

/**
* Marks the token as recently inserted.
*
* @param token the token to mark as current
*/
public synchronized void setTokenCurrent(Token token) {
insertedTokens.put(token.getId(), Boolean.TRUE);
}

@VisibleForTesting
public synchronized void resetCache() {
insertedTokens.invalidateAll();
}

}
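
A usage sketch for the cache (illustrative values; token is any Token instance): the @VisibleForTesting overload gives tests an isolated instance with short expirations instead of the shared 10-minute/3-day singleton.

    // Isolated cache for a test; entries expire quickly instead of after days.
    TokenCache cache = TokenCache.getInstance(100, TimeUnit.MILLISECONDS, 1, TimeUnit.SECONDS);

    if (!cache.isTokenCurrent(token)) {
        // ... write the token to the discovery backend ...
        cache.setTokenCurrent(token); // suppresses rewrites until the entry expires
    }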
@@ -0,0 +1,160 @@
package com.rackspacecloud.blueflood.inputs.processors;

import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.ListenableFuture;
import com.rackspacecloud.blueflood.cache.LocatorCache;
import com.rackspacecloud.blueflood.cache.TokenCache;
import com.rackspacecloud.blueflood.concurrent.FunctionWithThreadPool;
import com.rackspacecloud.blueflood.io.TokenDiscoveryIO;
import com.rackspacecloud.blueflood.service.Configuration;
import com.rackspacecloud.blueflood.service.CoreConfig;
import com.rackspacecloud.blueflood.types.IMetric;
import com.rackspacecloud.blueflood.types.Locator;
import com.rackspacecloud.blueflood.types.Token;
import com.rackspacecloud.blueflood.utils.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

/**
* A separate discovery writer for tokens. This class is modelled after {@link DiscoveryWriter}.
*/
public class TokenDiscoveryWriter extends FunctionWithThreadPool<List<List<IMetric>>, Void> {

private final List<TokenDiscoveryIO> tokenDiscoveryIOs = new ArrayList<>();
private final Map<Class<? extends TokenDiscoveryIO>, Meter> writeErrorMeters = new HashMap<>();
private static final Logger log = LoggerFactory.getLogger(TokenDiscoveryWriter.class);

public TokenDiscoveryWriter(ThreadPoolExecutor executor) {
super(executor);
registerIOModules();
}

public void registerIO(TokenDiscoveryIO io) {
tokenDiscoveryIOs.add(io);
writeErrorMeters.put(io.getClass(),
Metrics.meter(io.getClass(), "TokenDiscoveryWriter Write Errors")
);
}

public void registerIOModules() {
List<String> modules = Configuration.getInstance().getListProperty(CoreConfig.TOKEN_DISCOVERY_MODULES);

ClassLoader classLoader = TokenDiscoveryIO.class.getClassLoader();
for (String module : modules) {
log.info("Loading token discovery module " + module);
try {
Class discoveryClass = classLoader.loadClass(module);
TokenDiscoveryIO discoveryIOModule = (TokenDiscoveryIO) discoveryClass.newInstance();
log.info("Registering token discovery module " + module);
registerIO(discoveryIOModule);
} catch (InstantiationException e) {
log.error("Unable to create instance of token discovery class for: " + module, e);
} catch (IllegalAccessException e) {
log.error("Error starting token discovery module: " + module, e);
} catch (ClassNotFoundException e) {
log.error("Unable to locate token discovery module: " + module, e);
} catch (RuntimeException e) {
log.error("Error starting token discovery module: " + module, e);
} catch (Throwable e) {
log.error("Error starting token discovery module: " + module, e);
}
}
}

/**
* For all of these {@link Locator}s, get the unique set of {@link Token}s which are not already in the token cache.
*
* @param locators the locators to tokenize
* @return the unique tokens that are not yet current in {@link TokenCache}
*/
static Set<Token> getUniqueTokens(final List<Locator> locators) {

return Token.getUniqueTokens(locators.stream())
.filter(token -> !TokenCache.getInstance().isTokenCurrent(token))
.collect(toSet());
}

/**
* Get all {@link Locator}s corresponding to metrics which are not current in the token discovery layer.
*
* @param input batches of metrics
* @return the locators that still need token discovery writes
*/
static List<Locator> getLocators(final List<List<IMetric>> input) {

//converting list of lists of metrics to flat list of locators that are not current.
return input.stream()
.flatMap(List::stream)
.map(IMetric::getLocator)
.filter(locator -> !LocatorCache.getInstance().isLocatorCurrentInTokenDiscoveryLayer(locator))
.collect(toList());
}

/**
* For a given batch of metrics, insert unique tokens using {@link TokenDiscoveryIO}.
*
* This method has the following steps:
* 1) For a batch of metrics, get all locators which are not current, by calling {@link #getLocators(List)}.
* 2) For all these locators, get unique tokens which are not current, by calling {@link #getUniqueTokens(List)}.
* 3) Insert those tokens.
* 4) After successful insertion, update both {@link LocatorCache} and {@link TokenCache}.
*
* @param input batches of metrics
* @return a future resolving to true if all token writes succeeded
*/
public ListenableFuture<Boolean> processTokens(final List<List<IMetric>> input) {

return getThreadPool().submit(() -> {
boolean success = true;

List<Locator> locators = getLocators(input);

List<Token> tokens = new ArrayList<>(getUniqueTokens(locators));

if (!tokens.isEmpty()) {
for (TokenDiscoveryIO io : tokenDiscoveryIOs) {
try {
io.insertDiscovery(tokens);
} catch (Exception ex) {
getLogger().error(ex.getMessage(), ex);
writeErrorMeters.get(io.getClass()).mark();
success = false;
}
}
}

if (success && !tokens.isEmpty()) {

//update the token cache, but do not cache leaf nodes
tokens.stream()
.filter(token -> !token.isLeaf())
.forEach(token -> TokenCache.getInstance().setTokenCurrent(token));

//update the locator cache
locators.forEach(locator -> LocatorCache.getInstance().setLocatorCurrentInTokenDiscoveryLayer(locator));
}

return success;
});
}

@Override
public Void apply(List<List<IMetric>> input) throws Exception {
processTokens(input);
return null;
}
}
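
Wiring the writer up is just constructing it with a thread pool and handing it batches of metrics; a sketch (the executor sizing mirrors the TOKEN_DISCOVERY_WRITER_* defaults added below; an empty batch list stands in for real ingestion input):

    // Pool sized like the new TOKEN_DISCOVERY_WRITER_MIN/MAX_THREADS defaults (5..10).
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    TokenDiscoveryWriter writer = new TokenDiscoveryWriter(executor);

    List<List<IMetric>> batches = Collections.emptyList(); // real ingestion supplies these
    // The future resolves to false if any registered TokenDiscoveryIO failed.
    boolean allWritesSucceeded = writer.processTokens(batches).get();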
@@ -3,12 +3,11 @@
import com.rackspacecloud.blueflood.types.IMetric;
import java.util.List;

-public interface DiscoveryIO {
+public interface DiscoveryIO extends MetricNameSearchIO {

public void insertDiscovery(IMetric metric) throws Exception;
public void insertDiscovery(List<IMetric> metrics) throws Exception;
public List<SearchResult> search(String tenant, String query) throws Exception;
public List<SearchResult> search(String tenant, List<String> queries) throws Exception;
-public List<MetricName> getMetricNames(String tenant, String prefix) throws Exception;

}
@@ -0,0 +1,7 @@
package com.rackspacecloud.blueflood.io;

import java.util.List;

public interface MetricNameSearchIO {
public List<MetricName> getMetricNames(String tenant, String query) throws Exception;
}
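
Extracting getMetricNames into this small interface lets a metric-name search handler depend on the search capability alone, so it can be backed by the existing DiscoveryIO implementations or by the new TokenDiscoveryIO ones. A sketch of such a caller (the handler class is hypothetical, not part of this commit):

    // Depends only on the shared search capability, not on a concrete IO type.
    class MetricNameHandler {
        private final MetricNameSearchIO searchIO;

        MetricNameHandler(MetricNameSearchIO searchIO) {
            this.searchIO = searchIO;
        }

        List<MetricName> handle(String tenant, String query) throws Exception {
            return searchIO.getMetricNames(tenant, query);
        }
    }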
@@ -0,0 +1,12 @@
package com.rackspacecloud.blueflood.io;


import com.rackspacecloud.blueflood.types.Token;

import java.io.IOException;
import java.util.List;

public interface TokenDiscoveryIO extends MetricNameSearchIO {
public void insertDiscovery(Token token) throws IOException;
public void insertDiscovery(List<Token> tokens) throws IOException;
}
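
To make the contract concrete, here is a toy in-memory implementation (illustrative only; a real module, such as the Elasticsearch-backed one this PR is named for, would index tokens and answer metric-name queries against that index):

    package com.rackspacecloud.blueflood.io;

    import com.rackspacecloud.blueflood.types.Token;

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class InMemoryTokenDiscoveryIO implements TokenDiscoveryIO {

        // Records token ids; stands in for a real search index.
        private final Set<String> tokenIds = ConcurrentHashMap.newKeySet();

        @Override
        public void insertDiscovery(Token token) throws IOException {
            tokenIds.add(token.getId());
        }

        @Override
        public void insertDiscovery(List<Token> tokens) throws IOException {
            for (Token token : tokens) {
                insertDiscovery(token);
            }
        }

        @Override
        public List<MetricName> getMetricNames(String tenant, String query) throws Exception {
            // Backend-specific in real modules; returning no results keeps the sketch self-contained.
            return Collections.emptyList();
        }
    }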
@@ -46,6 +46,7 @@ public enum CoreConfig implements ConfigDefaults {
INGESTION_MODULES(""),
QUERY_MODULES(""),
DISCOVERY_MODULES(""),
TOKEN_DISCOVERY_MODULES(""),
EVENT_LISTENER_MODULES(""),
EVENTS_MODULES(""),

@@ -56,6 +57,9 @@ public enum CoreConfig implements ConfigDefaults {
DISCOVERY_WRITER_MIN_THREADS("5"),
DISCOVERY_WRITER_MAX_THREADS("10"),

TOKEN_DISCOVERY_WRITER_MIN_THREADS("5"),
TOKEN_DISCOVERY_WRITER_MAX_THREADS("10"),

// Maximum threads that would access the cache concurrently
META_CACHE_MAX_CONCURRENCY("50"),

@@ -175,6 +179,9 @@ public enum CoreConfig implements ConfigDefaults {
// 10 minutes
AFTER_CURRENT_COLLECTIONTIME_MS("600000"),

// Feature to improve token search. Enabling this will persist metric name tokens separately.
ENABLE_TOKEN_SEARCH_IMPROVEMENTS("false"),

// Cross-Origin Resource Sharing
CORS_ENABLED("false"),
CORS_ALLOWED_ORIGINS("*"),
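
Enabling the feature end to end means pointing TOKEN_DISCOVERY_MODULES at a TokenDiscoveryIO implementation and flipping the new flag. A hypothetical configuration fragment (the module class name and the properties-style format are assumptions, not taken from this commit):

    ENABLE_TOKEN_SEARCH_IMPROVEMENTS=true
    TOKEN_DISCOVERY_MODULES=com.rackspacecloud.blueflood.io.ElasticTokensIO
    TOKEN_DISCOVERY_WRITER_MIN_THREADS=5
    TOKEN_DISCOVERY_WRITER_MAX_THREADS=10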
