Permalink
Browse files

Support BannedUris property for cluster

'BannedUris' property is used to ban specific hosts for the given cluster. D2client will not pick the hosts given in the list. This property was moved to serviceProperties. We now add it back to cluster so user can ban the hosts at cluster level.

RB=1317765
G=si-core-reviewers
R=crzhang,ssheng,dhoa,fcapponi
A=crzhang,fcapponi
  • Loading branch information...
ChaoLinkedIn committed May 16, 2018
1 parent 93c981f commit 0e042fb18c1d1ca31c044f8f258417a98e776ea5
View
@@ -12,6 +12,10 @@ Retry markUp/Down in event of KeeperException.SessionExpiredException
(RB=1360221)
Fix warnings in ZooKeeperAnnouncerTest resulting in failed compile.
(RB=1317765)
Support BannedUris property for cluster
23.0.2
------
(RB=1351456)
@@ -28,6 +28,14 @@
"default": [],
"doc": "The URIs of machines that belong to this cluster"
},
{
"name": "bannedUris",
"type": {
"type": "array",
"items": "string"
},
"doc": "banned Uris for this cluster"
},
{
"name": "partitionConfiguration",
"type": "D2ClusterPartitionConfiguration",
@@ -30,9 +30,7 @@
private final PartitionProperties _partitionProperties;
private final List<String> _sslSessionValidationStrings;
//deprecated because we are moving these properties down to ServiceProperties
@Deprecated
private final Set<URI> _banned;
private final Set<URI> _bannedUris;
@Deprecated
private final List<String> _prioritizedSchemes;
@@ -50,30 +48,30 @@ public ClusterProperties(String clusterName,
List<String> prioritizedSchemes,
Map<String, String> properties)
{
this(clusterName, prioritizedSchemes, properties, new HashSet<URI>());
this(clusterName, prioritizedSchemes, properties, new HashSet<>());
}
public ClusterProperties(String clusterName,
List<String> prioritizedSchemes,
Map<String, String> properties,
Set<URI> banned)
Set<URI> bannedUris)
{
this(clusterName, prioritizedSchemes, properties, banned, NullPartitionProperties.getInstance());
this(clusterName, prioritizedSchemes, properties, bannedUris, NullPartitionProperties.getInstance());
}
public ClusterProperties(String clusterName,
List<String> prioritizedSchemes,
Map<String, String> properties,
Set<URI> banned,
Set<URI> bannedUris,
PartitionProperties partitionProperties)
{
this(clusterName, prioritizedSchemes, properties, banned, partitionProperties, Collections.emptyList());
this(clusterName, prioritizedSchemes, properties, bannedUris, partitionProperties, Collections.emptyList());
}
public ClusterProperties(String clusterName,
List<String> prioritizedSchemes,
Map<String, String> properties,
Set<URI> banned,
Set<URI> bannedUris,
PartitionProperties partitionProperties,
List<String> sslSessionValidationStrings)
@@ -83,20 +81,20 @@ public ClusterProperties(String clusterName,
(prioritizedSchemes != null) ? Collections.unmodifiableList(prioritizedSchemes)
: Collections.<String>emptyList();
_properties = (properties == null) ? Collections.<String,String>emptyMap() : Collections.unmodifiableMap(properties);
_banned = Collections.unmodifiableSet(banned);
_bannedUris = bannedUris != null ? Collections.unmodifiableSet(bannedUris) : Collections.emptySet();
_partitionProperties = partitionProperties;
_sslSessionValidationStrings = sslSessionValidationStrings == null ? Collections.emptyList() : Collections.unmodifiableList(
sslSessionValidationStrings);
}
public boolean isBanned(URI uri)
{
return _banned.contains(uri);
return _bannedUris.contains(uri);
}
public Set<URI> getBanned()
public Set<URI> getBannedUris()
{
return _banned;
return _bannedUris;
}
public String getClusterName()
@@ -128,7 +126,7 @@ public PartitionProperties getPartitionProperties()
public String toString()
{
return "ClusterProperties [_clusterName=" + _clusterName + ", _prioritizedSchemes="
+ _prioritizedSchemes + ", _properties=" + _properties + ", _banned=" + _banned
+ _prioritizedSchemes + ", _properties=" + _properties + ", _bannedUris=" + _bannedUris
+ ", _partitionProperties=" + _partitionProperties + ", _sslSessionValidationStrings=" + _sslSessionValidationStrings
+ "]";
}
@@ -138,7 +136,7 @@ public int hashCode()
{
final int prime = 31;
int result = 1;
result = prime * result + ((_banned == null) ? 0 : _banned.hashCode());
result = prime * result + ((_bannedUris == null) ? 0 : _bannedUris.hashCode());
result = prime * result + ((_clusterName == null) ? 0 : _clusterName.hashCode());
result =
prime * result
@@ -153,20 +151,34 @@ public int hashCode()
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
if (obj == null)
{
return false;
}
if (getClass() != obj.getClass())
{
return false;
}
ClusterProperties other = (ClusterProperties) obj;
if (!_banned.equals(other._banned))
if (!_bannedUris.equals(other._bannedUris))
{
return false;
}
if (!_clusterName.equals(other._clusterName))
{
return false;
}
if (!_prioritizedSchemes.equals(other._prioritizedSchemes))
{
return false;
}
if (!_properties.equals(other._properties))
{
return false;
}
if (!_partitionProperties.equals(other._partitionProperties))
{
return false;
@@ -22,6 +22,7 @@
import com.linkedin.d2.discovery.PropertyBuilder;
import com.linkedin.d2.discovery.PropertySerializationException;
import com.linkedin.d2.discovery.PropertySerializer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,12 +82,9 @@ public ClusterProperties fromBytes(byte[] bytes) throws PropertySerializationExc
@Override
public ClusterProperties fromMap(Map<String, Object> map)
{
List<URI> bannedList = mapGet(map, PropertyKeys.BANNED_URIS);
if (bannedList == null)
{
bannedList = Collections.emptyList();
}
Set<URI> banned = new HashSet<URI>(bannedList);
List<String> bannedList = mapGet(map, PropertyKeys.BANNED_URIS);
Set<URI> banned = (bannedList == null) ? Collections.emptySet()
: bannedList.stream().map(URI::create).collect(Collectors.toSet());
String clusterName = PropertyUtil.checkAndGetValue(map, PropertyKeys.CLUSTER_NAME, String.class, "ClusterProperties");
List<String> prioritizedSchemes = mapGet(map, PropertyKeys.PRIORITIZED_SCHEMES);
@@ -62,7 +62,7 @@
public static final String SERVICES = "services";
public static final String TRANSPORT_CLIENT_PROPERTIES = "transportClientProperties";
public static final String PRIORITIZED_SCHEMES = "prioritizedSchemes";
public static final String BANNED_URIS = "bannedUri";
public static final String BANNED_URIS = "bannedUris";
public static final String DEFAULT_ROUTING = "defaultRouting";
public static final String ALLOWED_CLIENT_OVERRIDE_KEYS = "allowedClientOverrideKeys";
public static final String SERVICE_METADATA_PROPERTIES = "serviceMetadataProperties";
@@ -59,6 +59,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -265,7 +266,7 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
Ring<URI> ring = null;
for (LoadBalancerState.SchemeStrategyPair pair : orderedStrategies)
{
List<TrackerClient> clients = getPotentialClients(serviceName, service, uris, pair.getScheme(), partitionId);
List<TrackerClient> clients = getPotentialClients(serviceName, service, cluster, uris, pair.getScheme(), partitionId);
ring = pair.getStrategy().getRing(uriItem.getVersion(), partitionId, clients);
if (!ring.isEmpty())
@@ -316,7 +317,8 @@ public TransportClientFactory getClientFactory(String scheme)
{
for (LoadBalancerState.SchemeStrategyPair pair : orderedStrategies)
{
List<TrackerClient> trackerClients = getPotentialClients(serviceName, service, uris, pair.getScheme(), partitionId);
List<TrackerClient> trackerClients = getPotentialClients(serviceName, service, cluster, uris,
pair.getScheme(), partitionId);
Ring<URI> ring = pair.getStrategy().getRing(uriItem.getVersion(), partitionId, trackerClients);
// ring will never be null; it can be empty
ringMap.put(partitionId, ring);
@@ -529,7 +531,8 @@ private ClusterProperties getClusterProperties(String serviceName,
{
for (LoadBalancerState.SchemeStrategyPair pair : orderedStrategies)
{
List<TrackerClient> trackerClients = getPotentialClients(serviceName, service, uris, pair.getScheme(), partitionId);
List<TrackerClient> trackerClients = getPotentialClients(serviceName, service, cluster, uris,
pair.getScheme(), partitionId);
int size = trackerClients.size() <= limitHostPerPartition ? trackerClients.size() : limitHostPerPartition;
List<URI> rankedUri = new ArrayList<>(size);
@@ -689,13 +692,14 @@ public void getLoadBalancedServiceProperties(String serviceName, boolean waitFor
// supports partitioning
private List<TrackerClient> getPotentialClients(String serviceName,
ServiceProperties serviceProperties,
ClusterProperties clusterProperties,
UriProperties uris,
String scheme,
int partitionId)
{
Set<URI> possibleUris = uris.getUriBySchemeAndPartition(scheme, partitionId);
List<TrackerClient> clientsToBalance = getPotentialClients(serviceName, serviceProperties, possibleUris);
List<TrackerClient> clientsToBalance = getPotentialClients(serviceName, serviceProperties, clusterProperties, possibleUris);
if (clientsToBalance.isEmpty())
{
info(_log, "Can not find a host for service: ", serviceName, ", scheme: ", scheme, ", partition: ", partitionId);
@@ -705,6 +709,7 @@ public void getLoadBalancedServiceProperties(String serviceName, boolean waitFor
private List<TrackerClient> getPotentialClients(String serviceName,
ServiceProperties serviceProperties,
ClusterProperties clusterProperties,
Set<URI> possibleUris)
{
List<TrackerClient> clientsToLoadBalance = new ArrayList<TrackerClient>();
@@ -714,7 +719,7 @@ public void getLoadBalancedServiceProperties(String serviceName, boolean waitFor
for (URI possibleUri : possibleUris)
{
// don't pay attention to this uri if it's banned
if (!serviceProperties.isBanned(possibleUri))
if (!serviceProperties.isBanned(possibleUri) && !clusterProperties.isBanned(possibleUri))
{
TrackerClient possibleTrackerClient = _state.getClient(serviceName, possibleUri);
@@ -807,8 +812,7 @@ private TrackerClient chooseTrackerClient(Request request, RequestContext reques
String scheme = pair.getScheme();
clientsToLoadBalance = getPotentialClients(serviceName, serviceProperties, uris, scheme,
partitionId);
clientsToLoadBalance = getPotentialClients(serviceName, serviceProperties, cluster, uris, scheme, partitionId);
trackerClient =
strategy.getTrackerClient(request, requestContext, uriItem.getVersion(), partitionId, clientsToLoadBalance);
@@ -235,6 +235,8 @@ public int configure() throws Exception
final String masterColo = (String)clusterConfig.remove(PropertyKeys.MASTER_COLO);
final String enableSymlinkString = (String)clusterConfig.remove(PropertyKeys.ENABLE_SYMLINK);
final boolean enableSymlink;
@SuppressWarnings("unchecked")
final List<String> bannedUris = (List<String>) clusterConfig.remove(PropertyKeys.BANNED_URIS);
regularClusterToServicesMapping.put(clusterName, servicesConfigs.keySet().stream().collect(Collectors.toList()));
@@ -275,6 +277,10 @@ public int configure() throws Exception
clusterProperties.put(PropertyKeys.CLUSTER_VARIANTS, String.join(LIST_SEPARATOR, clusterVariantConfig.keySet()));
}
clusterConfig.put(PropertyKeys.CLUSTER_PROPERTIES, clusterProperties);
if (bannedUris != null)
{
clusterConfig.put(PropertyKeys.BANNED_URIS, bannedUris);
}
// rather than handling the coloVariant case separately from the regular cluster case, we will
// treat regular clusters as having an empty-string coloVariant list. This allows us to have a
@@ -18,6 +18,8 @@
import com.linkedin.d2.discovery.PropertySerializationException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import org.testng.annotations.Test;
import java.net.URI;
@@ -43,6 +45,10 @@ public void testClusterPropertiesSerializer() throws PropertySerializationExcept
ClusterPropertiesJsonSerializer foo = new ClusterPropertiesJsonSerializer();
List<String> schemes = new ArrayList<String>();
Map<String, String> supProperties = new HashMap<String, String>();
Set<URI> bannedSet = new HashSet<>();
bannedSet.add(URI.create("https://test1.linkedin.com:12345/test"));
bannedSet.add(URI.create("https://test2.linkedin.com:56789/test"));
ClusterProperties property = new ClusterProperties("test");
assertEquals(foo.fromBytes(foo.toBytes(property)), property);
@@ -59,11 +65,11 @@ public void testClusterPropertiesSerializer() throws PropertySerializationExcept
RangeBasedPartitionProperties rbp = new RangeBasedPartitionProperties("blah", 0, 5000000, 100);
property = new ClusterProperties("test", schemes, supProperties, new HashSet<URI>(), rbp);
property = new ClusterProperties("test", schemes, supProperties, bannedSet, rbp);
assertEquals(foo.fromBytes(foo.toBytes(property)), property);
HashBasedPartitionProperties hbp = new HashBasedPartitionProperties("blah", 150, HashBasedPartitionProperties.HashAlgorithm.valueOf("md5".toUpperCase()));
property = new ClusterProperties("test", schemes, supProperties, new HashSet<URI>(), hbp);
property = new ClusterProperties("test", schemes, supProperties, bannedSet, hbp);
assertEquals(foo.fromBytes(foo.toBytes(property)), property);
property = new ClusterProperties("test", schemes, supProperties, new HashSet<URI>(), NullPartitionProperties.getInstance(),
Oops, something went wrong.

0 comments on commit 0e042fb

Please sign in to comment.