Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' into release-1.1.2-rc

  • Loading branch information...
commit cb81c5f9b5b8375da8786eeb1b99a9985b4f1374 2 parents e0de805 + a074b3f
@alejandroperez alejandroperez authored
Showing with 416 additions and 282 deletions.
  1. +3 −0  .gitignore
  2. +43 −13 sensei-core/src/main/antlr3/com/senseidb/bql/parsers/BQL.g
  3. +4 −0 sensei-core/src/main/java/com/senseidb/conf/SenseiConfParams.java
  4. +9 −12 sensei-core/src/main/java/com/senseidb/conf/SenseiFacetHandlerBuilder.java
  5. +75 −51 sensei-core/src/main/java/com/senseidb/search/node/AbstractConsistentHashBroker.java
  6. +182 −177 sensei-core/src/main/java/com/senseidb/search/node/ResultMerger.java
  7. +8 −3 sensei-core/src/main/java/com/senseidb/search/node/SenseiBroker.java
  8. +9 −3 sensei-core/src/main/java/com/senseidb/search/node/SenseiSysBroker.java
  9. +5 −1 sensei-core/src/main/java/com/senseidb/search/query/filters/SenseiTermFilter.java
  10. +9 −3 sensei-core/src/main/java/com/senseidb/search/req/mapred/obsolete/MapReduceBroker.java
  11. +4 −2 sensei-core/src/main/java/com/senseidb/servlet/AbstractSenseiClientServlet.java
  12. +4 −2 sensei-core/src/main/java/com/senseidb/servlet/SenseiHttpInvokerServiceServlet.java
  13. +6 −0 sensei-core/src/main/java/com/senseidb/servlet/ZookeeperConfigurableServlet.java
  14. +8 −5 sensei-core/src/main/java/com/senseidb/svc/impl/ClusteredSenseiServiceImpl.java
  15. +2 −2 sensei-core/src/test/java/com/senseidb/test/SenseiStarter.java
  16. +15 −0 sensei-core/src/test/java/com/senseidb/test/bql/parsers/TestBQL.java
  17. +28 −0 sensei-core/src/test/java/com/senseidb/test/bql/parsers/TestErrorHandling.java
  18. +2 −8 sensei-core/src/test/java/com/senseidb/test/plugin/MockRuntimeFacetHandlerFactory.java
