Skip to content

Commit

Permalink
Sensei broker and node parition timeout refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
yozhao committed Jan 22, 2014
1 parent 01c25cb commit f1706fa
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 260 deletions.
12 changes: 5 additions & 7 deletions bin/start-sensei-node.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
#!/usr/bin/env bash

#usage="Usage: start-sensei-node.sh <id> <port> <partitions> <conf-dir>"

# if no args specified, show usage
#if [ $# -le 3 ]; then
# echo $usage
# exit 1
#fi
usage="Usage: start-sensei-node.sh <conf-dir>"
if [ $# -ne 1 ]; then
echo $usage
exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
Expand Down
34 changes: 3 additions & 31 deletions sensei-core/src/main/java/com/senseidb/conf/SenseiConfParams.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.senseidb.conf;

import java.util.Comparator;

public interface SenseiConfParams {
public class SenseiConfParams {
public static final String NODE_ID = "sensei.node.id";
public static final String PARTITIONS = "sensei.node.partitions";

Expand Down Expand Up @@ -67,6 +66,7 @@ public interface SenseiConfParams {
public static final String SERVER_BROKER_MAXTHREAD = "sensei.broker.maxThread";
public static final String SERVER_BROKER_MAXWAIT = "sensei.broker.maxWaittime";
public static final String SERVER_BROKER_TIMEOUT = "sensei.broker.timeout";
public static final String SERVER_BROKER_FINAGLE_THREAD = "sensei.broker.finagle.thread";

public static final String SENSEI_BROKER_POLL_INTERVAL = "sensei.broker.pollInterval";
public static final String SENSEI_BROKER_MIN_RESPONSES = "sensei.broker.minResponses";
Expand All @@ -87,33 +87,5 @@ public interface SenseiConfParams {
public static final String SENSEI_INDEX_ACTIVITY_PURGE_FREQUENCY_HOURS = "sensei.index.activity.purge.hours";
public static final String SENSEI_INDEX_ACTIVITY_PURGE_FREQUENCY_MINUTES = "sensei.index.activity.purge.minutes";

public static final Comparator<String> DEFAULT_VERSION_STRING_COMPARATOR = new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
if (o1 == null && o2 == null) {
return 0;
}
if (o1 == null) return -1;
if (o2 == null) return 1;
return o1.compareTo(o2);
}
};

public static final Comparator<String> DEFAULT_VERSION_LONG_COMPARATOR = new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
long l1, l2;
if (o1 == null || o1.length() == 0) {
l1 = 0L;
} else {
l1 = Long.parseLong(o1);
}
if (o2 == null || o2.length() == 0) {
l2 = 0L;
} else {
l2 = Long.parseLong(o2);
}
return Long.valueOf(l1).compareTo(Long.valueOf(l2));
}
};
public static final String SENSEI_NODE_PARTITION_TIMEOUT = "sensei.node.partition.timeout";
}
153 changes: 69 additions & 84 deletions sensei-core/src/main/java/com/senseidb/conf/SenseiServerBuilder.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.log4j.Logger;

Expand All @@ -22,6 +22,7 @@
import zu.finagle.client.ZuTransportClientProxy;
import zu.finagle.serialize.ZuSerializer;

import com.senseidb.conf.SenseiConfParams;
import com.senseidb.metrics.MetricsConstants;
import com.senseidb.search.req.AbstractSenseiRequest;
import com.senseidb.search.req.AbstractSenseiResult;
Expand Down Expand Up @@ -51,6 +52,7 @@ public abstract class AbstractConsistentHashBroker<REQUEST extends AbstractSense

protected long _timeout = 8000;
protected final ZuSerializer<REQUEST, RESULT> _serializer;
private int _finagleThreadNumber = 20;

