Permalink
Browse files

Preliminary D2Event schema and D2Event support

Pass in EventEmitter object from container

EnumSet improvement

Add weightedRateLimiter

Update schema

D2Monitor to emit d2events

Simplify schema again

D2Monitor defines the d2 internal events, activities, and statistics that can be emitted through
an EventEmitter. The purpose is to collect internal information about d2 from the d2 client in
order to build a global overview of services/clusters and the status of individual servers.

Prepare to emit

Pass the clusterName to the strategy

Remove the files that were already checked in.
  • Loading branch information...
ChaoLinkedIn committed Sep 23, 2016
1 parent ad53f07 commit 05922f9fd2bc3ca94b70a082b6f91dafa0834df2
Showing with 398 additions and 62 deletions.
  1. +9 −1 d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
  2. +12 −2 d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
  3. +4 −3 d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java
  4. +12 −11 d2/src/main/java/com/linkedin/d2/balancer/event/D2MonitorBuilder.java
  5. +10 −0 d2/src/main/java/com/linkedin/d2/balancer/event/EventEmitter.java
  6. +2 −0 d2/src/main/java/com/linkedin/d2/balancer/properties/PropertyKeys.java
  7. +6 −5 d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerState.java
  8. +5 −0 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerQuarantine.java
  9. +64 −6 ...rc/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyConfig.java
  10. +6 −2 ...main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyFactoryV3.java
  11. +111 −17 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV3.java
  12. +3 −2 d2/src/test/java/com/linkedin/d2/balancer/simple/LoadBalancerSimulator.java
  13. +105 −3 d2/src/test/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerDelayTest.java
  14. +8 −2 d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStateTest.java
  15. +2 −2 ...est/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyConfigTest.java
  16. +17 −4 d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerTest.java
  17. +7 −0 degrader/src/main/java/com/linkedin/util/degrader/DegraderControl.java
  18. +2 −0 degrader/src/main/java/com/linkedin/util/degrader/DegraderControlMBean.java
  19. +13 −2 degrader/src/main/java/com/linkedin/util/degrader/DegraderImpl.java
@@ -24,6 +24,7 @@
import com.linkedin.d2.balancer.clients.BackupRequestsClient;
import com.linkedin.d2.balancer.clients.DynamicClient;
import com.linkedin.d2.balancer.clients.RetryClient;
import com.linkedin.d2.balancer.event.EventEmitter;
import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations;
import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl;
import com.linkedin.r2.message.RequestContext;
@@ -99,7 +100,8 @@ public D2Client build()
_config.retryLimit,
_config.warmUp,
_config.warmUpTimeoutSeconds,
_config.warmUpConcurrentRequests);
_config.warmUpConcurrentRequests,
_config._eventEmitter);
final LoadBalancerWithFacilities loadBalancer = loadBalancerFactory.create(cfg);
@@ -284,6 +286,12 @@ public D2ClientBuilder setRetryLimit(int retryLimit)
return this;
}
/**
 * Stores the given {@link EventEmitter} in the client configuration held by this builder.
 *
 * @param eventEmitter the emitter to store in the configuration
 * @return this builder, so that calls can be chained
 */
