Permalink
Browse files

[pegasus] Refactor DegraderLoadBalancerStrategyV3 class

RB=1161557
G=si-core-reviewers
R=xzhu,dhoa,cxu,fcapponi
A=dhoa,cxu
  • Loading branch information...
ssheng committed Nov 17, 2017
1 parent 0f931a1 commit 52436263311726ca85d1224f723edcc9d0ec1174
Showing with 957 additions and 4,038 deletions.
  1. +2 −1 CHANGELOG
  2. +176 −0 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/D2MonitorEventEmitter.java
  3. +192 −0 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerState.java
  4. +0 −47 ...main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyFactoryV2.java
  5. +0 −47 ...in/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyFactoryV2_1.java
  6. +0 −1,276 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV2.java
  7. +0 −1,139 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV2_1.java
  8. +16 −616 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV3.java
  9. +73 −0 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/Partition.java
  10. +269 −0 ...rc/main/java/com/linkedin/d2/balancer/strategies/degrader/PartitionDegraderLoadBalancerState.java
  11. +22 −0 ...java/com/linkedin/d2/balancer/strategies/degrader/PartitionDegraderLoadBalancerStateListener.java
  12. +6 −8 d2/src/main/java/com/linkedin/d2/balancer/util/LoadBalancerClientCli.java
  13. +2 −1 d2/src/main/java/com/linkedin/d2/balancer/util/hashing/ConsistentHashRing.java
  14. +0 −104 d2/src/main/java/com/linkedin/d2/jmx/DegraderLoadBalancerStrategyV2Jmx.java
  15. +0 −70 d2/src/main/java/com/linkedin/d2/jmx/DegraderLoadBalancerStrategyV2JmxMBean.java
  16. +0 −67 d2/src/main/java/com/linkedin/d2/jmx/DegraderLoadBalancerStrategyV2_1Jmx.java
  17. +0 −38 d2/src/main/java/com/linkedin/d2/jmx/DegraderLoadBalancerStrategyV2_1JmxMBean.java
  18. +1 −28 d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java
  19. +1 −3 d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStateTest.java
  20. +197 −593 d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerTest.java
View
@@ -1,6 +1,7 @@
16.0.6
------
(RB=1161557)
Refactor DegraderLoadBalancerStrategyV3 class
16.0.5
------
@@ -0,0 +1,176 @@
/*
Copyright (c) 2017 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.linkedin.d2.balancer.strategies.degrader;
import com.linkedin.d2.balancer.clients.TrackerClient;
import com.linkedin.d2.balancer.event.D2MonitorBuilder;
import com.linkedin.d2.balancer.event.EventEmitter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
/**
 * D2MonitorEventEmitter is responsible for building up the appropriate D2Monitor event and sending it out through
 * the {@link EventEmitter} interface. To keep the total data volume under control, the following measures are taken:
 *
 * 1. Choose the right hosts/URIs: the D2Monitor event contains all unhealthy clients and a limited number
 *    (at most {@link #MAX_HEALTHY_HOSTS_TO_EMIT}) of randomly chosen healthy clients for reference.
 *
 * 2. Use different intervals to emit data: when all hosts are in a healthy state, highEventEmittingInterval is used.
 *    If there are hosts in an unhealthy state, lowEventEmittingInterval is used.
 *
 * This class keeps track of the last emitting timestamp and is therefore stateful.
 */
class D2MonitorEventEmitter implements PartitionDegraderLoadBalancerStateListener
{
  public static final int MAX_HEALTHY_HOSTS_TO_EMIT = 2;

  private final DegraderLoadBalancerStrategyConfig _config;
  private final int _partitionId;
  private final String _serviceName;
  // Reused across updates instead of allocating a new generator on every call.
  private final Random _random = new Random();
  // NOTE(review): plain (non-volatile) field; presumably onUpdate is only invoked while the partition
  // lock is held -- confirm against the caller.
  private long _lastEmittingTimeStamp;

  /**
   * @param serviceName name of the service reported in every emitted event
   * @param config strategy config supplying the clock, the cluster name, the event emitter and the emitting intervals
   * @param partitionId id of the partition this emitter reports on
   */
  D2MonitorEventEmitter(String serviceName, final DegraderLoadBalancerStrategyConfig config, int partitionId)
  {
    _config = config;
    _partitionId = partitionId;
    _lastEmittingTimeStamp = config.getClock().currentTimeMillis();
    _serviceName = serviceName;
  }

