Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added simple graceful degradation at broker level.

  • Loading branch information...
commit b07746e2caf9538d26af82bdfaa4c4403f6de60a 1 parent 14de89f
Baoqiu Cui authored
View
4 sensei-core/src/main/java/com/senseidb/conf/SenseiConfParams.java
@@ -64,6 +64,10 @@
public static final String SERVER_BROKER_MAXTHREAD = "sensei.broker.maxThread";
public static final String SERVER_BROKER_MAXWAIT = "sensei.broker.maxWaittime";
+ public static final String SENSEI_BROKER_POLL_INTERVAL = "sensei.broker.pollInterval";
+ public static final String SENSEI_BROKER_MIN_RESPONSES = "sensei.broker.minResponses";
+ public static final String SENSEI_BROKER_MAX_TOTAL_WAIT = "sensei.broker.maxTotalWait";
+
public static final String SERVER_SEARCH_ROUTER_FACTORY = "sensei.search.router.factory";
public static final String SENSEI_INDEX_PRUNER = "sensei.index.pruner";
View
126 sensei-core/src/main/java/com/senseidb/search/node/AbstractConsistentHashBroker.java
@@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +45,9 @@
protected long _timeout = 8000;
protected final Serializer<REQUEST, RESULT> _serializer;
protected volatile SenseiLoadBalancer _loadBalancer;
+ protected final int _pollInterval;
+ protected final int _minResponses;
+ protected final int _maxTotalWait;
private static Timer ScatterTimer = null;
private static Timer GatherTimer = null;
@@ -85,12 +89,19 @@
* @param scatterGatherHandler
* @throws NorbertException
*/
- public AbstractConsistentHashBroker(PartitionedNetworkClient<Integer> networkClient, Serializer<REQUEST, RESULT> serializer)
+ public AbstractConsistentHashBroker(PartitionedNetworkClient<Integer> networkClient,
+ Serializer<REQUEST, RESULT> serializer,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
throws NorbertException
{
super(networkClient);
_loadBalancer = null;
_serializer = serializer;
+ _pollInterval = pollInterval;
+ _minResponses = minResponses;
+ _maxTotalWait = maxTotalWait;
}
public <T> T customizeRequest(REQUEST request)
@@ -190,58 +201,71 @@ protected RESULT doBrowse(PartitionedNetworkClient<Integer> networkClient, final
final Map<Integer, Node> nodeMap = new HashMap<Integer, Node>();
final Map<Integer, Future<RESULT>> futureMap = new HashMap<Integer, Future<RESULT>>();
- try{
- ScatterTimer.time(new Callable<Object>(){
-
- @Override
- public Object call() throws Exception {
- for(int ni = 0; ni < parts.length; ni++)
- {
- Node node = searchNodes.nodelist[ni].get(searchNodes.nodegroup[ni]);
- Set<Integer> pset = partsMap.get(node.getId());
- if (pset == null)
- {
- pset = new HashSet<Integer>();
- partsMap.put(node.getId(), pset);
- }
- pset.add(parts[ni]);
- nodeMap.put(node.getId(), node);
- }
+ try
+ {
+ ScatterTimer.time(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ for(int ni = 0; ni < parts.length; ni++)
+ {
+ Node node = searchNodes.nodelist[ni].get(searchNodes.nodegroup[ni]);
+ Set<Integer> pset = partsMap.get(node.getId());
+ if (pset == null)
+ {
+ pset = new HashSet<Integer>();
+ partsMap.put(node.getId(), pset);
+ }
+ pset.add(parts[ni]);
+ nodeMap.put(node.getId(), node);
+ }
- for (Map.Entry<Integer, Node> entry : nodeMap.entrySet())
- {
- req.setPartitions(partsMap.get(entry.getKey()));
- req.saveState();
- REQUEST thisRequest = customizeRequest(req);
- if (logger.isDebugEnabled()){
- logger.debug("broker sending req part: " + partsMap.get(entry.getKey()) + " on node: " + entry.getValue());
- }
- futureMap.put(entry.getKey(), (Future<RESULT>)_networkClient.sendRequestToNode(thisRequest, entry.getValue(), _serializer));
- req.restoreState();
- }
- for(Map.Entry<Integer, Future<RESULT>> entry : futureMap.entrySet())
- {
- RESULT resp;
- try
- {
- resp = entry.getValue().get(_timeout,TimeUnit.MILLISECONDS);
- resultlist.add(resp);
- if (logger.isDebugEnabled())
- {
- logger.debug("broker receiving res part: " + partsMap.get(entry.getKey()) + " on node: " + nodeMap.get(entry.getKey())
- + " node time: " + resp.getTime() +"ms remote time: " + (System.currentTimeMillis() - time) + "ms");
- }
- } catch (Exception e)
- {
- ErrorMeter.mark();
- logger.error("broker receiving res part: " + partsMap.get(entry.getKey()) + " on node: " + nodeMap.get(entry.getKey())
- + e +" remote time: " + (System.currentTimeMillis() - time) + "ms");
- }
- }
- return null;
- }
+ for (Map.Entry<Integer, Node> entry : nodeMap.entrySet())
+ {
+ req.setPartitions(partsMap.get(entry.getKey()));
+ req.saveState();
+ REQUEST thisRequest = customizeRequest(req);
+ if (logger.isDebugEnabled()){
+ logger.debug("broker sending req part: " + partsMap.get(entry.getKey()) + " on node: " + entry.getValue());
+ }
+ futureMap.put(entry.getKey(), (Future<RESULT>)_networkClient.sendRequestToNode(thisRequest, entry.getValue(), _serializer));
+ req.restoreState();
+ }
+
+ int totalTime = 0;
+ int interval = _pollInterval;
+ int numResults = 0;
+ int totalTasks = futureMap.size();
+ int minRespExpected = (_minResponses < totalTasks) ? _minResponses : totalTasks;
+ while (numResults < minRespExpected ||
+ (numResults < totalTasks && totalTime < _maxTotalWait))
+ {
+ long startTime = System.currentTimeMillis();
+ Thread.sleep(interval); // Sleep for a small interval. May wake up much later.
+ totalTime += (System.currentTimeMillis() - startTime);
+ if (totalTime > _timeout)
+ {
+ logger.error("Hit hard timeout limit on broker.");
+ break;
+ }
+ Iterator itr = futureMap.entrySet().iterator();
+ while (itr.hasNext())
+ {
+ Map.Entry<Integer, Future<RESULT>> entry = (Map.Entry<Integer, Future<RESULT>>) itr.next();
+ Future<RESULT> futureRes = entry.getValue();
+ if (futureRes.isDone())
+ {
+ resultlist.add((RESULT) futureRes.get());
+ itr.remove();
+ numResults++;
+ }
+ }
+ }
+
+ logger.info("totalTime = " + totalTime + ", resultlist.size = " + resultlist.size());
+ return null;
+ }
- });
+ });
}
catch(Exception e){
logger.error(e.getMessage(),e);
View
11 sensei-core/src/main/java/com/senseidb/search/node/SenseiBroker.java
@@ -38,10 +38,15 @@
private long _timeoutMillis = TIMEOUT_MILLIS;
private final SenseiLoadBalancerFactory _loadBalancerFactory;
- public SenseiBroker(PartitionedNetworkClient<Integer> networkClient, ClusterClient clusterClient,
- SenseiLoadBalancerFactory loadBalancerFactory) throws NorbertException
+ public SenseiBroker(PartitionedNetworkClient<Integer> networkClient,
+ ClusterClient clusterClient,
+ SenseiLoadBalancerFactory loadBalancerFactory,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
+ throws NorbertException
{
- super(networkClient, CoreSenseiServiceImpl.PROTO_SERIALIZER);
+ super(networkClient, CoreSenseiServiceImpl.PROTO_SERIALIZER, pollInterval, minResponses, maxTotalWait);
_loadBalancerFactory = loadBalancerFactory;
clusterClient.addListener(this);
logger.info("created broker instance " + networkClient + " " + clusterClient + " " + loadBalancerFactory);
View
12 sensei-core/src/main/java/com/senseidb/search/node/SenseiSysBroker.java
@@ -36,10 +36,16 @@
protected Set<Node> _nodes = Collections.EMPTY_SET;
- public SenseiSysBroker(PartitionedNetworkClient<Integer> networkClient, ClusterClient clusterClient,
- SenseiLoadBalancerFactory loadBalancerFactory, Comparator<String> versionComparator) throws NorbertException
+ public SenseiSysBroker(PartitionedNetworkClient<Integer> networkClient,
+ ClusterClient clusterClient,
+ SenseiLoadBalancerFactory loadBalancerFactory,
+ Comparator<String> versionComparator,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
+ throws NorbertException
{
- super(networkClient, SysSenseiCoreServiceImpl.PROTO_SERIALIZER);
+ super(networkClient, SysSenseiCoreServiceImpl.PROTO_SERIALIZER, pollInterval, minResponses, maxTotalWait);
_versionComparator = versionComparator;
_loadBalancerFactory = loadBalancerFactory;
clusterClient.addListener(this);
View
12 sensei-core/src/main/java/com/senseidb/search/req/mapred/obsolete/MapReduceBroker.java
@@ -21,9 +21,15 @@
private final SenseiLoadBalancerFactory loadBalancerFactory;
private long _timeoutMillis;
- public MapReduceBroker(PartitionedNetworkClient<Integer> networkClient, ClusterClient clusterClient,
- SenseiLoadBalancerFactory loadBalancerFactory) throws NorbertException {
- super(networkClient, MapReduceSenseiService.SERIALIZER);
+ public MapReduceBroker(PartitionedNetworkClient<Integer> networkClient,
+ ClusterClient clusterClient,
+ SenseiLoadBalancerFactory loadBalancerFactory,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
+ throws NorbertException
+ {
+ super(networkClient, MapReduceSenseiService.SERIALIZER, pollInterval, minResponses, maxTotalWait);
this.loadBalancerFactory = loadBalancerFactory;
clusterClient.addListener(this);
logger.info("created broker instance " + networkClient + " " + clusterClient + " " + loadBalancerFactory);
View
6 sensei-core/src/main/java/com/senseidb/servlet/AbstractSenseiClientServlet.java
@@ -83,8 +83,10 @@ public void init(ServletConfig config) throws ServletException {
_networkClientConfig.setClusterClient(_clusterClient);
_networkClient = new SenseiNetworkClient(_networkClientConfig, null);
- _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory);
- _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator);
+ _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory,
+ pollInterval, minResponses, maxTotalWait);
+ _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator,
+ pollInterval, minResponses, maxTotalWait);
logger.info("Connecting to cluster: "+clusterName+" ...");
_clusterClient.awaitConnectionUninterruptibly();
View
6 sensei-core/src/main/java/com/senseidb/servlet/SenseiHttpInvokerServiceServlet.java
@@ -27,8 +27,10 @@ public void init(ServletConfig config) throws ServletException {
super.init(config);
innerSvc = new ClusteredSenseiServiceImpl(zkurl, zkTimeout, clusterClientName, clusterName, connectTimeoutMillis,
- writeTimeoutMillis, maxConnectionsPerNode, staleRequestTimeoutMins, staleRequestCleanupFrequencyMins,
- loadBalancerFactory, versionComparator);
+ writeTimeoutMillis, maxConnectionsPerNode, staleRequestTimeoutMins,
+ staleRequestCleanupFrequencyMins,
+ loadBalancerFactory, versionComparator,
+ pollInterval, minResponses, maxTotalWait);
innerSvc.start();
target = new HttpInvokerServiceExporter();
target.setService(innerSvc);
View
6 sensei-core/src/main/java/com/senseidb/servlet/ZookeeperConfigurableServlet.java
@@ -29,6 +29,9 @@
protected int staleRequestCleanupFrequencyMins;
protected SenseiLoadBalancerFactory loadBalancerFactory;
protected Comparator<String> versionComparator;
+ protected int pollInterval;
+ protected int minResponses;
+ protected int maxTotalWait;
@Override
public void init(ServletConfig config) throws ServletException {
@@ -56,5 +59,8 @@ public void init(ServletConfig config) throws ServletException {
versionComparator = (Comparator<String>)ctx.getAttribute(SenseiConfigServletContextListener.SENSEI_CONF_VERSION_COMPARATOR);
loadBalancerFactory = (SenseiLoadBalancerFactory)ctx.getAttribute(
SenseiConfigServletContextListener.SENSEI_CONF_ROUTER_FACTORY);
+ pollInterval = senseiConf.getInt(SenseiConfParams.SENSEI_BROKER_POLL_INTERVAL, 2);
+ minResponses = senseiConf.getInt(SenseiConfParams.SENSEI_BROKER_MIN_RESPONSES, 2);
+ maxTotalWait = senseiConf.getInt(SenseiConfParams.SENSEI_BROKER_MAX_TOTAL_WAIT, 200);
}
}
View
13 sensei-core/src/main/java/com/senseidb/svc/impl/ClusteredSenseiServiceImpl.java
@@ -29,9 +29,10 @@
private final String _clusterName;
public ClusteredSenseiServiceImpl(String zkurl,int zkTimeout,String clusterClientName, String clusterName, int connectTimeoutMillis,
- int writeTimeoutMillis, int maxConnectionsPerNode, int staleRequestTimeoutMins,
- int staleRequestCleanupFrequencyMins, SenseiLoadBalancerFactory loadBalancerFactory,
- Comparator<String> versionComparator) {
+ int writeTimeoutMillis, int maxConnectionsPerNode, int staleRequestTimeoutMins,
+ int staleRequestCleanupFrequencyMins, SenseiLoadBalancerFactory loadBalancerFactory,
+ Comparator<String> versionComparator, int pollInterval, int minResponses, int maxTotalWait)
+ {
_clusterName = clusterName;
_networkClientConfig.setServiceName(clusterName);
_networkClientConfig.setZooKeeperConnectString(zkurl);
@@ -47,8 +48,10 @@ public ClusteredSenseiServiceImpl(String zkurl,int zkTimeout,String clusterClien
_networkClientConfig.setClusterClient(_clusterClient);
_networkClient = new SenseiNetworkClient(_networkClientConfig,null);
- _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory);
- _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator);
+ _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory,
+ pollInterval, minResponses, maxTotalWait);
+ _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator,
+ pollInterval, minResponses, maxTotalWait);
}
public void start(){
View
4 sensei-core/src/test/java/com/senseidb/test/SenseiStarter.java
@@ -91,9 +91,9 @@ public static synchronized void start(String confDir1, String confDir2) {
broker = null;
try
{
- broker = new SenseiBroker(networkClient, clusterClient, loadBalancerFactory);
+ broker = new SenseiBroker(networkClient, clusterClient, loadBalancerFactory, 1, 2, 2000);
broker.setTimeoutMillis(0);
- mapReduceBroker = new MapReduceBroker(networkClient, clusterClient, loadBalancerFactory);
+ mapReduceBroker = new MapReduceBroker(networkClient, clusterClient, loadBalancerFactory, 1, 2, 2000);
broker.setTimeoutMillis(0);
} catch (NorbertException ne) {
logger.info("shutting down cluster...", ne);
Please sign in to comment.
Something went wrong with that request. Please try again.