public D2ClientBuilder setEventEmitter(EventEmitter eventEmitter)
{
_config._eventEmitter = eventEmitter;
return this;
}
/**
* Specify {@link TransportClientFactory} to generate the client for specific protocol.
* Caller is responsible to maintain the life cycle of the factories.
@@ -16,6 +16,7 @@
package com.linkedin.d2.balancer;
import com.linkedin.d2.backuprequests.BackupRequestsStrategyStatsConsumer;
import com.linkedin.d2.balancer.event.EventEmitter;
import com.linkedin.d2.balancer.util.WarmUpLoadBalancer;
import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations;
import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl;
@@ -58,13 +59,17 @@
ScheduledExecutorService _backupRequestsExecutorService = null;
boolean retry = false;
int retryLimit = DEAULT_RETRY_LIMIT;
<<<<<<< HEAD
boolean warmUp = false;
int warmUpTimeoutSeconds = WarmUpLoadBalancer.DEFAULT_SEND_REQUESTS_TIMEOUT_SECONDS;
int warmUpConcurrentRequests = WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS;
boolean backupRequestsEnabled = true;
BackupRequestsStrategyStatsConsumer backupRequestsStrategyStatsConsumer = null;
long backupRequestsLatencyNotificationInterval = 1;
TimeUnit backupRequestsLatencyNotificationIntervalUnit = TimeUnit.MINUTES;
=======
EventEmitter _eventEmitter = null;
>>>>>>> Preliminary D2Event schema and D2Event support
private static final int DEAULT_RETRY_LIMIT = 3;
@@ -251,7 +256,7 @@ public D2ClientConfig(String zkHosts,
isSymlinkAware,
clientServicesConfig,
d2ServicePath,
false, null, null, false, 3);
false, null, null, false, 3, null);
}
public D2ClientConfig(String zkHosts,
@@ -276,7 +281,8 @@ public D2ClientConfig(String zkHosts,
HealthCheckOperations healthCheckOperations,
ScheduledExecutorService executorService,
boolean retry,
int retryLimit)
int retryLimit,
EventEmitter emitter)
{
this(zkHosts,
zkSessionTimeoutInMs,
@@ -421,6 +427,7 @@ public D2ClientConfig(String zkHosts,
this._executorService = executorService;
this.retry = retry;
this.retryLimit = retryLimit;
<<<<<<< HEAD
this.warmUp = warmUp;
this.warmUpTimeoutSeconds = warmUpTimeoutSeconds;
this.warmUpConcurrentRequests = warmUpConcurrentRequests;
@@ -429,5 +436,8 @@ public D2ClientConfig(String zkHosts,
this.backupRequestsLatencyNotificationInterval = backupRequestsLatencyNotificationInterval;
this.backupRequestsLatencyNotificationIntervalUnit = backupRequestsLatencyNotificationIntervalUnit;
this._backupRequestsExecutorService = backupRequestsExecutorService;
=======
this._eventEmitter = emitter;
>>>>>>> Preliminary D2Event schema and D2Event support
}
}
@@ -16,6 +16,7 @@
package com.linkedin.d2.balancer;
import com.linkedin.d2.balancer.event.EventEmitter;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategyFactory;
import com.linkedin.d2.balancer.strategies.degrader.DegraderLoadBalancerStrategyFactoryV3;
@@ -87,7 +88,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
}
final Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories =
createDefaultLoadBalancerStrategyFactories(config._healthCheckOperations, config._executorService);
createDefaultLoadBalancerStrategyFactories(config._healthCheckOperations, config._executorService, config._eventEmitter);
return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory,
config.lbWaitTimeout,
@@ -106,14 +107,14 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
}
private Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> createDefaultLoadBalancerStrategyFactories(
HealthCheckOperations healthCheckOperations, ScheduledExecutorService executorService)
HealthCheckOperations healthCheckOperations, ScheduledExecutorService executorService, EventEmitter emitter)
{
final Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories =
new HashMap<>();
final RandomLoadBalancerStrategyFactory randomStrategyFactory = new RandomLoadBalancerStrategyFactory();
final DegraderLoadBalancerStrategyFactoryV3 degraderStrategyFactoryV3 = new DegraderLoadBalancerStrategyFactoryV3(
healthCheckOperations, executorService);
healthCheckOperations, executorService, emitter);
loadBalancerStrategyFactories.put("random", randomStrategyFactory);
loadBalancerStrategyFactories.put("degrader", degraderStrategyFactoryV3);
@@ -40,18 +40,19 @@ public D2MonitorClusterStatsBuilder getClusterStatsBuilder()
return _clusterStatsBuilder;
}
public D2MonitorUriInfoBuilder getUriInfoBuilder(URI uri)
public D2MonitorUriInfoBuilder getOrCreateUriInfoBuilder(URI uri)
{
if (_uriInfoBuilderMap.containsKey(uri))
{
return _uriInfoBuilderMap.get(uri);
}
else
{
D2MonitorUriInfoBuilder builder = new D2MonitorUriInfoBuilder(uri);
_uriInfoBuilderMap.put(uri, builder);
return builder;
}
return _uriInfoBuilderMap.computeIfAbsent(uri, k -> new D2MonitorUriInfoBuilder(k));
}
/**
 * Registers {@code uriInfoBuilder} for {@code uri} via {@code putIfAbsent}, i.e. an
 * existing mapping for the same URI is left untouched.
 *
 * @param uri the URI to associate the builder with
 * @param uriInfoBuilder the builder to register
 * @return whatever {@code putIfAbsent} returns on the backing map; presumably the builder
 *         already mapped to {@code uri}, or {@code null} if there was none
 *         (NOTE(review): assumes {@code java.util.Map} semantics; map type not visible here)
 */