  /**
   * Builds a D2Monitor event from the given partition state and emits it if enough time has passed since the
   * previous emission. The last emitting timestamp is only advanced when an event is actually emitted.
   *
   * @param state the partition state to report on
   */
  @Override
  public void onUpdate(PartitionDegraderLoadBalancerState state)
  {
    D2MonitorBuilder builder = new D2MonitorBuilder(_serviceName, _config.getClusterName(), _partitionId);
    D2MonitorBuilder.D2MonitorClusterStatsBuilder clusterStatsBuilder = builder.getClusterStatsBuilder();
    Set<TrackerClient> trackerClients = state.getTrackerClients();

    // 1. Set cluster metrics
    clusterStatsBuilder.setClusterNumHosts(trackerClients.size())
        .setClusterCurrentCallCount(state.getCurrentClusterCallCount())
        .setClusterCurrentAverageLatencyMs(state.getCurrentAvgClusterLatency())
        .setClusterCurrentDroppedCalls(state.getCurrentClusterDropCount())
        .setClusterCurrentErrorCount(state.getCurrentClusterErrorCount())
        .setClusterDropLevel(state.getCurrentOverrideDropRate());

    // 2. Build up the D2MonitorEvent with appropriate uris from the trackerClients
    boolean isHealthy = createD2MonitorEvent(trackerClients, builder, state);

    long clk = _config.getClock().currentTimeMillis();
    long intervalMs = clk - _lastEmittingTimeStamp;
    if (allowedToEmit(isHealthy, intervalMs))
    {
      // 3. Emit the event
      EventEmitter emitter = _config.getEventEmitter();
      emitter.emitEvent(builder.build(intervalMs));

      // 4. Update the timestamp
      _lastEmittingTimeStamp = clk;
    }
  }

  // To emit D2Monitor events, the following conditions need to be met:
  // 1. Either lowEventEmittingInterval or highEventEmittingInterval is greater than 0.
  // 2. The interval since the last emission meets one of the following requirements:
  // 2.1. The interval is at least lowEventEmittingInterval (which must be > 0) and the current state is not healthy
  // 2.2. The interval is at least highEventEmittingInterval (which must be > 0).
  private boolean allowedToEmit(boolean isHealthy, long intervalMs)
  {
    return (((_config.getLowEventEmittingInterval() > 0)
        && (intervalMs >= _config.getLowEventEmittingInterval()) && !isHealthy)
        || ((_config.getHighEventEmittingInterval() > 0) && (intervalMs >= _config.getHighEventEmittingInterval())));
  }

  // A client is healthy when its points reach partitionWeight * pointsPerWeight (truncated to int).
  private boolean isClientHealthy(TrackerClient trackerClient, final Map<URI, Integer> pointsMap)
  {
    int perfectHealth = (int) (trackerClient.getPartitionWeight(_partitionId) * _config.getPointsPerWeight());
    return pointsFor(pointsMap, trackerClient.getUri()) >= perfectHealth;
  }

  // Null-safe lookup: a URI with no (or a null) entry in the points map is reported as having 0 points
  // instead of failing with a NullPointerException during auto-unboxing.
  private static int pointsFor(Map<URI, Integer> pointsMap, URI uri)
  {
    Integer points = pointsMap.get(uri);
    return points == null ? 0 : points;
  }

  /**
   * Adds the per-URI information to the D2Monitor builder: every unhealthy client is added, plus a random
   * sample of healthy clients.
   *
   * @param trackerClients all tracker clients of the partition
   * @param d2MonitorBuilder builder receiving the per-URI information
   * @param state partition state providing the points map and the quarantine map
   * @return true if all trackerClients are healthy. False otherwise
   */
  private boolean createD2MonitorEvent(Set<TrackerClient> trackerClients, D2MonitorBuilder d2MonitorBuilder,
      PartitionDegraderLoadBalancerState state)
  {
    List<TrackerClient> healthyClients = new ArrayList<>();
    Map<URI, Integer> pointsMap = state.getPointsMap();
    boolean isHealthy = true;

    for (TrackerClient client : trackerClients)
    {
      if (isClientHealthy(client, pointsMap))
      {
        healthyClients.add(client);
      }
      else
      {
        // for unhealthy clients, always add them into the D2Monitor
        d2MonitorBuilder.addUriInfoBuilder(client.getUri(), createUriInfoBuilder(client, state));
        isHealthy = false;
      }
    }

    // Randomly pick some healthy clients for reference
    if (!healthyClients.isEmpty())
    {
      addRandomClientsToUriInfo(healthyClients, MAX_HEALTHY_HOSTS_TO_EMIT, d2MonitorBuilder, state);
    }
    return isHealthy;
  }

