D2 custom partition is a new d2 partition type where the cluster provides its own function to decide the partitionId for a given URI.

The purpose is to give clusters a flexible way to customize the partition mapping method to meet their own requirements. To use this partition type, the cluster needs to:

1. Implement the BasePartitionAccessor interface with the customized logic (a sketch follows below).
2. Register the PartitionAccessor through PartitionAccessorRegistry on the d2client side.
3. Update the cluster property in d2.src to enable the customized PartitionAccessor.

The full document can be found at https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Customized+D2+Partitioning.
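As a rough illustration of step 1, a custom accessor could look like the following minimal sketch. This is not part of this commit: the interface lives in com.linkedin.d2.balancer.util.partitions, but the class name, partition count, and URI-parsing logic here are made up for the example.

import java.net.URI;

import com.linkedin.d2.balancer.util.partitions.BasePartitionAccessor;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessException;

// Hypothetical accessor: derives the partition from a numeric member id at
// the end of the request URI path, e.g. d2://memberService/1234 -> 1234 % 4.
public class MemberIdPartitionAccessor implements BasePartitionAccessor
{
  private static final int PARTITION_COUNT = 4;

  @Override
  public int getPartitionId(URI uri) throws PartitionAccessException
  {
    String path = uri.getPath();
    String key = path.substring(path.lastIndexOf('/') + 1);
    try
    {
      return (int) (Long.parseLong(key) % PARTITION_COUNT);
    }
    catch (NumberFormatException e)
    {
      throw new PartitionAccessException("No numeric partition key in URI: " + uri);
    }
  }
}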

The pdsc change is backward incompatible, so it requires a synchronized update on the client side.

RB=1087090
G=si-core-reviewers
R=ssheng,xzhu,dhoa
A=ssheng,dhoa
cx-super committed Sep 28, 2017
1 parent bf0d8a7 commit 04136c1
Showing 24 changed files with 933 additions and 74 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG
@@ -1,3 +1,13 @@
+15.0.6
+-------
+
+
+15.0.5
+-------
+(RB=1087090)
+D2 customized partition implementation
+
+
 15.0.4
 -------
 (RB=1109051)
@@ -9,10 +9,13 @@
     "type": {
       "type" : "enum",
       "name" : "PartitionTypeEnum",
-      "symbols" : ["HASH", "RANGE", "NONE"]
+      "symbols" : ["HASH", "RANGE", "CUSTOM", "NONE"]
     },
     "doc": "The type of partitioning. We support HASH, RANGE, NONE.",
-    "symbolDocs": {"RANGE":"partitioning based on range e.g. Id 1-1000 goes to bucket A, Id 1001-2000 goes to bucket B, etc.", "HASH":"Partitioning based on hash.", "NONE":"No Partitioning."}
+    "symbolDocs": {"RANGE":"partitioning based on range e.g. Id 1-1000 goes to bucket A, Id 1001-2000 goes to bucket B, etc.",
+                   "HASH":"Partitioning based on hash.",
+                   "CUSTOM":"Partitioning based on the customized function provided by the service",
+                   "NONE":"No Partitioning."}
   },
   {
     "name": "partitionKeyRegex",
@@ -28,11 +31,11 @@
   },
   {
     "name": "partitionTypeSpecificData",
-    "doc": "If the partition type is RANGE, then we have rangedPartitionProperties. If it's type HASH, we should have a hashAlgorithm. Otherwise we won't have any extra data in this field",
+    "doc": "If the partition type is RANGE, then we have RangedPartitionProperties. If it's type HASH, we should have a HashAlgorithm. If it is type CUSTOM, we have PartitionAccessorList. Otherwise we won't have any extra data in this field",
     "type": [
       {
         "type" : "record",
-        "name" : "rangedPartitionProperties",
+        "name" : "RangedPartitionProperties",
         "fields": [
           {
             "name": "partitionSize",
@@ -47,11 +50,28 @@
         ]
       },
       {
-        "name": "hashAlgorithm",
+        "name": "HashAlgorithm",
         "type" : "enum",
         "symbols" : ["MODULO", "MD5"],
         "doc": "The hashing algorithm used in HASH based partitioning. Supported algorithms are: MODULO or MD5. Not used for RANGE based partition.",
         "symbolDocs": {"MODULO":"Mod the key with partitionCount to get the partitionKey", "MD5":"Hash the key and mod it with partitionCount to get the partitionKey"}
+      },
+      {
+        "name": "PartitionAccessorList",
+        "type": "record",
+        "doc": "The list of class names that implement BasePartitionAccessor. D2 goes through the list and uses the first one that is registered to PartitionAccessorRegistry. This list is used when the service needs to provide/deploy multiple versions of implementation.",
+        "fields":
+        [
+          {
+            "name": "classNames",
+            "type":
+            {
+              "type": "array",
+              "items": "string"
+            },
+            "doc": "Class names for the implemented BasePartitionAccessor"
+          }
+        ]
       }
     ],
     "optional": true
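The "first registered wins" rule in the PartitionAccessorList doc above amounts to the following lookup. This is an illustrative sketch of the documented behavior, not the actual D2 code; the class and method names are assumptions.

import java.util.List;
import java.util.Map;

import com.linkedin.d2.balancer.util.partitions.BasePartitionAccessor;

final class AccessorSelectionSketch
{
  // Walk the configured classNames in order and return the first accessor
  // that the client has actually registered for this cluster.
  static BasePartitionAccessor select(List<String> classNames,
                                      Map<String, BasePartitionAccessor> registeredByClassName)
  {
    for (String className : classNames)
    {
      BasePartitionAccessor accessor = registeredByClassName.get(className);
      if (accessor != null)
      {
        return accessor;
      }
    }
    return null; // no match: the caller falls back or fails
  }
}

This ordering is what lets a service list several implementation versions in d2.src and roll clients forward one at a time.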
22 changes: 14 additions & 8 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
@@ -26,6 +26,7 @@
 import com.linkedin.d2.balancer.clients.RetryClient;
 import com.linkedin.d2.balancer.event.EventEmitter;
 import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations;
+import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry;
 import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl;
 import com.linkedin.r2.message.RequestContext;
 import com.linkedin.r2.message.rest.RestRequest;
@@ -35,13 +36,11 @@
 import com.linkedin.r2.transport.common.TransportClientFactory;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.r2.util.NamedThreadFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLParameters;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
@@ -94,14 +93,15 @@ public D2Client build()
                        _config.clientServicesConfig,
                        _config.d2ServicePath,
                        _config.useNewEphemeralStoreWatcher,
-                       _config._healthCheckOperations,
+                       _config.healthCheckOperations,
                        _config._executorService,
                        _config.retry,
                        _config.retryLimit,
                        _config.warmUp,
                        _config.warmUpTimeoutSeconds,
                        _config.warmUpConcurrentRequests,
-                       _config._eventEmitter);
+                       _config.eventEmitter,
+                       _config.partitionAccessorRegistry);
 
     final LoadBalancerWithFacilities loadBalancer = loadBalancerFactory.create(cfg);
 
@@ -231,7 +231,7 @@ public D2ClientBuilder setD2ServicePath(String d2ServicePath)
 
   public D2ClientBuilder setHealthCheckOperations(HealthCheckOperations healthCheckOperations)
   {
-    _config._healthCheckOperations = healthCheckOperations;
+    _config.healthCheckOperations = healthCheckOperations;
     return this;
   }
 
@@ -288,7 +288,7 @@ public D2ClientBuilder setRetryLimit(int retryLimit)
 
   public D2ClientBuilder setEventEmitter(EventEmitter eventEmitter)
   {
-    _config._eventEmitter = eventEmitter;
+    _config.eventEmitter = eventEmitter;
     return this;
   }
 
@@ -336,6 +336,12 @@ public D2ClientBuilder setWarmUpConcurrentRequests(int warmUpConcurrentRequests)
     return this;
   }
 
+  public D2ClientBuilder setPartitionAccessorRegistry(PartitionAccessorRegistry registry)
+  {
+    _config.partitionAccessorRegistry = registry;
+    return this;
+  }
+
   private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
   {
     final Map<String, TransportClientFactory> clientFactories = new HashMap<String, TransportClientFactory>();
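With the setter above, client wiring would look roughly like this. A usage sketch only: PartitionAccessorRegistryImpl as the default registry implementation and its register(clusterName, accessor) signature are assumptions, as are the host and cluster names, and MemberIdPartitionAccessor is the hypothetical accessor from the sketch earlier.

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistryImpl;

// Register the custom accessor for its cluster, then hand the registry to
// the builder so the load balancer can find it for CUSTOM-partitioned clusters.
PartitionAccessorRegistry registry = new PartitionAccessorRegistryImpl();
registry.register("memberService", new MemberIdPartitionAccessor());

D2Client client = new D2ClientBuilder()
    .setZkHosts("zk.example.com:2181")
    .setPartitionAccessorRegistry(registry)
    .build();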
24 changes: 16 additions & 8 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
@@ -19,6 +19,7 @@
 import com.linkedin.d2.balancer.event.EventEmitter;
 import com.linkedin.d2.balancer.util.WarmUpLoadBalancer;
 import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations;
+import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry;
 import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl;
 import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl.ComponentFactory;
 import com.linkedin.r2.transport.common.TransportClientFactory;
@@ -51,7 +52,7 @@ public class D2ClientConfig
   boolean isSymlinkAware = false;
   Map<String, Map<String, Object>> clientServicesConfig = Collections.<String, Map<String, Object>>emptyMap();
   boolean useNewEphemeralStoreWatcher = false;
-  HealthCheckOperations _healthCheckOperations = null;
+  HealthCheckOperations healthCheckOperations = null;
   /**
    * By default is a single threaded executor
    */
@@ -66,7 +67,8 @@
   BackupRequestsStrategyStatsConsumer backupRequestsStrategyStatsConsumer = null;
   long backupRequestsLatencyNotificationInterval = 1;
   TimeUnit backupRequestsLatencyNotificationIntervalUnit = TimeUnit.MINUTES;
-  EventEmitter _eventEmitter = null;
+  EventEmitter eventEmitter = null;
+  PartitionAccessorRegistry partitionAccessorRegistry = null;
 
   private static final int DEAULT_RETRY_LIMIT = 3;
 
@@ -306,7 +308,9 @@ public D2ClientConfig(String zkHosts,
          false,
          0,
          0,
-         null);
+         null,
+         null
+         );
   }
 
   public D2ClientConfig(String zkHosts,
@@ -335,7 +339,8 @@ public D2ClientConfig(String zkHosts,
                        boolean warmUp,
                        int warmUpTimeoutSeconds,
                        int warmUpConcurrentRequests,
-                       EventEmitter emitter)
+                       EventEmitter emitter,
+                       PartitionAccessorRegistry partitionAccessorRegistry)
   {
     this(zkHosts,
          zkSessionTimeoutInMs,
@@ -368,7 +373,8 @@ public D2ClientConfig(String zkHosts,
          1,
          TimeUnit.MINUTES,
          null,
-         emitter);
+         emitter,
+         partitionAccessorRegistry);
   }
 
   public D2ClientConfig(String zkHosts,
@@ -402,7 +408,8 @@ public D2ClientConfig(String zkHosts,
                        long backupRequestsLatencyNotificationInterval,
                        TimeUnit backupRequestsLatencyNotificationIntervalUnit,
                        ScheduledExecutorService backupRequestsExecutorService,
-                       EventEmitter emitter)
+                       EventEmitter emitter,
+                       PartitionAccessorRegistry partitionAccessorRegistry)
   {
     this.zkHosts = zkHosts;
     this.zkSessionTimeoutInMs = zkSessionTimeoutInMs;
@@ -423,7 +430,7 @@ public D2ClientConfig(String zkHosts,
     this.clientServicesConfig = clientServicesConfig;
     this.d2ServicePath = d2ServicePath;
     this.useNewEphemeralStoreWatcher = useNewEphemeralStoreWatcher;
-    this._healthCheckOperations = healthCheckOperations;
+    this.healthCheckOperations = healthCheckOperations;
     this._executorService = executorService;
     this.retry = retry;
     this.retryLimit = retryLimit;
@@ -435,6 +442,7 @@ public D2ClientConfig(String zkHosts,
     this.backupRequestsLatencyNotificationInterval = backupRequestsLatencyNotificationInterval;
     this.backupRequestsLatencyNotificationIntervalUnit = backupRequestsLatencyNotificationIntervalUnit;
     this._backupRequestsExecutorService = backupRequestsExecutorService;
-    this._eventEmitter = emitter;
+    this.eventEmitter = emitter;
+    this.partitionAccessorRegistry = partitionAccessorRegistry;
   }
 }
@@ -88,7 +88,7 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
     }
 
     final Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories =
-        createDefaultLoadBalancerStrategyFactories(config._healthCheckOperations, config._executorService, config._eventEmitter);
+        createDefaultLoadBalancerStrategyFactories(config.healthCheckOperations, config._executorService, config.eventEmitter);
 
     return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory,
                                                    config.lbWaitTimeout,
@@ -102,8 +102,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
                                                    config.sslParameters,
                                                    config.isSSLEnabled,
                                                    config.clientServicesConfig,
-                                                   config.useNewEphemeralStoreWatcher
-                                                   );
+                                                   config.useNewEphemeralStoreWatcher,
+                                                   config.partitionAccessorRegistry);
   }
 
   private Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> createDefaultLoadBalancerStrategyFactories(
@@ -17,13 +17,16 @@
 package com.linkedin.d2.balancer.config;
 
 import com.linkedin.d2.D2ClusterPartitionConfiguration;
+import com.linkedin.d2.HashAlgorithm;
+import com.linkedin.d2.PartitionAccessorList;
 import com.linkedin.d2.PartitionTypeEnum;
+import com.linkedin.d2.balancer.properties.CustomizedPartitionProperties;
 import com.linkedin.d2.balancer.properties.HashBasedPartitionProperties;
 import com.linkedin.d2.balancer.properties.NullPartitionProperties;
 import com.linkedin.d2.balancer.properties.PartitionProperties;
 import com.linkedin.d2.balancer.properties.RangeBasedPartitionProperties;
-import com.linkedin.d2.hashAlgorithm;
-import com.linkedin.d2.rangedPartitionProperties;
+import com.linkedin.d2.RangedPartitionProperties;
+import com.linkedin.data.template.StringArray;
 
 
 /**
@@ -40,7 +43,7 @@ public static PartitionProperties toProperties(D2ClusterPartitionConfiguration c
     {
       case RANGE:
       {
-        rangedPartitionProperties rangedPartitionProperties =
+        RangedPartitionProperties rangedPartitionProperties =
             config.getPartitionTypeSpecificData().getRangedPartitionProperties();
         partitionProperties =
             new RangeBasedPartitionProperties(config.getPartitionKeyRegex(),
@@ -68,6 +71,10 @@ public static PartitionProperties toProperties(D2ClusterPartitionConfiguration c
                                               config.getPartitionCount(),
                                               algorithm);
         break;
+      case CUSTOM:
+        partitionProperties = new CustomizedPartitionProperties(config.getPartitionCount(),
+            config.getPartitionTypeSpecificData().getPartitionAccessorList().getClassNames());
+        break;
       case NONE:
         partitionProperties = NullPartitionProperties.getInstance();
         break;
@@ -91,7 +98,7 @@ public static D2ClusterPartitionConfiguration toConfig(PartitionProperties prope
         config.setPartitionCount(range.getPartitionCount());
 
         specificData = new D2ClusterPartitionConfiguration.PartitionTypeSpecificData();
-        rangedPartitionProperties rangedPartitionProperties = new rangedPartitionProperties();
+        RangedPartitionProperties rangedPartitionProperties = new RangedPartitionProperties();
         rangedPartitionProperties.setKeyRangeStart(range.getKeyRangeStart());
         rangedPartitionProperties.setPartitionSize(range.getPartitionSize());
         specificData.setRangedPartitionProperties(rangedPartitionProperties);
@@ -105,9 +112,23 @@ public static D2ClusterPartitionConfiguration toConfig(PartitionProperties prope
         config.setPartitionCount(hash.getPartitionCount());
 
         specificData = new D2ClusterPartitionConfiguration.PartitionTypeSpecificData();
-        specificData.setHashAlgorithm(hashAlgorithm.valueOf(hash.getHashAlgorithm().name()));
+        specificData.setHashAlgorithm(HashAlgorithm.valueOf(hash.getHashAlgorithm().name()));
+        config.setPartitionTypeSpecificData(specificData);
+        break;
+      case CUSTOM:
+      {
+        CustomizedPartitionProperties properties = (CustomizedPartitionProperties) property;
+        config = new D2ClusterPartitionConfiguration();
+        config.setType(PartitionTypeEnum.CUSTOM);
+        config.setPartitionCount(properties.getPartitionCount());
+
+        specificData = new D2ClusterPartitionConfiguration.PartitionTypeSpecificData();
+        PartitionAccessorList partitionList = new PartitionAccessorList();
+        partitionList.setClassNames(new StringArray(properties.getPartitionAccessorList()));
+        specificData.setPartitionAccessorList(partitionList);
         config.setPartitionTypeSpecificData(specificData);
         break;
+      }
       case NONE:
         config = new D2ClusterPartitionConfiguration();
         config.setType(PartitionTypeEnum.NONE);
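A quick round trip through the converter above would look like this. A sketch only: the enclosing class is assumed to be PartitionPropertiesConverter (its file name is not visible in this diff, though the package com.linkedin.d2.balancer.config is), and the accessor class name in the list is hypothetical.

import java.util.Arrays;

import com.linkedin.d2.D2ClusterPartitionConfiguration;
import com.linkedin.d2.PartitionTypeEnum;
import com.linkedin.d2.balancer.properties.CustomizedPartitionProperties;
import com.linkedin.d2.balancer.properties.PartitionProperties;

CustomizedPartitionProperties props = new CustomizedPartitionProperties(
    8, Arrays.asList("com.example.MemberIdPartitionAccessor"));

// toConfig stores the class names in PartitionAccessorList; toProperties
// reads them back out, so the two directions should agree.
D2ClusterPartitionConfiguration config = PartitionPropertiesConverter.toConfig(props);
assert config.getType() == PartitionTypeEnum.CUSTOM;

PartitionProperties roundTrip = PartitionPropertiesConverter.toProperties(config);
assert roundTrip instanceof CustomizedPartitionProperties;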
@@ -127,6 +127,19 @@ public ClusterProperties fromMap(Map<String, Object> map)
             new HashBasedPartitionProperties(partitionKeyRegex, partitionCount, algorithm);
         break;
       }
+      case CUSTOM:
+      {
+        int partitionCount = partitionPropertiesMap.containsKey(PropertyKeys.PARTITION_COUNT)
+            ? PropertyUtil.checkAndGetValue(partitionPropertiesMap, PropertyKeys.PARTITION_COUNT, Number.class, scope).intValue()
+            : 0;
+
+        @SuppressWarnings("unchecked")
+        List<String> partitionAccessorList = partitionPropertiesMap.containsKey(PropertyKeys.PARTITION_ACCESSOR_LIST)
+            ? PropertyUtil.checkAndGetValue(partitionPropertiesMap, PropertyKeys.PARTITION_ACCESSOR_LIST, List.class, scope)
+            : Collections.emptyList();
+        partitionProperties = new CustomizedPartitionProperties(partitionCount, partitionAccessorList);
+        break;
+      }
       case NONE:
         partitionProperties = NullPartitionProperties.getInstance();
         break;
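For reference, a partition-properties map that the CUSTOM branch above would accept looks roughly like this. The literal key strings stand in for the PropertyKeys constants used in the code and are assumptions, as is the accessor class name.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the parsed cluster-property map for a CUSTOM cluster.
Map<String, Object> partitionPropertiesMap = new HashMap<>();
partitionPropertiesMap.put("partitionType", "CUSTOM");   // assumed key string
partitionPropertiesMap.put("partitionCount", 8);         // PropertyKeys.PARTITION_COUNT
partitionPropertiesMap.put("partitionAccessorList",      // PropertyKeys.PARTITION_ACCESSOR_LIST
    Arrays.asList("com.example.MemberIdPartitionAccessor"));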
@@ -0,0 +1,51 @@
/*
Copyright (c) 2017 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.d2.balancer.properties;

import java.util.List;

/**
* Properties for Custom Partition
*/

public class CustomizedPartitionProperties implements PartitionProperties
{
private final int _partitionCount;
private final List<String> _partitionAccessorList;

public CustomizedPartitionProperties(int partitionCount, List<String> partitionAccessorList)
{
_partitionCount = partitionCount;
_partitionAccessorList = partitionAccessorList;
}

@Override
public PartitionType getPartitionType()
{
return PartitionType.CUSTOM;
}

public int getPartitionCount()
{
return _partitionCount;
}

public List<String> getPartitionAccessorList()
{
return _partitionAccessorList;
}
}