public D2MonitorUriInfoBuilder addUriInfoBuilder(URI uri, D2MonitorUriInfoBuilder uriInfoBuilder)
{
return _uriInfoBuilderMap.putIfAbsent(uri, uriInfoBuilder);
}
/**
 * @return the time stamp held by this builder
 *         (NOTE(review): unit and origin of the value are not visible here; confirm)
 */
public long getTimeStamp()
{
return _timeStamp;
}
public int getPartitionId()
@@ -0,0 +1,10 @@
package com.linkedin.d2.balancer.event;
/**
 * {@link EventEmitter} defines the interface used to emit {@link D2Monitor} events.
 */
public interface EventEmitter
{
/**
 * Emits the given event.
 *
 * @param event the {@link D2Monitor} event to emit
 */
void emitEvent(D2Monitor event);
}
@@ -92,6 +92,8 @@
public static final String HTTP_LB_QUARANTINE_EXECUTOR_SERVICE = "http.loadBalancer.quarantine.executorService";
public static final String HTTP_LB_QUARANTINE_METHOD = "http.loadBalancer.quarantine.method";
public static final String HTTP_LB_ERROR_STATUS_REGEX = "http.loadBalancer.errorStatusRegex";
public static final String HTTP_LB_LOW_EVENT_EMITTING_INTERVAL = "http.loadBalancer.lowEmittingInterval";
public static final String HTTP_LB_HIGH_EVENT_EMITTING_INTERVAL = "http.loadBalancer.highEmittingInterval";
//used by service metadata properties
public static final String SERVICE_FOLLOW_REDIRECTION_MAX_HOP = "followRedirection.maxHop";
@@ -54,11 +54,6 @@
import com.linkedin.util.clock.Clock;
import com.linkedin.util.clock.SystemClock;
import com.linkedin.util.degrader.DegraderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -74,6 +69,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.linkedin.d2.discovery.util.LogUtil.debug;
import static com.linkedin.d2.discovery.util.LogUtil.info;
@@ -1503,6 +1502,8 @@ void refreshServiceStrategies(ServiceProperties serviceProperties)
// Save the service path as a property -- Quarantine may need this info to construct correct
// health checking path
loadBalancerStrategyProperties.put(PropertyKeys.PATH, serviceProperties.getPath());
// Also save the clusterName as a property
loadBalancerStrategyProperties.put(PropertyKeys.CLUSTER_NAME, serviceProperties.getClusterName());
LoadBalancerStrategy strategy = factory.newLoadBalancer(
serviceProperties.getServiceName(),
@@ -259,6 +259,11 @@ long getLastChecked()
return _lastChecked;
}
/**
 * @return the current value of {@code _timeTilNextCheck}
 *         (NOTE(review): presumably the delay before the next health check; unit not
 *         visible here, confirm)
 */