  // Adds the UriInfo of up to num randomly selected entries of healthyClients to the builder.
  // Note that the given list is partially reordered in place.
  private void addRandomClientsToUriInfo(List<TrackerClient> healthyClients, int num, final D2MonitorBuilder builder,
      PartitionDegraderLoadBalancerState state)
  {
    // The operation is equivalent to shuffle + limit, but we do not have to shuffle the whole list since
    // the number of entries to add is generally much less than the number of healthy clients.
    int numToPick = Math.min(num, healthyClients.size());
    for (int i = 0; i < numToPick; ++i)
    {
      // Swap a random entry from the not-yet-picked tail [i, size) into position i, then emit it.
      int pickedIndex = i + _random.nextInt(healthyClients.size() - i);
      Collections.swap(healthyClients, i, pickedIndex);
      TrackerClient nextClient = healthyClients.get(i);
      builder.addUriInfoBuilder(nextClient.getUri(), createUriInfoBuilder(nextClient, state));
    }
  }

  // Create UriInfoBuilder from the corresponding TrackerClient: degrader stats, transmission points and,
  // if the client has an entry in the quarantine map, the time until its next check.
  private D2MonitorBuilder.D2MonitorUriInfoBuilder createUriInfoBuilder(TrackerClient client,
      PartitionDegraderLoadBalancerState state)
  {
    D2MonitorBuilder.D2MonitorUriInfoBuilder uriInfoBuilder =
        new D2MonitorBuilder.D2MonitorUriInfoBuilder(client.getUri());
    uriInfoBuilder.copyStats(client.getDegraderControl(_partitionId));
    uriInfoBuilder.setTransmissionPoints(pointsFor(state.getPointsMap(), client.getUri()));

    DegraderLoadBalancerQuarantine quarantine = state.getQuarantineMap().get(client);
    if (quarantine != null)
    {
      uriInfoBuilder.setQuarantineDuration(quarantine.getTimeTilNextCheck());
    }
    return uriInfoBuilder;
  }
}
@@ -0,0 +1,192 @@
/*
Copyright (c) 2017 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.linkedin.d2.balancer.strategies.degrader;
import com.linkedin.d2.balancer.clients.TrackerClient;
import com.linkedin.d2.balancer.util.hashing.Ring;
import com.linkedin.d2.balancer.util.healthcheck.HealthCheck;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Holds the {@link Partition} objects of a service, keyed by partition id. A partition entry is created
 * lazily the first time its id is requested.
 */
public class DegraderLoadBalancerState
{
  private final ConcurrentMap<Integer, Partition> _partitions;
  private final String _serviceName;
  private final Map<String, String> _degraderProperties;
  private final DegraderLoadBalancerStrategyConfig _config;
  private final AtomicBoolean _quarantineEnabled;
  private final AtomicInteger _quarantineRetries;

  // Tracks the HealthCheck client associated with each TrackerClientUpdater.
  // It should only be accessed under the update lock.
  // Note: once quarantine is enabled there is no need to keep sending health checking requests to all
  // trackerClients, so the healthCheck objects do not have to be retained here. When an individual
  // trackerClient is quarantined, its healthCheck is generated again.
  private final ConcurrentMap<TrackerClientUpdater, HealthCheck> _healthCheckMap;

  /**
   * @param serviceName name of the service this state belongs to
   * @param degraderProperties degrader properties; {@code null} is replaced by an empty map
   * @param config strategy config used when creating partitions
   */
  DegraderLoadBalancerState(String serviceName, Map<String, String> degraderProperties,
      DegraderLoadBalancerStrategyConfig config)
  {
    _serviceName = serviceName;
    _config = config;
    if (degraderProperties == null)
    {
      _degraderProperties = Collections.<String, String>emptyMap();
    }
    else
    {
      _degraderProperties = degraderProperties;
    }
    _partitions = new ConcurrentHashMap<>();
    _healthCheckMap = new ConcurrentHashMap<>();
    _quarantineEnabled = new AtomicBoolean(false);
    _quarantineRetries = new AtomicInteger(0);
  }

