From adf9d0cc86c0a8d7428de58499abb544b0adf815 Mon Sep 17 00:00:00 2001 From: vzhabiuk Date: Fri, 8 Jun 2012 14:12:29 -0700 Subject: [PATCH] Added the support of facet counts for the map reduce --- .../bobo/mapred/BoboMapFunctionWrapper.java | 28 +++++++++++++++++-- .../bobo/mapred/MapReduceResult.java | 4 +++ .../bobo/search/BoboSearcher2.java | 18 ++++++++---- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/bobo-browse/src/main/java/com/browseengine/bobo/mapred/BoboMapFunctionWrapper.java b/bobo-browse/src/main/java/com/browseengine/bobo/mapred/BoboMapFunctionWrapper.java index 47d90a16..b15f0afc 100644 --- a/bobo-browse/src/main/java/com/browseengine/bobo/mapred/BoboMapFunctionWrapper.java +++ b/bobo-browse/src/main/java/com/browseengine/bobo/mapred/BoboMapFunctionWrapper.java @@ -1,11 +1,35 @@ package com.browseengine.bobo.mapred; +import java.util.List; + import com.browseengine.bobo.api.BoboIndexReader; +import com.browseengine.bobo.facets.FacetCountCollector; +/** + * Is the part of the bobo request, that maintains the map result intermediate state + * + */ public interface BoboMapFunctionWrapper { - public void mapFullIndexReader(BoboIndexReader reader); + /** + * When there is no filter, map reduce will try to map the entire segment + * @param reader + */ + public void mapFullIndexReader(BoboIndexReader reader, FacetCountCollector[] facetCountCollectors); + /** + * The basic callback method for a single doc + * @param docId + * @param reader + */ public void mapSingleDocument(int docId, BoboIndexReader reader); - public void finalizeSegment(BoboIndexReader reader); + /** + * The callback method, after the segment was processed + * @param reader + */ + public void finalizeSegment(BoboIndexReader reader, FacetCountCollector[] facetCountCollectors); + /** + * The callback method, after the partition was processed + * + */ public void finalizePartition(); public MapReduceResult getResult(); } diff --git a/bobo-browse/src/main/java/com/browseengine/bobo/mapred/MapReduceResult.java b/bobo-browse/src/main/java/com/browseengine/bobo/mapred/MapReduceResult.java index f452e0f6..db0d2ba0 100644 --- a/bobo-browse/src/main/java/com/browseengine/bobo/mapred/MapReduceResult.java +++ b/bobo-browse/src/main/java/com/browseengine/bobo/mapred/MapReduceResult.java @@ -4,6 +4,10 @@ import java.util.ArrayList; import java.util.List; +/** + * Keeps the map reduce results + * + */ public class MapReduceResult implements Serializable { protected List mapResults = new ArrayList(200); protected Serializable reduceResult; diff --git a/bobo-browse/src/main/java/com/browseengine/bobo/search/BoboSearcher2.java b/bobo-browse/src/main/java/com/browseengine/bobo/search/BoboSearcher2.java index f9d7a3db..14b221dc 100644 --- a/bobo-browse/src/main/java/com/browseengine/bobo/search/BoboSearcher2.java +++ b/bobo-browse/src/main/java/com/browseengine/bobo/search/BoboSearcher2.java @@ -106,7 +106,15 @@ public void setNextReader(BoboIndexReader reader,int docBase) throws IOException } _countCollectors = collectorList.toArray(new FacetCountCollector[collectorList.size()]); } - + public FacetCountCollector[] getCountCollectors() { + List collectors = new ArrayList(); + collectors.addAll(Arrays.asList(_countCollectors)); + for (FacetHitCollector facetHitCollector : _collectors) { + collectors.addAll(facetHitCollector._collectAllCollectorList); + collectors.addAll(facetHitCollector._countCollectorList); + } + return collectors.toArray(new FacetCountCollector[collectors.size()]); + } } private final static class DefaultFacetValidator extends FacetValidator{ @@ -286,9 +294,6 @@ public void search(Weight weight, Filter filter, Collector collector, int start, collector.setScorer(scorer); target = scorer.nextDoc(); - if (target!=DocIdSetIterator.NO_MORE_DOCS && mapReduceWrapper != null) { - mapReduceWrapper.mapFullIndexReader(_subReaders[i]); - } while(target!=DocIdSetIterator.NO_MORE_DOCS) { if(validator.validate(target)) @@ -303,6 +308,9 @@ public void search(Weight weight, Filter filter, Collector collector, int start, } } } + if (mapReduceWrapper != null) { + mapReduceWrapper.mapFullIndexReader(_subReaders[i], validator.getCountCollectors()); + } } return; } @@ -371,7 +379,7 @@ public void search(Weight weight, Filter filter, Collector collector, int start, target = filterDocIdIterator.advance(doc); } } - mapReduceWrapper.finalizeSegment(_subReaders[i]); + mapReduceWrapper.finalizeSegment(_subReaders[i], validator.getCountCollectors()); } } }