public long getTimeTilNextCheck()
{
return _timeTilNextCheck;
}
// For testing only
HealthCheck getHealthCheckClient()
{
@@ -16,6 +16,7 @@
package com.linkedin.d2.balancer.strategies.degrader;
import com.linkedin.d2.balancer.event.EventEmitter;
import com.linkedin.d2.balancer.properties.PropertyKeys;
import com.linkedin.d2.balancer.util.hashing.MPConsistentHashRing;
import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations;
@@ -42,6 +43,7 @@
private final Map<String,Object> _hashConfig;
private final Clock _clock;
private static final Logger _log = LoggerFactory.getLogger(DegraderLoadBalancerStrategyConfig.class);
private final String _clusterName;
// this initialRecoveryLevel is the minimum proportion of hash ring points that a Tracker Client
// can have, and is a number from 0-1. A value of zero will remove the TC completely forever from
@@ -83,6 +85,16 @@
private final String _healthCheckPath;
private final long _quarantineLatency; // in Milliseconds
private final EventEmitter _eventEmitter;
// lowEventEmittingInterval and highEventEmittingInterval control the interval for d2monitor
// to emit events. lowEventEmittingInterval is used when there are abnormal events that need
// to emit at a higher frequency. highEventEmittingInterval is used when all the hosts are in
// healthy state. 'lowEventEmittingInterval == 0' disables d2monitor emitting.
//
// The settings directly depend on the number of clients and QPS.
private final long _lowEventEmittingInterval;
private final long _highEventEmittingInterval;
public static final Clock DEFAULT_CLOCK = SystemClock.instance();
public static final double DEFAULT_INITIAL_RECOVERY_LEVEL = 0.01;
public static final double DEFAULT_RAMP_FACTOR = 1.0;
@@ -113,6 +125,11 @@
public static final String DEFAULT_QUARANTINE_METHOD = RestMethod.OPTIONS;
private static final double QUARANTINE_MAXPERCENT_CAP = 0.5;
public static final long DEFAULT_LOW_EVENT_EMITTING_INTERVAL = 0; // Milliseconds. disable emitting by default
public static final long DEFAULT_HIGH_EVENT_EMITTING_INTERVAL = 60000; // Milliseconds
public static final String DEFAULT_CLUSTER_NAME = "UNDEFINED_CLUSTER";
public DegraderLoadBalancerStrategyConfig(long updateIntervalMs)
{
this(updateIntervalMs, DEFAULT_UPDATE_ONLY_AT_INTERVAL, 100, null, Collections.<String, Object>emptyMap(),
@@ -123,7 +140,8 @@ public DegraderLoadBalancerStrategyConfig(long updateIntervalMs)
DEFAULT_HASHRING_POINT_CLEANUP_RATE, null,
DEFAULT_NUM_PROBES, null,
DEFAULT_QUARANTINE_MAXPERCENT,
null, null, DEFAULT_QUARANTINE_METHOD, null, DegraderImpl.DEFAULT_LOW_LATENCY);
null, null, DEFAULT_QUARANTINE_METHOD, null, DegraderImpl.DEFAULT_LOW_LATENCY,
null, DEFAULT_LOW_EVENT_EMITTING_INTERVAL, DEFAULT_HIGH_EVENT_EMITTING_INTERVAL, DEFAULT_CLUSTER_NAME);
}
public DegraderLoadBalancerStrategyConfig(DegraderLoadBalancerStrategyConfig config)
@@ -151,7 +169,11 @@ public DegraderLoadBalancerStrategyConfig(DegraderLoadBalancerStrategyConfig con
config.getHealthCheckOperations(),
config.getHealthCheckMethod(),
config.getHealthCheckPath(),
config.getQuarantineLatency());
config.getQuarantineLatency(),
config.getEventEmitter(),
config.getLowEventEmittingInterval(),
config.getHighEventEmittingInterval(),
config.getClusterName());
}
public DegraderLoadBalancerStrategyConfig(long updateIntervalMs,
@@ -177,7 +199,11 @@ public DegraderLoadBalancerStrategyConfig(long updateIntervalMs,
HealthCheckOperations healthCheckOperations,
String healthCheckMethod,
String healthCheckPath,
long quarantineLatency)
long quarantineLatency,
EventEmitter emitter,
long lowEventEmittingInterval,
long highEventEmittingInterval,
String clusterName)
{
_updateIntervalMs = updateIntervalMs;
_updateOnlyAtInterval = updateOnlyAtInterval;
@@ -203,6 +229,10 @@ public DegraderLoadBalancerStrategyConfig(long updateIntervalMs,
_healthCheckMethod = healthCheckMethod;
_healthCheckPath = healthCheckPath;
_quarantineLatency = quarantineLatency;
_eventEmitter = emitter;
_lowEventEmittingInterval = lowEventEmittingInterval;
_highEventEmittingInterval = highEventEmittingInterval;
_clusterName = clusterName;
}
/**
@@ -226,12 +256,12 @@ public DegraderLoadBalancerStrategyConfig(long updateIntervalMs,
// @Deprecated -- could not be enforced since -Werror option.
static DegraderLoadBalancerStrategyConfig createHttpConfigFromMap(Map<String,Object> map)
{
return createHttpConfigFromMap(map, null, null, null);
return createHttpConfigFromMap(map, null, null, null, null);
}
static DegraderLoadBalancerStrategyConfig createHttpConfigFromMap(Map<String,Object> map,
HealthCheckOperations healthCheckOperations, ScheduledExecutorService overrideExecutorService,
Map<String, String> degraderProperties)
Map<String, String> degraderProperties, EventEmitter emitter)
{
Clock clock = MapUtil.getWithDefault(map, PropertyKeys.CLOCK,
DEFAULT_CLOCK, Clock.class);
@@ -335,6 +365,13 @@ static DegraderLoadBalancerStrategyConfig createHttpConfigFromMap(Map<String,Obj
healthCheckMethod = DEFAULT_QUARANTINE_METHOD;
}
Long lowEmittingInterval = MapUtil.getWithDefault(map, PropertyKeys.HTTP_LB_LOW_EVENT_EMITTING_INTERVAL,
DEFAULT_LOW_EVENT_EMITTING_INTERVAL, Long.class);
Long highEmittingInterval = MapUtil.getWithDefault(map, PropertyKeys.HTTP_LB_HIGH_EVENT_EMITTING_INTERVAL,
DEFAULT_HIGH_EVENT_EMITTING_INTERVAL, Long.class);
final String clusterName = MapUtil.getWithDefault(map, PropertyKeys.CLUSTER_NAME, DEFAULT_CLUSTER_NAME, String.class);
return new DegraderLoadBalancerStrategyConfig(
updateIntervalMs, updateOnlyAtInterval, pointsPerWeight, hashMethod, hashConfig,
clock, initialRecoveryLevel, ringRampFactor, highWaterMark, lowWaterMark,
@@ -343,7 +380,8 @@ static DegraderLoadBalancerStrategyConfig createHttpConfigFromMap(Map<String,Obj
consistentHashAlgorithm, numProbes,
servicePath, quarantineMaxPercent,
overrideExecutorService != null ? overrideExecutorService : executorService,
healthCheckOperations, healthCheckMethod, healthCheckPath, quarantineLatency);
healthCheckOperations, healthCheckMethod, healthCheckPath, quarantineLatency,
emitter, lowEmittingInterval, highEmittingInterval, clusterName);
}
/**
@@ -472,6 +510,26 @@ public long getQuarantineLatency()
return _quarantineLatency;
}
/**
 * @return the {@link EventEmitter} this config was constructed with; may be {@code null}
 *         (the single-argument constructor and the one-argument
 *         {@code createHttpConfigFromMap} overload both pass {@code null})
 */
public EventEmitter getEventEmitter()
{
return _eventEmitter;
}
/**
 * @return the low event emitting interval in milliseconds: the interval used when there
 *         are abnormal events that need to be emitted at a higher frequency. A value of
 *         {@code 0} disables d2monitor emitting.
 */
public long getLowEventEmittingInterval()
{
return _lowEventEmittingInterval;
}
/**
 * @return the high event emitting interval in milliseconds: the interval used when all
 *         the hosts are in a healthy state
 */
public long getHighEventEmittingInterval()
{
return _highEventEmittingInterval;
}
/**
 * @return the cluster name this config was constructed with;
 *         {@code createHttpConfigFromMap} falls back to {@link #DEFAULT_CLUSTER_NAME}
 *         when the property map carries no cluster name
 */
public String getClusterName()
{
return _clusterName;
}
@Override
public String toString()
{
Oops, something went wrong.

0 comments on commit 05922f9

Please sign in to comment.