Skip to content
This repository has been archived by the owner on Oct 30, 2020. It is now read-only.

Commit

Permalink
Consistent hashing handled by norbert.
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua Hartman committed Mar 14, 2012
1 parent 3b62517 commit 968630b
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 29 deletions.
2 changes: 1 addition & 1 deletion sensei-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@
<dependency>
<groupId>com.linkedin</groupId>
<artifactId>norbert_2.8.1</artifactId>
<version>0.6.13</version>
<version>0.6.14-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public <RequestMsg, ResponseMsg, T> T sendRequest(Set<Integer> partitions, Reque
}

@Override
public <RequestMsg, ResponseMsg> ResponseIterator<ResponseMsg> sendRequestToOneReplica(RequestBuilder<Integer, RequestMsg> requestBuilder, Serializer<RequestMsg, ResponseMsg> serializer) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException {
return _networkClient.sendRequestToOneReplica(requestBuilder, serializer);
public <RequestMsg, ResponseMsg> ResponseIterator<ResponseMsg> sendRequestToOneReplica(Integer partitionedId, RequestBuilder<Integer, RequestMsg> requestBuilder, Serializer<RequestMsg, ResponseMsg> serializer) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException {
return _networkClient.sendRequestToOneReplica(partitionedId, requestBuilder, serializer);
}

@Override
public <RequestMsg, ResponseMsg> ResponseIterator<ResponseMsg> sendRequestToOneReplica(RequestMsg request, Serializer<RequestMsg, ResponseMsg> serializer) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException {
return _networkClient.sendRequestToOneReplica(request, serializer);
public <RequestMsg, ResponseMsg> ResponseIterator<ResponseMsg> sendRequestToOneReplica(Integer partitionedId, RequestMsg request, Serializer<RequestMsg, ResponseMsg> serializer) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException {
return _networkClient.sendRequestToOneReplica(partitionedId, request, serializer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.linkedin.norbert.NorbertException;
Expand Down Expand Up @@ -159,9 +160,22 @@ public RESULT call() throws Exception {
* @return one single result instance that is merged from the result list.
*/
public abstract RESULT mergeResults(REQUEST request, List<RESULT> resultList);
public abstract String getRouteParam(REQUEST req);



protected Integer getRouteParam(REQUEST req) {
String routeParamString = req.getRouteParam();
if(routeParamString == null)
return null;

else if("".equals(routeParamString))
return 0;

try {
return Integer.parseInt(routeParamString);
} catch (NumberFormatException nfe) {
return routeParamString.hashCode();
}
}


protected RESULT doBrowse(PartitionedNetworkClient<Integer> networkClient, final REQUEST req, IntSet partitions)
{
Expand Down Expand Up @@ -211,7 +225,7 @@ public RESULT call() throws Exception {
protected List<RESULT> doCall(final REQUEST req) throws ExecutionException {
List<RESULT> resultList = new ArrayList<RESULT>();
ResponseIterator<RESULT> responseIterator =
_networkClient.sendRequestToOneReplica(new RequestBuilder<Integer, REQUEST>() {
_networkClient.sendRequestToOneReplica(getRouteParam(req), new RequestBuilder<Integer, REQUEST>() {
private int count = 0;
@Override
public REQUEST apply(Node node, Set<Integer> nodePartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,6 @@ public SenseiResult mergeResults(SenseiRequest request, List<SenseiResult> resul
return res;
}

@Override
public String getRouteParam(SenseiRequest req)
{
return req.getRouteParam();
}

@Override
public SenseiResult getEmptyResultInstance()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public SenseiSystemInfo mergeResults(SenseiRequest request, List<SenseiSystemInf
protected List<SenseiSystemInfo> doCall(final SenseiRequest req) throws ExecutionException {
final List<SenseiSystemInfo> resultList = new ArrayList<SenseiSystemInfo>();
ResponseIterator<SenseiSystemInfo> responseIterator =
_networkClient.sendRequestToOneReplica(new RequestBuilder<Integer, SenseiRequest>() {
_networkClient.sendRequestToOneReplica(getRouteParam(req), new RequestBuilder<Integer, SenseiRequest>() {
@Override
public SenseiRequest apply(Node node, Set<Integer> nodePartitions) {
synchronized (req) {
Expand All @@ -107,12 +107,6 @@ public SenseiRequest apply(Node node, Set<Integer> nodePartitions) {

}

@Override
public String getRouteParam(SenseiRequest req)
{
return req.getRouteParam();
}

@Override
public SenseiSystemInfo getEmptyResultInstance()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.linkedin.norbert.NorbertException;
Expand Down Expand Up @@ -51,12 +52,6 @@ public SenseiMapReduceResult mergeResults(MapReduceRequest request, List<SenseiM
return (SenseiMapReduceResult) new SenseiMapReduceResult().setReduceResult(request.getMapReduceJob().reduce(mapRes));
}

@Override
public String getRouteParam(MapReduceRequest req) {

return req.getRouteParam();
}

@Override
public void setTimeoutMillis(long timeoutMillis) {
_timeoutMillis = timeoutMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void init(ServletConfig config) throws ServletException {
SenseiPluginRegistry pluginRegistry = builder.getPluginRegistry();
PartitionedLoadBalancerFactory<Integer> routerFactory = pluginRegistry.getBeanByFullPrefix(SenseiConfParams.SERVER_SEARCH_ROUTER_FACTORY, PartitionedLoadBalancerFactory.class);
if (routerFactory == null) {
routerFactory = new IntegerConsistentHashPartitionedLoadBalancerFactory(-1, true)
routerFactory = new IntegerConsistentHashPartitionedLoadBalancerFactory(-1, true);
}
ctx.setAttribute("sensei.search.router.factory", routerFactory);

Expand Down

0 comments on commit 968630b

Please sign in to comment.