private static Timer ScatterTimer = null;
private static Timer GatherTimer = null;
Expand Down Expand Up @@ -98,12 +100,22 @@ public abstract class AbstractConsistentHashBroker<REQUEST extends AbstractSense
* The serializer used to serialize/deserialize request/response pairs
*/
public AbstractConsistentHashBroker(ZuCluster clusterClient,
ZuSerializer<REQUEST, RESULT> serializer) {
ZuSerializer<REQUEST, RESULT> serializer, Configuration senseiConf) {
super();
_serializer = serializer;
ZuTransportClientProxy<REQUEST, RESULT> proxy = new ZuTransportClientProxy<REQUEST, RESULT>(
getMessageType(), _serializer);
serviceDecorator = new ZuFinagleServiceDecorator<REQUEST, RESULT>(proxy);
if (this instanceof SenseiSysBroker) {
// hard code config for SenseiSysBroker
_timeout = 8000;
_finagleThreadNumber = 4;
} else {
_timeout = senseiConf.getLong(SenseiConfParams.SERVER_BROKER_TIMEOUT, 8000);
_finagleThreadNumber = senseiConf.getInt(SenseiConfParams.SERVER_BROKER_FINAGLE_THREAD, 20);
}
serviceDecorator = new ZuFinagleServiceDecorator<REQUEST, RESULT>(proxy, Duration.apply(
_timeout, TimeUnit.MILLISECONDS), _finagleThreadNumber);

router = new ConsistentHashRoutingAlgorithm<Service<REQUEST, RESULT>>(serviceDecorator);
clusterClient.addClusterEventListener(router);
}
Expand Down Expand Up @@ -214,7 +226,7 @@ public RESULT call() throws Exception {
}