View
3  .gitignore
@@ -24,3 +24,6 @@ cscope.*
index
target
*.pyc
+*.iml
+*.ipr
+*.iws
View
56 sensei-core/src/main/antlr3/com/senseidb/bql/parsers/BQL.g
@@ -818,22 +818,44 @@ limit_clause returns [int offset, int count]
}
;
+or_column_name_list returns [JSONArray json]
+@init {
+ $json = new JSONArray();
+}
+ : col=column_name
+ {
+ String colName = $col.text;
+ if (colName != null) {
+ $json.put($col.text);
+ }
+ }
+ (OR col=column_name
+ {
+ colName = $col.text;
+ if (colName != null) {
+ $json.put($col.text);
+ }
+ }
+ )*
+ ;
+
group_by_clause returns [JSONObject json]
- : GROUP BY column_name (TOP top=INTEGER)?
+ : GROUP BY or_column_name_list (TOP top=INTEGER)?
{
$json = new JSONObject();
try {
- JSONArray cols = new JSONArray();
- String col = $column_name.text;
- String[] facetInfo = _facetInfoMap.get(col);
- if (facetInfo != null && (facetInfo[0].equals("range") ||
- facetInfo[0].equals("multi") ||
- facetInfo[0].equals("path"))) {
- throw new FailedPredicateException(input,
- "group_by_clause",
- "Range/multi/path facet, \"" + col + "\", cannot be used in the GROUP BY clause.");
+ JSONArray cols = $or_column_name_list.json;
+ for (int i = 0; i < cols.length(); ++i) {
+ String col = cols.getString(i);
+ String[] facetInfo = _facetInfoMap.get(col);
+ if (facetInfo != null && (facetInfo[0].equals("range") ||
+ facetInfo[0].equals("multi") ||
+ facetInfo[0].equals("path"))) {
+ throw new FailedPredicateException(input,
+ "group_by_clause",
+ "Range/multi/path facet, \"" + col + "\", cannot be used in the GROUP BY clause.");
+ }
}
- cols.put(col);
$json.put("columns", cols);
if (top != null) {
$json.put("top", Integer.parseInt(top.getText()));
@@ -1552,7 +1574,7 @@ null_predicate returns [JSONObject json]
}
;
-value_list returns [Object json]
+non_variable_value_list returns [Object json]
@init {
JSONArray jsonArray = new JSONArray();
}
@@ -1568,6 +1590,14 @@ value_list returns [Object json]
{
$json = jsonArray;
}
+ ;
+
+
+value_list returns [Object json]
+ : non_variable_value_list
+ {
+ $json = $non_variable_value_list.json;
+ }
| VARIABLE
{
$json = $VARIABLE.text;
@@ -1703,7 +1733,7 @@ facet_param_list returns [JSONObject json]
;
facet_param returns [String facet, JSONObject param]
- : LPAR column_name COMMA STRING_LITERAL COMMA facet_param_type COMMA (val=value | valList=value_list) RPAR
+ : LPAR column_name COMMA STRING_LITERAL COMMA facet_param_type COMMA (val=value | valList=non_variable_value_list) RPAR
{
$facet = $column_name.text; // XXX Check error here?
try {
View
4 sensei-core/src/main/java/com/senseidb/conf/SenseiConfParams.java
@@ -64,6 +64,10 @@
public static final String SERVER_BROKER_MAXTHREAD = "sensei.broker.maxThread";
public static final String SERVER_BROKER_MAXWAIT = "sensei.broker.maxWaittime";
+ public static final String SENSEI_BROKER_POLL_INTERVAL = "sensei.broker.pollInterval";
+ public static final String SENSEI_BROKER_MIN_RESPONSES = "sensei.broker.minResponses";
+ public static final String SENSEI_BROKER_MAX_TOTAL_WAIT = "sensei.broker.maxTotalWait";
+
public static final String SERVER_SEARCH_ROUTER_FACTORY = "sensei.search.router.factory";
public static final String SENSEI_INDEX_PRUNER = "sensei.index.pruner";
View
21 sensei-core/src/main/java/com/senseidb/conf/SenseiFacetHandlerBuilder.java
@@ -22,6 +22,7 @@
import com.browseengine.bobo.facets.FacetHandlerInitializerParam;
import com.browseengine.bobo.facets.RuntimeFacetHandler;
import com.browseengine.bobo.facets.RuntimeFacetHandlerFactory;
+import com.browseengine.bobo.facets.AbstractRuntimeFacetHandlerFactory;
import com.browseengine.bobo.facets.attribute.AttributesFacetHandler;
import com.browseengine.bobo.facets.data.PredefinedTermListFactory;
import com.browseengine.bobo.facets.data.TermListFactory;
@@ -273,7 +274,7 @@ else if ("double".equals(dataType))
final T end,
final T unit)
{
- return new RuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<FacetDataNone>>()
+ return new AbstractRuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<FacetDataNone>>()
{
@Override
public RuntimeFacetHandler<FacetDataNone> get(FacetHandlerInitializerParam params)
@@ -281,16 +282,17 @@ else if ("double".equals(dataType))
return new HistogramFacetHandler<T>(name, dataHandler, start, end, unit);
};
+ @Override
+ public boolean isLoadLazily()
+ {
+ return true;
+ }
+
@Override
public String getName()
{
return name;
}
-
- @Override
- public boolean isLoadLazily() {
- return false;
- }
};
}
@@ -447,7 +449,7 @@ public static SenseiSystemInfo buildFacets(JSONObject schemaObj, SenseiPluginReg
final String depends = dependSet.iterator().next();
Assert.notEmpty(paramMap.get("range"), "Facet handler " + name + " should have at least one predefined range");
- return new RuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<?>>() {
+ return new AbstractRuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<?>>() {
@Override
public String getName() {
@@ -455,11 +457,6 @@ public String getName() {
}
@Override
- public boolean isLoadLazily() {
- return false;
- }
-
- @Override
public RuntimeFacetHandler<?> get(FacetHandlerInitializerParam params) {
long overrideNow = -1;
try {
View
126 sensei-core/src/main/java/com/senseidb/search/node/AbstractConsistentHashBroker.java
@@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +45,9 @@
protected long _timeout = 8000;
protected final Serializer<REQUEST, RESULT> _serializer;
protected volatile SenseiLoadBalancer _loadBalancer;
+ protected final int _pollInterval;
+ protected final int _minResponses;
+ protected final int _maxTotalWait;
private static Timer ScatterTimer = null;
private static Timer GatherTimer = null;
@@ -85,12 +89,19 @@
* @param scatterGatherHandler
* @throws NorbertException
*/
- public AbstractConsistentHashBroker(PartitionedNetworkClient<Integer> networkClient, Serializer<REQUEST, RESULT> serializer)
+ public AbstractConsistentHashBroker(PartitionedNetworkClient<Integer> networkClient,
+ Serializer<REQUEST, RESULT> serializer,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
throws NorbertException
{
super(networkClient);
_loadBalancer = null;
_serializer = serializer;
+ _pollInterval = pollInterval;
+ _minResponses = minResponses;
+ _maxTotalWait = maxTotalWait;
}
public <T> T customizeRequest(REQUEST request)
@@ -190,58 +201,71 @@ protected RESULT doBrowse(PartitionedNetworkClient<Integer> networkClient, final
final Map<Integer, Node> nodeMap = new HashMap<Integer, Node>();
final Map<Integer, Future<RESULT>> futureMap = new HashMap<Integer, Future<RESULT>>();
- try{
- ScatterTimer.time(new Callable<Object>(){
-
- @Override
- public Object call() throws Exception {
- for(int ni = 0; ni < parts.length; ni++)
- {
- Node node = searchNodes.nodelist[ni].get(searchNodes.nodegroup[ni]);
- Set<Integer> pset = partsMap.get(node.getId());
- if (pset == null)
- {
- pset = new HashSet<Integer>();
- partsMap.put(node.getId(), pset);
- }
- pset.add(parts[ni]);
- nodeMap.put(node.getId(), node);
- }
+ try
+ {
+ ScatterTimer.time(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ for(int ni = 0; ni < parts.length; ni++)
+ {
+ Node node = searchNodes.nodelist[ni].get(searchNodes.nodegroup[ni]);
+ Set<Integer> pset = partsMap.get(node.getId());
+ if (pset == null)
+ {
+ pset = new HashSet<Integer>();
+ partsMap.put(node.getId(), pset);
+ }
+ pset.add(parts[ni]);
+ nodeMap.put(node.getId(), node);
+ }
- for (Map.Entry<Integer, Node> entry : nodeMap.entrySet())
- {
- req.setPartitions(partsMap.get(entry.getKey()));
- req.saveState();
- REQUEST thisRequest = customizeRequest(req);
- if (logger.isDebugEnabled()){
- logger.debug("broker sending req part: " + partsMap.get(entry.getKey()) + " on node: " + entry.getValue());
- }
- futureMap.put(entry.getKey(), (Future<RESULT>)_networkClient.sendRequestToNode(thisRequest, entry.getValue(), _serializer));
- req.restoreState();
- }
- for(Map.Entry<Integer, Future<RESULT>> entry : futureMap.entrySet())
- {
- RESULT resp;
- try
- {
- resp = entry.getValue().get(_timeout,TimeUnit.MILLISECONDS);
- resultlist.add(resp);
- if (logger.isDebugEnabled())
- {
- logger.debug("broker receiving res part: " + partsMap.get(entry.getKey()) + " on node: " + nodeMap.get(entry.getKey())
- + " node time: " + resp.getTime() +"ms remote time: " + (System.currentTimeMillis() - time) + "ms");
- }
- } catch (Exception e)
- {
- ErrorMeter.mark();
- logger.error("broker receiving res part: " + partsMap.get(entry.getKey()) + " on node: " + nodeMap.get(entry.getKey())
- + e +" remote time: " + (System.currentTimeMillis() - time) + "ms");
- }
- }
- return null;
- }
+ for (Map.Entry<Integer, Node> entry : nodeMap.entrySet())
+ {
+ req.setPartitions(partsMap.get(entry.getKey()));
+ req.saveState();
+ REQUEST thisRequest = customizeRequest(req);
+ if (logger.isDebugEnabled()){
+ logger.debug("broker sending req part: " + partsMap.get(entry.getKey()) + " on node: " + entry.getValue());
+ }
+ futureMap.put(entry.getKey(), (Future<RESULT>)_networkClient.sendRequestToNode(thisRequest, entry.getValue(), _serializer));
+ req.restoreState();
+ }
+
+ int totalTime = 0;
+ int interval = _pollInterval;
+ int numResults = 0;
+ int totalTasks = futureMap.size();
+ int minRespExpected = (_minResponses < totalTasks) ? _minResponses : totalTasks;
+ while (numResults < minRespExpected ||
+ (numResults < totalTasks && totalTime < _maxTotalWait))
+ {
+ long startTime = System.currentTimeMillis();
+ Thread.sleep(interval); // Sleep for a small interval. May wake up much later.
+ totalTime += (System.currentTimeMillis() - startTime);
+ if (totalTime > _timeout)
+ {
+ logger.error("Hit hard timeout limit on broker.");
+ break;
+ }
+ Iterator itr = futureMap.entrySet().iterator();
+ while (itr.hasNext())
+ {
+ Map.Entry<Integer, Future<RESULT>> entry = (Map.Entry<Integer, Future<RESULT>>) itr.next();
+ Future<RESULT> futureRes = entry.getValue();
+ if (futureRes.isDone())
+ {
+ resultlist.add((RESULT) futureRes.get());
+ itr.remove();
+ numResults++;
+ }
+ }
+ }
+
+ logger.info("totalTime = " + totalTime + ", resultlist.size = " + resultlist.size());
+ return null;
+ }
- });
+ });
}
catch(Exception e){
logger.error(e.getMessage(),e);
View
359 sensei-core/src/main/java/com/senseidb/search/node/ResultMerger.java
@@ -594,7 +594,7 @@ public static SenseiResult merge(final SenseiRequest req, Collection<SenseiResul
List<SenseiHit> hitsList = new ArrayList<SenseiHit>(req.getCount());
Iterator<SenseiHit> mergedIter = ListMerger.mergeLists(iteratorList, comparator);
int offsetLeft = req.getOffset();
- if (groupAccessibles == null)
+ if (!hasSortCollector)
{
Map<Object, SenseiHit>[] groupHitMaps = new Map[req.getGroupBy().length];
for (int i=0; i<groupHitMaps.length; ++i)
@@ -651,16 +651,16 @@ else if (hitsList.size()<req.getCount())
}
else
{
- CombinedFacetAccessible[] combinedFacetAccessibles = new CombinedFacetAccessible[groupAccessibles.length];
- Set<Object>[] groupSets = new Set[groupAccessibles.length];
- for (int i=0; i<groupAccessibles.length; ++i)
- {
- combinedFacetAccessibles[i] = new CombinedFacetAccessible(new FacetSpec(), groupAccessibles[i]);
- groupSets[i] = new HashSet<Object>(topHits);
- }
MyScoreDoc pre = null;
- if (topHits > 0 && combinedFacetAccessibles.length > 1 && hasSortCollector)
+ if (topHits > 0 && groupAccessibles != null && groupAccessibles.length > 1)
{
+ CombinedFacetAccessible[] combinedFacetAccessibles = new CombinedFacetAccessible[groupAccessibles.length];
+ Set<Object>[] groupSets = new Set[groupAccessibles.length];
+ for (int i=0; i<groupAccessibles.length; ++i)
+ {
+ combinedFacetAccessibles[i] = new CombinedFacetAccessible(new FacetSpec(), groupAccessibles[i]);
+ groupSets[i] = new HashSet<Object>(topHits);
+ }
totalDocs = 0;
MyScoreDoc tmpScoreDoc = new MyScoreDoc(0, 0.0f, 0, null);
MyScoreDoc bottom = null;
@@ -686,6 +686,7 @@ public Comparable value(ScoreDoc doc)
for (SenseiResult res : results)
{
SortCollector sortCollector = res.getSortCollector();
+ if (sortCollector == null) continue;
Iterator<CollectorContext> contextIter = sortCollector.contextList.iterator();
CollectorContext currentContext = null;
int contextLeft = 0;
@@ -841,6 +842,8 @@ else if (rawGroupValueType[j] == 0)
}
else
{
+ Set<Object>[] groupSets = new Set[1];
+ groupSets[0] = new HashSet<Object>(topHits);
while(mergedIter.hasNext())
{
SenseiHit hit = mergedIter.next();
@@ -875,17 +878,6 @@ else if (rawGroupValueType[i] == 0)
i = -1;
break;
}
-
- if (rawGroupValueType[i] == 2)
- {
- if (combinedFacetAccessibles[i].getCappedFacetCount(primitiveLongArrayWrapperTmp.data, 2) != 1)
- break;
- }
- else
- {
- if (combinedFacetAccessibles[i].getCappedFacetCount(rawGroupValue, 2) != 1)
- break;
- }
}
if (i >= 0)
{
@@ -897,7 +889,7 @@ else if (rawGroupValueType[i] == 0)
if (offsetLeft > 0)
--offsetLeft;
else {
- hit.setGroupHitsCount(combinedFacetAccessibles[i].getFacetHitsCount(hit.getRawGroupValue()));
+ //hit.setGroupHitsCount(combinedFacetAccessibles[i].getFacetHitsCount(hit.getRawGroupValue()));
hitsList.add(hit);
if (hitsList.size() >= req.getCount())
break;
@@ -909,220 +901,233 @@ else if (rawGroupValueType[i] == 0)
}
}
}
- for (int i=0; i<combinedFacetAccessibles.length; ++i) combinedFacetAccessibles[i].close();
+ //for (int i=0; i<combinedFacetAccessibles.length; ++i) combinedFacetAccessibles[i].close();
}
hits = hitsList.toArray(new SenseiHit[hitsList.size()]);
- if (req.getMaxPerGroup() > 1 || req.getGroupBy().length > 1)
+ Map<Object, HitWithGroupQueue>[] groupMaps = new Map[req.getGroupBy().length];
+ for (int i=0; i<groupMaps.length; ++i)
{
- Map<Object, HitWithGroupQueue>[] groupMaps = new Map[req.getGroupBy().length];
- for (int i=0; i<groupMaps.length; ++i)
+ groupMaps[i] = new HashMap<Object, HitWithGroupQueue>(hits.length*2);
+ }
+ for (SenseiHit hit : hits)
+ {
+ rawGroupValue = hit.getRawField(req.getGroupBy()[hit.getGroupPosition()]);
+
+ if (rawGroupValueType[hit.getGroupPosition()] == 2)
{
- groupMaps[i] = new HashMap<Object, HitWithGroupQueue>(hits.length*2);
+ rawGroupValue = new PrimitiveLongArrayWrapper((long[])rawGroupValue);
}
- for (SenseiHit hit : hits)
+ else if (rawGroupValueType[hit.getGroupPosition()] == 0)
{
- rawGroupValue = hit.getRawField(req.getGroupBy()[hit.getGroupPosition()]);
-
- if (rawGroupValueType[hit.getGroupPosition()] == 2)
+ if (rawGroupValue != null)
{
- rawGroupValue = new PrimitiveLongArrayWrapper((long[])rawGroupValue);
- }
- else if (rawGroupValueType[hit.getGroupPosition()] == 0)
- {
- if (rawGroupValue != null)
+ if (rawGroupValue instanceof long[])
{
- if (rawGroupValue instanceof long[])
- {
- rawGroupValueType[hit.getGroupPosition()] = 2;
- rawGroupValue = new PrimitiveLongArrayWrapper((long[])rawGroupValue);
- }
- else
- rawGroupValueType[hit.getGroupPosition()] = 1;
+ rawGroupValueType[hit.getGroupPosition()] = 2;
+ rawGroupValue = new PrimitiveLongArrayWrapper((long[])rawGroupValue);
}
+ else
+ rawGroupValueType[hit.getGroupPosition()] = 1;
}
+ }
- groupMaps[hit.getGroupPosition()].put(rawGroupValue, new HitWithGroupQueue(hit, new PriorityQueue<MyScoreDoc>()
- {
- private int r;
+ groupMaps[hit.getGroupPosition()].put(rawGroupValue, new HitWithGroupQueue(hit, new PriorityQueue<MyScoreDoc>()
+ {
+ private int r;
- {
- this.initialize(req.getMaxPerGroup() <= 1? 0:req.getMaxPerGroup());
- }
+ {
+ this.initialize(req.getMaxPerGroup() <= 1? 0:req.getMaxPerGroup());
+ }
- protected boolean lessThan(MyScoreDoc a, MyScoreDoc b)
- {
- r = a.sortValue.compareTo(b.sortValue);
- if (r>0)
- return true;
- else if (r<0)
- return false;
- else
- return (a.finalDoc > b.finalDoc);
- }
+ protected boolean lessThan(MyScoreDoc a, MyScoreDoc b)
+ {
+ r = a.sortValue.compareTo(b.sortValue);
+ if (r>0)
+ return true;
+ else if (r<0)
+ return false;
+ else
+ return (a.finalDoc > b.finalDoc);
}
- ));
- }
+ }
+ ));
+ }
- MyScoreDoc tmpScoreDoc = null;
- int doc = 0;
- float score = 0.0f;
- HitWithGroupQueue hitWithGroupQueue = null;
+ MyScoreDoc tmpScoreDoc = null;
+ int doc = 0;
+ float score = 0.0f;
+ HitWithGroupQueue hitWithGroupQueue = null;
- totalDocs = 0;
- for (SenseiResult res : results)
+ totalDocs = 0;
+ for (SenseiResult res : results)
+ {
+ if (hasSortCollector)
{
- if (hasSortCollector)
- {
- SortCollector sortCollector = res.getSortCollector();
- Iterator<CollectorContext> contextIter = sortCollector.contextList.iterator();
- CollectorContext currentContext = null;
- int contextLeft = 0;
- FacetDataCache[] dataCaches = new FacetDataCache[sortCollector.groupByMulti.length];
- while (contextIter.hasNext()) {
- currentContext = contextIter.next();
- contextLeft = currentContext.length;
- if (contextLeft > 0)
- {
- for (int j=0; j<sortCollector.groupByMulti.length; ++j)
- dataCaches[j] = (FacetDataCache)sortCollector.groupByMulti[j].getFacetData(currentContext.reader);
- break;
- }
+ SortCollector sortCollector = res.getSortCollector();
+ if (sortCollector == null) continue;
+ Iterator<CollectorContext> contextIter = sortCollector.contextList.iterator();
+ CollectorContext currentContext = null;
+ int contextLeft = 0;
+ FacetDataCache[] dataCaches = new FacetDataCache[sortCollector.groupByMulti.length];
+ while (contextIter.hasNext()) {
+ currentContext = contextIter.next();
+ contextLeft = currentContext.length;
+ if (contextLeft > 0)
+ {
+ for (int j=0; j<sortCollector.groupByMulti.length; ++j)
+ dataCaches[j] = (FacetDataCache)sortCollector.groupByMulti[j].getFacetData(currentContext.reader);
+ break;
}
+ }
- Iterator<float[]> scoreArrayIter = sortCollector.scorearraylist != null ? sortCollector.scorearraylist.iterator():null;
- if (contextLeft > 0)
+ Iterator<float[]> scoreArrayIter = sortCollector.scorearraylist != null ? sortCollector.scorearraylist.iterator():null;
+ if (contextLeft > 0)
+ {
+ for (int[] docs : sortCollector.docidarraylist)
{
- for (int[] docs : sortCollector.docidarraylist)
+ float[] scores = scoreArrayIter != null ? scoreArrayIter.next():null;
+ for (int i=0; i<SortCollector.BLOCK_SIZE; ++i)
{
- float[] scores = scoreArrayIter != null ? scoreArrayIter.next():null;
- for (int i=0; i<SortCollector.BLOCK_SIZE; ++i)
+ doc = docs[i];
+ score = scores != null ? scores[i]:0.0f;
+ int j=0;
+ for (; j<sortCollector.groupByMulti.length; ++j)
{
- doc = docs[i];
- score = scores != null ? scores[i]:0.0f;
- int j=0;
- for (; j<sortCollector.groupByMulti.length; ++j)
- {
- rawGroupValue = dataCaches[j].valArray.getRawValue(dataCaches[j].orderArray.get(doc));
+ rawGroupValue = dataCaches[j].valArray.getRawValue(dataCaches[j].orderArray.get(doc));
- if (rawGroupValueType[j] == 2)
- {
- primitiveLongArrayWrapperTmp.data = (long[])rawGroupValue;
- rawGroupValue = primitiveLongArrayWrapperTmp;
- }
- else if (rawGroupValueType[j] == 0)
+ if (rawGroupValueType[j] == 2)
+ {
+ primitiveLongArrayWrapperTmp.data = (long[])rawGroupValue;
+ rawGroupValue = primitiveLongArrayWrapperTmp;
+ }
+ else if (rawGroupValueType[j] == 0)
+ {
+ if (rawGroupValue != null)
{
- if (rawGroupValue != null)
+ if (rawGroupValue instanceof long[])
{
- if (rawGroupValue instanceof long[])
- {
- rawGroupValueType[j] = 2;
- primitiveLongArrayWrapperTmp.data = (long[])rawGroupValue;
- rawGroupValue = primitiveLongArrayWrapperTmp;
- }
- else
- rawGroupValueType[j] = 1;
+ rawGroupValueType[j] = 2;
+ primitiveLongArrayWrapperTmp.data = (long[])rawGroupValue;
+ rawGroupValue = primitiveLongArrayWrapperTmp;
}
- }
-
- hitWithGroupQueue = groupMaps[j].get(rawGroupValue);
- if (hitWithGroupQueue != null)
- {
- hitWithGroupQueue.hit.setGroupHitsCount(hitWithGroupQueue.hit.getGroupHitsCount() + 1);
- // Collect this hit.
- if (tmpScoreDoc == null)
- tmpScoreDoc = new MyScoreDoc(doc, score, currentContext.base + totalDocs + doc, currentContext.reader);
else
- {
- tmpScoreDoc.doc = doc;
- tmpScoreDoc.score = score;
- tmpScoreDoc.finalDoc = currentContext.base + totalDocs + doc;
- tmpScoreDoc.reader = currentContext.reader;
- }
- tmpScoreDoc.sortValue = currentContext.comparator.value(tmpScoreDoc);
- tmpScoreDoc.groupPos = j;
- tmpScoreDoc.rawGroupValue = rawGroupValue;
- tmpScoreDoc = hitWithGroupQueue.queue.insertWithOverflow(tmpScoreDoc);
- break;
+ rawGroupValueType[j] = 1;
}
}
- --contextLeft;
- if (contextLeft <= 0)
+
+ hitWithGroupQueue = groupMaps[j].get(rawGroupValue);
+ if (hitWithGroupQueue != null)
{
- while (contextIter.hasNext()) {
- currentContext = contextIter.next();
- contextLeft = currentContext.length;
- if (contextLeft > 0)
- {
- for (j=0; j<sortCollector.groupByMulti.length; ++j)
- dataCaches[j] = (FacetDataCache)sortCollector.groupByMulti[j].getFacetData(currentContext.reader);
- break;
- }
+ hitWithGroupQueue.hit.setGroupHitsCount(hitWithGroupQueue.hit.getGroupHitsCount() + 1);
+ // Collect this hit.
+ if (tmpScoreDoc == null)
+ tmpScoreDoc = new MyScoreDoc(doc, score, currentContext.base + totalDocs + doc, currentContext.reader);
+ else
+ {
+ tmpScoreDoc.doc = doc;
+ tmpScoreDoc.score = score;
+ tmpScoreDoc.finalDoc = currentContext.base + totalDocs + doc;
+ tmpScoreDoc.reader = currentContext.reader;
}
- if (contextLeft <= 0) // No more docs left.
+ tmpScoreDoc.sortValue = currentContext.comparator.value(tmpScoreDoc);
+ tmpScoreDoc.groupPos = j;
+ tmpScoreDoc.rawGroupValue = rawGroupValue;
+ tmpScoreDoc = hitWithGroupQueue.queue.insertWithOverflow(tmpScoreDoc);
+ break;
+ }
+ }
+ --contextLeft;
+ if (contextLeft <= 0)
+ {
+ while (contextIter.hasNext()) {
+ currentContext = contextIter.next();
+ contextLeft = currentContext.length;
+ if (contextLeft > 0)
+ {
+ for (j=0; j<sortCollector.groupByMulti.length; ++j)
+ dataCaches[j] = (FacetDataCache)sortCollector.groupByMulti[j].getFacetData(currentContext.reader);
break;
+ }
}
+ if (contextLeft <= 0) // No more docs left.
+ break;
}
}
}
- sortCollector.close();
}
- else
+ sortCollector.close();
+ }
+ else
+ {
+ if (res.getSenseiHits() != null)
{
- if (res.getSenseiHits() != null)
+ for (SenseiHit hit : res.getSenseiHits())
{
- for (SenseiHit hit : res.getSenseiHits())
+ if (hit.getGroupHits() != null)
{
- if (hit.getGroupHits() != null)
+ rawGroupValue = hit.getRawGroupValue();
+ if (rawGroupValueType[hit.getGroupPosition()] == 2)
{
- rawGroupValue = hit.getRawGroupValue();
- if (rawGroupValueType[hit.getGroupPosition()] == 2)
- {
- primitiveLongArrayWrapperTmp.data = (long[])rawGroupValue;
- rawGroupValue = primitiveLongArrayWrapperTmp;
- }
-
- hitWithGroupQueue = groupMaps[hit.getGroupPosition()].get(rawGroupValue);
- if (hitWithGroupQueue != null)
- hitWithGroupQueue.iterList.add(Arrays.asList(hit.getSenseiGroupHits()).iterator());
+ primitiveLongArrayWrapperTmp.data = (long[])rawGroupValue;
+ rawGroupValue = primitiveLongArrayWrapperTmp;
}
+
+ hitWithGroupQueue = groupMaps[hit.getGroupPosition()].get(rawGroupValue);
+ if (hitWithGroupQueue != null)
+ hitWithGroupQueue.iterList.add(Arrays.asList(hit.getSenseiGroupHits()).iterator());
}
}
}
- totalDocs += res.getTotalDocs();
}
+ totalDocs += res.getTotalDocs();
+ }
- if (hasSortCollector)
+ if (hasSortCollector)
+ {
+ for (Map<Object, HitWithGroupQueue> map : groupMaps)
{
- for (Map<Object, HitWithGroupQueue> map : groupMaps)
+ for (HitWithGroupQueue hwg : map.values())
{
- for (HitWithGroupQueue hwg : map.values())
+ int index = hwg.queue.size() - 1;
+ if (index >= 0)
{
- int index = hwg.queue.size() - 1;
- if (index >= 0)
+ SenseiHit[] groupHits = new SenseiHit[index+1];
+ while (index >=0)
{
- SenseiHit[] groupHits = new SenseiHit[index+1];
- while (index >=0)
- {
- groupHits[index] = hwg.queue.pop().getSenseiHit(req);
- --index;
- }
- hwg.hit.setGroupHits(groupHits);
+ groupHits[index] = hwg.queue.pop().getSenseiHit(req);
+ --index;
}
+ hwg.hit.setGroupHits(groupHits);
}
}
}
- else
+ }
+ else
+ {
+ for (Map<Object, HitWithGroupQueue> map : groupMaps)
{
- for (Map<Object, HitWithGroupQueue> map : groupMaps)
+ for (HitWithGroupQueue hwg : map.values())
{
- for (HitWithGroupQueue hwg : map.values())
- {
- List<SenseiHit> mergedList = ListMerger.mergeLists(0, req.getMaxPerGroup(), hwg.iterList
- .toArray(new Iterator[hwg.iterList.size()]), comparator);
- SenseiHit[] groupHits = mergedList.toArray(new SenseiHit[mergedList.size()]);
- hwg.hit.setGroupHits(groupHits);
- }
+ List<SenseiHit> mergedList = ListMerger.mergeLists(0, req.getMaxPerGroup(), hwg.iterList
+ .toArray(new Iterator[hwg.iterList.size()]), comparator);
+ SenseiHit[] groupHits = mergedList.toArray(new SenseiHit[mergedList.size()]);
+ hwg.hit.setGroupHits(groupHits);
+ }
+ }
+ }
+ }
+
+ if (groupAccessibles != null)
+ {
+ for (List<FacetAccessible> list : groupAccessibles)
+ {
+ if (list != null)
+ {
+ for (FacetAccessible acc : list)
+ {
+ if (acc != null)
+ acc.close();
}
}
}
View
11 sensei-core/src/main/java/com/senseidb/search/node/SenseiBroker.java
@@ -38,10 +38,15 @@
private long _timeoutMillis = TIMEOUT_MILLIS;
private final SenseiLoadBalancerFactory _loadBalancerFactory;
- public SenseiBroker(PartitionedNetworkClient<Integer> networkClient, ClusterClient clusterClient,
- SenseiLoadBalancerFactory loadBalancerFactory) throws NorbertException
+ public SenseiBroker(PartitionedNetworkClient<Integer> networkClient,
+ ClusterClient clusterClient,
+ SenseiLoadBalancerFactory loadBalancerFactory,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
+ throws NorbertException
{
- super(networkClient, CoreSenseiServiceImpl.PROTO_SERIALIZER);
+ super(networkClient, CoreSenseiServiceImpl.PROTO_SERIALIZER, pollInterval, minResponses, maxTotalWait);
_loadBalancerFactory = loadBalancerFactory;
clusterClient.addListener(this);
logger.info("created broker instance " + networkClient + " " + clusterClient + " " + loadBalancerFactory);
View
12 sensei-core/src/main/java/com/senseidb/search/node/SenseiSysBroker.java
@@ -36,10 +36,16 @@
protected Set<Node> _nodes = Collections.EMPTY_SET;
- public SenseiSysBroker(PartitionedNetworkClient<Integer> networkClient, ClusterClient clusterClient,
- SenseiLoadBalancerFactory loadBalancerFactory, Comparator<String> versionComparator) throws NorbertException
+ public SenseiSysBroker(PartitionedNetworkClient<Integer> networkClient,
+ ClusterClient clusterClient,
+ SenseiLoadBalancerFactory loadBalancerFactory,
+ Comparator<String> versionComparator,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
+ throws NorbertException
{
- super(networkClient, SysSenseiCoreServiceImpl.PROTO_SERIALIZER);
+ super(networkClient, SysSenseiCoreServiceImpl.PROTO_SERIALIZER, pollInterval, minResponses, maxTotalWait);
_versionComparator = versionComparator;
_loadBalancerFactory = loadBalancerFactory;
clusterClient.addListener(this);
View
6 sensei-core/src/main/java/com/senseidb/search/query/filters/SenseiTermFilter.java
@@ -110,7 +110,11 @@ public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
if (reader instanceof BoboIndexReader){
BoboIndexReader boboReader = (BoboIndexReader)reader;
FacetHandler facetHandler = (FacetHandler)boboReader.getFacetHandler(_name);
- Object obj = facetHandler.getFacetData(boboReader);
+ Object obj = null;
+ if (facetHandler != null)
+ {
+ obj = facetHandler.getFacetData(boboReader);
+ }
if (obj!=null && obj instanceof FacetDataCache){
FacetDataCache facetData = (FacetDataCache)obj;
TermValueList valArray = facetData.valArray;
View
12 sensei-core/src/main/java/com/senseidb/search/req/mapred/obsolete/MapReduceBroker.java
@@ -21,9 +21,15 @@
private final SenseiLoadBalancerFactory loadBalancerFactory;
private long _timeoutMillis;
- public MapReduceBroker(PartitionedNetworkClient<Integer> networkClient, ClusterClient clusterClient,
- SenseiLoadBalancerFactory loadBalancerFactory) throws NorbertException {
- super(networkClient, MapReduceSenseiService.SERIALIZER);
+ public MapReduceBroker(PartitionedNetworkClient<Integer> networkClient,
+ ClusterClient clusterClient,
+ SenseiLoadBalancerFactory loadBalancerFactory,
+ int pollInterval,
+ int minResponses,
+ int maxTotalWait)
+ throws NorbertException
+ {
+ super(networkClient, MapReduceSenseiService.SERIALIZER, pollInterval, minResponses, maxTotalWait);
this.loadBalancerFactory = loadBalancerFactory;
clusterClient.addListener(this);
logger.info("created broker instance " + networkClient + " " + clusterClient + " " + loadBalancerFactory);
View
6 sensei-core/src/main/java/com/senseidb/servlet/AbstractSenseiClientServlet.java
@@ -83,8 +83,10 @@ public void init(ServletConfig config) throws ServletException {
_networkClientConfig.setClusterClient(_clusterClient);
_networkClient = new SenseiNetworkClient(_networkClientConfig, null);
- _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory);
- _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator);
+ _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory,
+ pollInterval, minResponses, maxTotalWait);
+ _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator,
+ pollInterval, minResponses, maxTotalWait);
logger.info("Connecting to cluster: "+clusterName+" ...");
_clusterClient.awaitConnectionUninterruptibly();
View
6 sensei-core/src/main/java/com/senseidb/servlet/SenseiHttpInvokerServiceServlet.java
@@ -27,8 +27,10 @@ public void init(ServletConfig config) throws ServletException {
super.init(config);
innerSvc = new ClusteredSenseiServiceImpl(zkurl, zkTimeout, clusterClientName, clusterName, connectTimeoutMillis,
- writeTimeoutMillis, maxConnectionsPerNode, staleRequestTimeoutMins, staleRequestCleanupFrequencyMins,
- loadBalancerFactory, versionComparator);
+ writeTimeoutMillis, maxConnectionsPerNode, staleRequestTimeoutMins,
+ staleRequestCleanupFrequencyMins,
+ loadBalancerFactory, versionComparator,
+ pollInterval, minResponses, maxTotalWait);
innerSvc.start();
target = new HttpInvokerServiceExporter();
target.setService(innerSvc);
View
6 sensei-core/src/main/java/com/senseidb/servlet/ZookeeperConfigurableServlet.java
@@ -29,6 +29,9 @@
protected int staleRequestCleanupFrequencyMins;
protected SenseiLoadBalancerFactory loadBalancerFactory;
protected Comparator<String> versionComparator;
+ protected int pollInterval;
+ protected int minResponses;
+ protected int maxTotalWait;
@Override
public void init(ServletConfig config) throws ServletException {
@@ -56,5 +59,8 @@ public void init(ServletConfig config) throws ServletException {
versionComparator = (Comparator<String>)ctx.getAttribute(SenseiConfigServletContextListener.SENSEI_CONF_VERSION_COMPARATOR);
loadBalancerFactory = (SenseiLoadBalancerFactory)ctx.getAttribute(
SenseiConfigServletContextListener.SENSEI_CONF_ROUTER_FACTORY);
+ pollInterval = senseiConf.getInt(SenseiConfParams.SENSEI_BROKER_POLL_INTERVAL, 2);
+ minResponses = senseiConf.getInt(SenseiConfParams.SENSEI_BROKER_MIN_RESPONSES, 2);
+ maxTotalWait = senseiConf.getInt(SenseiConfParams.SENSEI_BROKER_MAX_TOTAL_WAIT, 200);
}
}
View
13 sensei-core/src/main/java/com/senseidb/svc/impl/ClusteredSenseiServiceImpl.java
@@ -29,9 +29,10 @@
private final String _clusterName;
public ClusteredSenseiServiceImpl(String zkurl,int zkTimeout,String clusterClientName, String clusterName, int connectTimeoutMillis,
- int writeTimeoutMillis, int maxConnectionsPerNode, int staleRequestTimeoutMins,
- int staleRequestCleanupFrequencyMins, SenseiLoadBalancerFactory loadBalancerFactory,
- Comparator<String> versionComparator) {
+ int writeTimeoutMillis, int maxConnectionsPerNode, int staleRequestTimeoutMins,
+ int staleRequestCleanupFrequencyMins, SenseiLoadBalancerFactory loadBalancerFactory,
+ Comparator<String> versionComparator, int pollInterval, int minResponses, int maxTotalWait)
+ {
_clusterName = clusterName;
_networkClientConfig.setServiceName(clusterName);
_networkClientConfig.setZooKeeperConnectString(zkurl);
@@ -47,8 +48,10 @@ public ClusteredSenseiServiceImpl(String zkurl,int zkTimeout,String clusterClien
_networkClientConfig.setClusterClient(_clusterClient);
_networkClient = new SenseiNetworkClient(_networkClientConfig,null);
- _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory);
- _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator);
+ _senseiBroker = new SenseiBroker(_networkClient, _clusterClient, loadBalancerFactory,
+ pollInterval, minResponses, maxTotalWait);
+ _senseiSysBroker = new SenseiSysBroker(_networkClient, _clusterClient, loadBalancerFactory, versionComparator,
+ pollInterval, minResponses, maxTotalWait);
}
public void start(){
View
4 sensei-core/src/test/java/com/senseidb/test/SenseiStarter.java
@@ -91,9 +91,9 @@ public static synchronized void start(String confDir1, String confDir2) {
broker = null;
try
{
- broker = new SenseiBroker(networkClient, clusterClient, loadBalancerFactory);
+ broker = new SenseiBroker(networkClient, clusterClient, loadBalancerFactory, 1, 2, 2000);
broker.setTimeoutMillis(0);
- mapReduceBroker = new MapReduceBroker(networkClient, clusterClient, loadBalancerFactory);
+ mapReduceBroker = new MapReduceBroker(networkClient, clusterClient, loadBalancerFactory, 1, 2, 2000);
broker.setTimeoutMillis(0);
} catch (NorbertException ne) {
logger.info("shutting down cluster...", ne);
View
15 sensei-core/src/test/java/com/senseidb/test/bql/parsers/TestBQL.java
@@ -159,6 +159,21 @@ public void testGroupBy2() throws Exception
}
@Test
+ public void testGroupByOrColumns() throws Exception
+ {
+ System.out.println("testGroupByOrColumns");
+ System.out.println("==================================================");
+
+ JSONObject json = _compiler.compile(
+ "SELECT category " +
+ "FROM cars " +
+ "GROUP BY color OR category TOP 5"
+ );
+ JSONObject expected = new JSONObject("{\"groupBy\":{\"columns\":[\"color\",\"category\"],\"top\":5},\"meta\":{\"select_list\":[\"category\"]}}");
+ assertTrue(_comp.isEquals(json, expected));
+ }
+
+ @Test
public void testEqualPredInteger() throws Exception
{
System.out.println("testEqualPredInteger");
View
28 sensei-core/src/test/java/com/senseidb/test/bql/parsers/TestErrorHandling.java
@@ -736,6 +736,34 @@ public void testBadGroupBy() throws Exception
}
}
+ @Test
+ public void testBadGroupBy2() throws Exception
+ {
+ System.out.println("testBadGroupBy2");
+ System.out.println("==================================================");
+
+ boolean caughtException = false;
+ try
+ {
+ JSONObject json = _compiler.compile(
+ "select category, tags \n" +
+ "from cars \n" +
+ "group by color OR year \n" +
+ "order by color \n"
+ );
+ }
+ catch (RecognitionException err)
+ {
+ assertEquals("[line:4, col:0] Range/multi/path facet, \"year\", cannot be used in the GROUP BY clause. (token=order)",
+ _compiler.getErrorMessage(err));
+ caughtException = true;
+ }
+ finally
+ {
+ assertTrue(caughtException);
+ }
+ }
+
// @Test
// public void testConflictSelections() throws Exception
// {
View
10 sensei-core/src/test/java/com/senseidb/test/plugin/MockRuntimeFacetHandlerFactory.java
@@ -2,9 +2,9 @@
import com.browseengine.bobo.facets.FacetHandlerInitializerParam;
import com.browseengine.bobo.facets.RuntimeFacetHandler;
-import com.browseengine.bobo.facets.RuntimeFacetHandlerFactory;
+import com.browseengine.bobo.facets.AbstractRuntimeFacetHandlerFactory;
-public class MockRuntimeFacetHandlerFactory implements RuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<?>> {
+public class MockRuntimeFacetHandlerFactory extends AbstractRuntimeFacetHandlerFactory<FacetHandlerInitializerParam, RuntimeFacetHandler<?>> {
@Override
public String getName() {
@@ -16,10 +16,4 @@ public String getName() {
return null;
}
-
- @Override
- public boolean isLoadLazily() {
- return false;
- }
-
}
Please sign in to comment.
Something went wrong with that request. Please try again.