Skip to content
This repository has been archived by the owner on Oct 30, 2020. It is now read-only.

Commit

Permalink
Merge remote branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
alperez committed Apr 3, 2012
2 parents 65d1b84 + 1e79158 commit f8079ca
Show file tree
Hide file tree
Showing 17 changed files with 413 additions and 266 deletions.
56 changes: 43 additions & 13 deletions sensei-core/src/main/antlr3/com/senseidb/bql/parsers/BQL.g
Expand Up @@ -818,22 +818,44 @@ limit_clause returns [int offset, int count]
}
;
or_column_name_list returns [JSONArray json]
@init {
$json = new JSONArray();
}
: col=column_name
{
String colName = $col.text;
if (colName != null) {
$json.put($col.text);
}
}
(OR col=column_name
{
colName = $col.text;
if (colName != null) {
$json.put($col.text);
}
}
)*
;
group_by_clause returns [JSONObject json]
: GROUP BY column_name (TOP top=INTEGER)?
: GROUP BY or_column_name_list (TOP top=INTEGER)?
{
$json = new JSONObject();
try {
JSONArray cols = new JSONArray();
String col = $column_name.text;
String[] facetInfo = _facetInfoMap.get(col);
if (facetInfo != null && (facetInfo[0].equals("range") ||
facetInfo[0].equals("multi") ||
facetInfo[0].equals("path"))) {
throw new FailedPredicateException(input,
"group_by_clause",
"Range/multi/path facet, \"" + col + "\", cannot be used in the GROUP BY clause.");
JSONArray cols = $or_column_name_list.json;
for (int i = 0; i < cols.length(); ++i) {
String col = cols.getString(i);
String[] facetInfo = _facetInfoMap.get(col);
if (facetInfo != null && (facetInfo[0].equals("range") ||
facetInfo[0].equals("multi") ||
facetInfo[0].equals("path"))) {
throw new FailedPredicateException(input,
"group_by_clause",
"Range/multi/path facet, \"" + col + "\", cannot be used in the GROUP BY clause.");
}
}
cols.put(col);
$json.put("columns", cols);
if (top != null) {
$json.put("top", Integer.parseInt(top.getText()));
Expand Down Expand Up @@ -1552,7 +1574,7 @@ null_predicate returns [JSONObject json]
}
;
value_list returns [Object json]
non_variable_value_list returns [Object json]
@init {
JSONArray jsonArray = new JSONArray();
}
Expand All @@ -1568,6 +1590,14 @@ value_list returns [Object json]
{
$json = jsonArray;
}
;
value_list returns [Object json]
: non_variable_value_list
{
$json = $non_variable_value_list.json;
}
| VARIABLE
{
$json = $VARIABLE.text;
Expand Down Expand Up @@ -1703,7 +1733,7 @@ facet_param_list returns [JSONObject json]
;
facet_param returns [String facet, JSONObject param]
: LPAR column_name COMMA STRING_LITERAL COMMA facet_param_type COMMA (val=value | valList=value_list) RPAR
: LPAR column_name COMMA STRING_LITERAL COMMA facet_param_type COMMA (val=value | valList=non_variable_value_list) RPAR
{
$facet = $column_name.text; // XXX Check error here?
try {
Expand Down
Expand Up @@ -64,6 +64,10 @@ public interface SenseiConfParams {
public static final String SERVER_BROKER_MAXTHREAD = "sensei.broker.maxThread";
public static final String SERVER_BROKER_MAXWAIT = "sensei.broker.maxWaittime";

public static final String SENSEI_BROKER_POLL_INTERVAL = "sensei.broker.pollInterval";
public static final String SENSEI_BROKER_MIN_RESPONSES = "sensei.broker.minResponses";
public static final String SENSEI_BROKER_MAX_TOTAL_WAIT = "sensei.broker.maxTotalWait";

public static final String SERVER_SEARCH_ROUTER_FACTORY = "sensei.search.router.factory";

public static final String SENSEI_INDEX_PRUNER = "sensei.index.pruner";
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.browseengine.bobo.facets.FacetHandlerInitializerParam;
import com.browseengine.bobo.facets.RuntimeFacetHandler;
import com.browseengine.bobo.facets.RuntimeFacetHandlerFactory;
import com.browseengine.bobo.facets.AbstractRuntimeFacetHandlerFactory;
import com.browseengine.bobo.facets.attribute.AttributesFacetHandler;
import com.browseengine.bobo.facets.data.PredefinedTermListFactory;
import com.browseengine.bobo.facets.data.TermListFactory;
Expand Down Expand Up @@ -273,14 +274,20 @@ private static <T extends Number> RuntimeFacetHandlerFactory<?,?> buildHistogram
final T end,
final T unit)
{
return new RuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<FacetDataNone>>()
return new AbstractRuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<FacetDataNone>>()
{
@Override
public RuntimeFacetHandler<FacetDataNone> get(FacetHandlerInitializerParam params)
{
return new HistogramFacetHandler<T>(name, dataHandler, start, end, unit);
};

@Override
public boolean isLoadLazily()
{
return true;
}

@Override
public String getName()
{
Expand Down Expand Up @@ -442,7 +449,7 @@ public static SenseiSystemInfo buildFacets(JSONObject schemaObj, SenseiPluginReg
final String depends = dependSet.iterator().next();
Assert.notEmpty(paramMap.get("range"), "Facet handler " + name + " should have at least one predefined range");

return new RuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<?>>() {
return new AbstractRuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<?>>() {

@Override
public String getName() {
Expand Down
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -44,6 +45,9 @@ public abstract class AbstractConsistentHashBroker<REQUEST extends AbstractSense
protected long _timeout = 8000;
protected final Serializer<REQUEST, RESULT> _serializer;
protected volatile SenseiLoadBalancer _loadBalancer;
protected final int _pollInterval;
protected final int _minResponses;
protected final int _maxTotalWait;

private static Timer ScatterTimer = null;
private static Timer GatherTimer = null;
Expand Down Expand Up @@ -85,12 +89,19 @@ public abstract class AbstractConsistentHashBroker<REQUEST extends AbstractSense
* @param scatterGatherHandler
* @throws NorbertException
*/
public AbstractConsistentHashBroker(PartitionedNetworkClient<Integer> networkClient, Serializer<REQUEST, RESULT> serializer)
public AbstractConsistentHashBroker(PartitionedNetworkClient<Integer> networkClient,
Serializer<REQUEST, RESULT> serializer,
int pollInterval,
int minResponses,
int maxTotalWait)
throws NorbertException
{
super(networkClient);
_loadBalancer = null;
_serializer = serializer;
_pollInterval = pollInterval;
_minResponses = minResponses;
_maxTotalWait = maxTotalWait;
}

public <T> T customizeRequest(REQUEST request)
Expand Down Expand Up @@ -190,58 +201,71 @@ protected RESULT doBrowse(PartitionedNetworkClient<Integer> networkClient, final
final Map<Integer, Node> nodeMap = new HashMap<Integer, Node>();
final Map<Integer, Future<RESULT>> futureMap = new HashMap<Integer, Future<RESULT>>();

try{
ScatterTimer.time(new Callable<Object>(){

@Override
public Object call() throws Exception {
for(int ni = 0; ni < parts.length; ni++)
{
Node node = searchNodes.nodelist[ni].get(searchNodes.nodegroup[ni]);
Set<Integer> pset = partsMap.get(node.getId());
if (pset == null)
{
pset = new HashSet<Integer>();
partsMap.put(node.getId(), pset);
}
pset.add(parts[ni]);
nodeMap.put(node.getId(), node);
}
try
{
ScatterTimer.time(new Callable<Object>() {
@Override
public Object call() throws Exception {
for(int ni = 0; ni < parts.length; ni++)
{
Node node = searchNodes.nodelist[ni].get(searchNodes.nodegroup[ni]);
Set<Integer> pset = partsMap.get(node.getId());
if (pset == null)
{
pset = new HashSet<Integer>();
partsMap.put(node.getId(), pset);
}
pset.add(parts[ni]);
nodeMap.put(node.getId(), node);
}

for (Map.Entry<Integer, Node> entry : nodeMap.entrySet())
{
req.setPartitions(partsMap.get(entry.getKey()));
req.saveState();
REQUEST thisRequest = customizeRequest(req);
if (logger.isDebugEnabled()){
logger.debug("broker sending req part: " + partsMap.get(entry.getKey()) + " on node: " + entry.getValue());
}
futureMap.put(entry.getKey(), (Future<RESULT>)_networkClient.sendRequestToNode(thisRequest, entry.getValue(), _serializer));
req.restoreState();
}
for(Map.Entry<Integer, Future<RESULT>> entry : futureMap.entrySet())
{
RESULT resp;
try
{
resp = entry.getValue().get(_timeout,TimeUnit.MILLISECONDS);
resultlist.add(resp);
if (logger.isDebugEnabled())
{
logger.debug("broker receiving res part: " + partsMap.get(entry.getKey()) + " on node: " + nodeMap.get(entry.getKey())
+ " node time: " + resp.getTime() +"ms remote time: " + (System.currentTimeMillis() - time) + "ms");
}
} catch (Exception e)
{
ErrorMeter.mark();
logger.error("broker receiving res part: " + partsMap.get(entry.getKey()) + " on node: " + nodeMap.get(entry.getKey())
+ e +" remote time: " + (System.currentTimeMillis() - time) + "ms");
}
}
return null;
}
for (Map.Entry<Integer, Node> entry : nodeMap.entrySet())
{
req.setPartitions(partsMap.get(entry.getKey()));
req.saveState();
REQUEST thisRequest = customizeRequest(req);
if (logger.isDebugEnabled()){
logger.debug("broker sending req part: " + partsMap.get(entry.getKey()) + " on node: " + entry.getValue());
}
futureMap.put(entry.getKey(), (Future<RESULT>)_networkClient.sendRequestToNode(thisRequest, entry.getValue(), _serializer));
req.restoreState();
}

int totalTime = 0;
int interval = _pollInterval;
int numResults = 0;
int totalTasks = futureMap.size();
int minRespExpected = (_minResponses < totalTasks) ? _minResponses : totalTasks;
while (numResults < minRespExpected ||
(numResults < totalTasks && totalTime < _maxTotalWait))
{
long startTime = System.currentTimeMillis();
Thread.sleep(interval); // Sleep for a small interval. May wake up much later.
totalTime += (System.currentTimeMillis() - startTime);
if (totalTime > _timeout)
{
logger.error("Hit hard timeout limit on broker.");
break;
}
Iterator itr = futureMap.entrySet().iterator();
while (itr.hasNext())
{
Map.Entry<Integer, Future<RESULT>> entry = (Map.Entry<Integer, Future<RESULT>>) itr.next();
Future<RESULT> futureRes = entry.getValue();
if (futureRes.isDone())
{
resultlist.add((RESULT) futureRes.get());
itr.remove();
numResults++;
}
}
}

logger.info("totalTime = " + totalTime + ", resultlist.size = " + resultlist.size());
return null;
}

});
});
}
catch(Exception e){
logger.error(e.getMessage(),e);
Expand Down

0 comments on commit f8079ca

Please sign in to comment.