@SuppressWarnings("unchecked")
protected List<RESULT> doCall(REQUEST req) throws ExecutionException {
protected List<RESULT> doCall(REQUEST req) {
Set<Integer> shards = router.getShards();

Map<Service<REQUEST, RESULT>, REQUEST> serviceToRequest = new HashMap<Service<REQUEST, RESULT>, REQUEST>();
Expand Down Expand Up @@ -248,37 +260,39 @@ public void shutdown() {
logger.info("shutting down broker...");
}

@Override
public long getTimeout() {
return _timeout;
}

@Override
public void setTimeout(long timeout) {
this._timeout = timeout;
}

protected List<RESULT> executeRequestsInParallel(
final Map<Service<REQUEST, RESULT>, REQUEST> serviceToRequest, long timeout) {
long start = System.currentTimeMillis();
final long start = System.currentTimeMillis();
final List<Future<RESULT>> futures = new ArrayList<Future<RESULT>>();
for (Entry<Service<REQUEST, RESULT>, REQUEST> entry : serviceToRequest.entrySet()) {
final List<RESULT> results = new ArrayList<RESULT>();
for (final Entry<Service<REQUEST, RESULT>, REQUEST> entry : serviceToRequest.entrySet()) {
futures.add(entry.getKey().apply(entry.getValue())
.addEventListener(new FutureEventListener<RESULT>() {

@Override
public void onFailure(Throwable t) {
logger.error("Failed to get response", t);
logger.error("Failed to get response from " + getServiceAddress(entry.getKey()), t);
}

@Override
public void onSuccess(RESULT result) {
// do nothing as we wait for all results below
synchronized (results) {
results.add(result);
}
logger.info(String.format("Getting response from "
+ getServiceAddress(entry.getKey()) + " took %dms.",
(System.currentTimeMillis() - start)));
}
}));
}

Future<List<RESULT>> collected = Future.collect(futures);
List<RESULT> results = collected.apply(Duration.apply(timeout, TimeUnit.MILLISECONDS));
try {
collected.apply(Duration.apply(timeout, TimeUnit.MILLISECONDS));
} catch (Exception e) {
logger.error("Failed to get results from all nodes, exception: " + e.getMessage());
}

logger.info(String.format("Getting responses from %d nodes took %dms.", results.size(),
(System.currentTimeMillis() - start)));
return results;
Expand All @@ -292,4 +306,8 @@ protected static Set<InetSocketAddress> getNodesAddresses(
}
return nodes;
}

public InetSocketAddress getServiceAddress(Service<REQUEST, RESULT> service) {
return router.getServiceAddress(service);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* @author "Xiaoyang Gu<xgu@linkedin.com>"
*
*
* @param <REQUEST>
* @param <RESULT>
*/
Expand All @@ -24,19 +24,15 @@ public abstract class AbstractSenseiBroker<REQUEST extends AbstractSenseiRequest

/**
* The method that provides the search service.
*
*
* @param req
* @return
* @throws SenseiException
*/
@Override
public abstract RESULT browse(final REQUEST req) throws SenseiException;

public void shutdown() {
logger.info("shutting down broker...");
}

public abstract void setTimeout(long timeoutMillis);

public abstract long getTimeout();

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.lucene.search.SortField;

Expand Down Expand Up @@ -39,8 +40,8 @@ public class SenseiBroker extends AbstractConsistentHashBroker<SenseiRequest, Se
SenseiBroker.class, "numberOfNodesInTheCluster"));
private volatile boolean disconnected;

public SenseiBroker(ZuCluster clusterClient) {
super(clusterClient, CoreSenseiServiceImpl.JAVA_SERIALIZER);
public SenseiBroker(ZuCluster clusterClient, Configuration senseiConf) {
super(clusterClient, CoreSenseiServiceImpl.JAVA_SERIALIZER, senseiConf);
clusterClient.addClusterEventListener(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.senseidb.search.node;

import java.io.File;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Arrays;
Expand All @@ -10,6 +11,8 @@

import javax.management.StandardMBean;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.mortbay.jetty.Server;

Expand All @@ -18,6 +21,7 @@
import zu.finagle.server.ZuFinagleServer;
import zu.finagle.server.ZuTransportService;

import com.senseidb.conf.SenseiConfParams;
import com.senseidb.conf.SenseiServerBuilder;
import com.senseidb.jmx.JmxUtil;
import com.senseidb.plugin.SenseiPluginRegistry;
Expand All @@ -42,6 +46,7 @@ public class SenseiServer {
private final int _port;
private final int[] _partitions;
private final SenseiCore _core;
private final Configuration _senseiConf;
private final List<AbstractSenseiCoreService<AbstractSenseiRequest, AbstractSenseiResult>> _externalSvc;

protected volatile boolean _available = false;
Expand All @@ -52,21 +57,31 @@ public class SenseiServer {
private final ZuFinagleServer server;
private final ZuCluster cluster;

public SenseiServer(int port, SenseiCore senseiCore,
List<AbstractSenseiCoreService<AbstractSenseiRequest, AbstractSenseiResult>> externalSvc,
SenseiPluginRegistry pluginRegistry, ZuTransportService transport, ZuFinagleServer server,
ZuCluster cluster) {
@SuppressWarnings({ "unchecked", "rawtypes" })
public SenseiServer(SenseiCore senseiCore, SenseiPluginRegistry pluginRegistry,
ZuCluster cluster, Configuration senseiConf) throws ConfigurationException {
_core = senseiCore;
this.pluginRegistry = pluginRegistry;
this.transportService = transport;
this.server = server;
this.cluster = cluster;
_id = senseiCore.getNodeId();
_port = port;

_senseiConf = senseiConf;
_id = senseiConf.getInt(SenseiConfParams.NODE_ID);
_port = senseiConf.getInt(SenseiConfParams.SERVER_PORT);
_partitions = senseiCore.getPartitions();

new CoreSenseiServiceImpl(senseiCore);
_externalSvc = externalSvc;
this.transportService = new ZuTransportService();
int serverPort = senseiConf.getInt(SenseiConfParams.SERVER_PORT);
String hostAddress;
try {
hostAddress = NetUtil.getHostAddress();
} catch (Exception e) {
throw new ConfigurationException(e.getMessage(), e);
}
this.server = new ZuFinagleServer("sensei-finagle-server-" + _id, new InetSocketAddress(
hostAddress, serverPort), transportService.getService());

_externalSvc = (List) pluginRegistry.resolveBeansByListKey(SenseiConfParams.SENSEI_PLUGIN_SVCS,
AbstractSenseiCoreService.class);
}

private static String help() {
Expand Down Expand Up @@ -112,9 +127,9 @@ public void start(boolean available) throws Exception {
logger.info("Cluster Id: " + cluster.getClusterId());

AbstractSenseiCoreService<SenseiRequest, SenseiResult> coreSenseiService = new CoreSenseiServiceImpl(
_core);
_core, _senseiConf);
AbstractSenseiCoreService<SenseiRequest, SenseiSystemInfo> sysSenseiCoreService = new SysSenseiCoreServiceImpl(
_core);
_core, _senseiConf);
SenseiCoreServiceMessageHandler<SenseiRequest, SenseiResult> senseiMsgHandler = new SenseiCoreServiceMessageHandler<SenseiRequest, SenseiResult>(
coreSenseiService);
SenseiCoreServiceMessageHandler<SenseiRequest, SenseiSystemInfo> senseiSysMsgHandler = new SenseiCoreServiceMessageHandler<SenseiRequest, SenseiSystemInfo>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;

import zu.core.cluster.ZuCluster;
Expand All @@ -25,9 +25,11 @@ public class SenseiSysBroker extends AbstractConsistentHashBroker<SenseiRequest,
private final long _timeoutMillis = TIMEOUT_MILLIS;
private final Comparator<String> _versionComparator;
private Map<InetSocketAddress, Service<SenseiRequest, SenseiSystemInfo>> _nodeAddressToService;
private Map<Service<SenseiRequest, SenseiSystemInfo>, InetSocketAddress> _nodeServiceToAddress;

public SenseiSysBroker(ZuCluster clusterClient, Comparator<String> versionComparator) {
super(clusterClient, SysSenseiCoreServiceImpl.JAVA_SERIALIZER);
public SenseiSysBroker(ZuCluster clusterClient, Comparator<String> versionComparator,
Configuration senseiConf) {
super(clusterClient, SysSenseiCoreServiceImpl.JAVA_SERIALIZER, senseiConf);
_versionComparator = versionComparator;
clusterClient.addClusterEventListener(this);
}
Expand Down Expand Up @@ -56,7 +58,7 @@ public SenseiSystemInfo mergeResults(SenseiRequest request, List<SenseiSystemInf
}

@Override
protected List<SenseiSystemInfo> doCall(final SenseiRequest req) throws ExecutionException {
protected List<SenseiSystemInfo> doCall(final SenseiRequest req) {

Map<Service<SenseiRequest, SenseiSystemInfo>, SenseiRequest> serviceToRequest = new HashMap<Service<SenseiRequest, SenseiSystemInfo>, SenseiRequest>();
for (Service<SenseiRequest, SenseiSystemInfo> service : _nodeAddressToService.values()) {
Expand All @@ -81,15 +83,22 @@ public void clusterChanged(Map<Integer, List<InetSocketAddress>> clusterView) {
logger.info("clusterChanged(): Received new clusterView from zu " + clusterView);
Set<InetSocketAddress> nodes = getNodesAddresses(clusterView);
Map<InetSocketAddress, Service<SenseiRequest, SenseiSystemInfo>> addressToService = new HashMap<InetSocketAddress, Service<SenseiRequest, SenseiSystemInfo>>();
Map<Service<SenseiRequest, SenseiSystemInfo>, InetSocketAddress> serviceToAddress = new HashMap<Service<SenseiRequest, SenseiSystemInfo>, InetSocketAddress>();
for (InetSocketAddress nodeAddress : nodes) {
Service<SenseiRequest, SenseiSystemInfo> service = serviceDecorator.decorate(nodeAddress);
addressToService.put(nodeAddress, service);
serviceToAddress.put(service, nodeAddress);
}
_nodeAddressToService = addressToService;
_nodeServiceToAddress = serviceToAddress;
}

@Override
public void nodesRemoved(Set<InetSocketAddress> removedNodes) {
}

@Override
public InetSocketAddress getServiceAddress(Service<SenseiRequest, SenseiSystemInfo> service) {
return _nodeServiceToAddress.get(service);
}
}
Loading

0 comments on commit f1706fa

Please sign in to comment.