  /**
   * Returns the partition registered under the given id, creating and registering a fresh one if none exists yet.
   *
   * @param partitionId id of the requested partition
   * @return the partition stored in the map for this id
   */
  public Partition getPartition(int partitionId)
  {
    Partition existing = _partitions.get(partitionId);
    if (existing != null)
    {
      return existing;
    }

    // This path is mainly taken at bootstrap time. Once all partitionIds have been seen, the map
    // no longer needs to be initialized. The lazy creation exists because the partition count lives in
    // the cluster configuration rather than the service configuration, and the two are deliberately
    // not intermingled.
    Partition candidate = createInitialPartition(partitionId);
    Partition raceWinner = _partitions.putIfAbsent(partitionId, candidate);
    // A non-null result means another thread registered a partition first; the candidate is discarded.
    return raceWinner != null ? raceWinner : candidate;
  }

  // Builds a brand-new partition with its own lock, an initial state and a D2MonitorEventEmitter listener.
  private Partition createInitialPartition(int partitionId)
  {
    PartitionDegraderLoadBalancerState initialState = new PartitionDegraderLoadBalancerState(
        -1, _config.getClock().currentTimeMillis(), false,
        new DegraderRingFactory<>(_config),
        new HashMap<URI, Integer>(),
        PartitionDegraderLoadBalancerState.Strategy.LOAD_BALANCE,
        0, 0,
        new HashMap<TrackerClient, Double>(),
        _serviceName, _degraderProperties,
        0, 0, 0,
        new HashMap<>(), new HashMap<>(),
        null);

    return new Partition(partitionId,
        new ReentrantLock(),
        initialState,
        Collections.singleton(new D2MonitorEventEmitter(_serviceName, _config, partitionId)));
  }

  /**
   * Looks up the ring of an already-registered partition without creating the partition.
   *
   * @param partitionId id of the partition
   * @return the ring of the partition's current state, or {@code null} if the partition is not registered
   */
  Ring<URI> getRing(int partitionId)
  {
    Partition partition = _partitions.get(partitionId);
    if (partition == null)
    {
      return null;
    }
    return partition.getState().getRing();
  }

  // Obtains the partition through getPartition, which creates it on demand.
  public PartitionDegraderLoadBalancerState getPartitionState(int partitionId)
  {
    Partition partition = getPartition(partitionId);
    return partition.getState();
  }

  // Replaces the state of the given partition; the partition is created on demand.
  void setPartitionState(int partitionId, PartitionDegraderLoadBalancerState newState)
  {
    Partition partition = getPartition(partitionId);
    partition.setState(newState);
  }

  void putHealthCheckClient(TrackerClientUpdater updater, HealthCheck client)
  {
    _healthCheckMap.put(updater, client);
  }

  // Returns the backing map itself, not a copy.
  Map<TrackerClientUpdater, HealthCheck> getHealthCheckMap()
  {
    return _healthCheckMap;
  }

  String getServiceName()
  {
    return _serviceName;
  }

  boolean isQuarantineEnabled()
  {
    return _quarantineEnabled.get();
  }

  /**
   * Tries to switch quarantine on. The flag is flipped only if quarantine is currently disabled; otherwise
   * the call has no side effect.
   *
   * @return {@code true} if this call enabled quarantine; {@code false} if it was already enabled.
   */
  boolean tryEnableQuarantine()
  {
    return _quarantineEnabled.compareAndSet(false, true);
  }

  int incrementAndGetQuarantineRetries()
  {
    return _quarantineRetries.incrementAndGet();
  }

  /**
   * Shuts down the quarantine objects of every registered partition so that the related transport clients
   * are released. Does nothing when the given config has a non-positive quarantine max percent or when
   * quarantine was never enabled.
   *
   * @param config config whose quarantine max percent decides whether any shutdown work is needed
   */
  public void shutdown(DegraderLoadBalancerStrategyConfig config)
  {
    if (config.getQuarantineMaxPercent() <= 0.0)
    {
      return;
    }
    if (!_quarantineEnabled.get())
    {
      return;
    }

    for (Partition partition : _partitions.values())
    {
      Lock partitionLock = partition.getLock();
      partitionLock.lock();
      try
      {
        for (DegraderLoadBalancerQuarantine quarantine : partition.getState().getQuarantineMap().values())
        {
          quarantine.shutdown();
        }
      }
      finally
      {
        partitionLock.unlock();
      }
    }
  }

  @Override
  public String toString()
  {
    return "PartitionStates: [" + _partitions + "]";
  }
}
Oops, something went wrong.

0 comments on commit 5243626

Please sign in to comment.