Skip to content

Commit

Permalink
Improved the performance of the group by map reduce. Still some fixes
Browse files Browse the repository at this point in the history
needed
  • Loading branch information
Volodymyr Zhabiuk committed Jan 2, 2013
1 parent 4ca8fe1 commit 853bff5
Show file tree
Hide file tree
Showing 23 changed files with 817 additions and 283 deletions.
Expand Up @@ -107,5 +107,6 @@ public interface FieldAccessor {
* @return
*/
public DocIDMapper getMapper();
public SingleFieldAccessor getSingleFieldAccessor(String facetName);

}
Expand Up @@ -18,6 +18,7 @@
import com.browseengine.bobo.facets.data.TermShortList;
import com.browseengine.bobo.facets.data.TermValueList;
import com.senseidb.search.req.SenseiSystemInfo.SenseiFacetInfo;
import com.senseidb.search.req.mapred.impl.SingleFieldAccessorImpl;

/**
* This class was designed to avoid polymorphism and to leverage primitive types as much as possible
Expand Down Expand Up @@ -339,4 +340,13 @@ public DocIDMapper getMapper() {
return mapper;
}

private Map<String, SingleFieldAccessor> singleFieldAccessors = new HashMap<String, SingleFieldAccessor>();
@Override
public SingleFieldAccessor getSingleFieldAccessor(String facetName) {
if (!singleFieldAccessors.containsKey(facetName)) {
singleFieldAccessors.put(facetName, new SingleFieldAccessorImpl((FacetDataCache) boboIndexReader.getFacetData(facetName), boboIndexReader.getFacetHandler(facetName), boboIndexReader));
}
return singleFieldAccessors.get(facetName);
}

}
@@ -0,0 +1,67 @@
package com.senseidb.search.req.mapred;

public interface SingleFieldAccessor {
/**
* Get facet value for the document
* @param fieldName
* @param docId
* @return
*/
public Object get(int docId);

/**
* Get string facet value for the document
* @param fieldName
* @param docId
* @return
*/
public String getString(int docId);

/**
* Get long facet value for the document
* @param fieldName
* @param docId
* @return
*/
public long getLong(int docId);

/**
* Get double facet value for the document
* @param fieldName
* @param docId
* @return
*/
public double getDouble(int docId);

/**
* Get short facet value for the document
* @param fieldName
* @param docId
* @return
*/
public short getShort(int docId);

/**
* Get integer facet value for the document
* @param fieldName
* @param docId
* @return
*/
public int getInteger(int docId);

/**
* Get float facet value for the document
* @param fieldName
* @param docId
* @return
*/
public float getFloat(int docId);

/**
* Get array facet value for the document
* @param fieldName
* @param docId
* @return
*/
public Object[] getArray(int docId);
}
Expand Up @@ -11,6 +11,7 @@
import com.senseidb.search.req.mapred.FieldAccessor;
import com.senseidb.search.req.mapred.IntArray;
import com.senseidb.search.req.mapred.SenseiMapReduce;
import com.senseidb.search.req.mapred.SingleFieldAccessor;
import com.senseidb.util.JSONUtil.FastJSONObject;

public class AvgMapReduce implements SenseiMapReduce<AvgResult, AvgResult> {
Expand All @@ -27,9 +28,10 @@ public void init(JSONObject params) {

@Override
public AvgResult map(IntArray docId, int docIdCount, long[] uids, FieldAccessor accessor, FacetCountAccessor facetCountAccessor) {
double ret = 0;
SingleFieldAccessor singleFieldAccessor = accessor.getSingleFieldAccessor(column);
double ret = 0;
for (int i = 0; i < docIdCount; i++) {
ret+= accessor.getDouble(column, docId.get(i));
ret+= singleFieldAccessor.getDouble(docId.get(i));
}
return new AvgResult(ret / docIdCount, docIdCount);
}
Expand Down
@@ -1,20 +1,23 @@
package com.senseidb.search.req.mapred.functions;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;

import java.util.List;

import org.json.JSONException;
import org.json.JSONObject;

import com.browseengine.bobo.facets.data.TermNumberList;
import com.senseidb.search.req.mapred.CombinerStage;
import com.senseidb.search.req.mapred.FacetCountAccessor;
import com.senseidb.search.req.mapred.FieldAccessor;
import com.senseidb.search.req.mapred.IntArray;
import com.senseidb.search.req.mapred.SenseiMapReduce;
import com.senseidb.search.req.mapred.SingleFieldAccessor;
import com.senseidb.util.JSONUtil.FastJSONObject;

public class DistinctCountMapReduce implements SenseiMapReduce<LongOpenHashSet, Integer> {
public class DistinctCountMapReduce implements SenseiMapReduce<IntOpenHashSet, Integer> {

private String column;

Expand All @@ -28,21 +31,32 @@ public void init(JSONObject params) {
}

@Override
public LongOpenHashSet map(IntArray docId, int docIdCount, long[] uids, FieldAccessor accessor, FacetCountAccessor facetCountAccessor) {
LongOpenHashSet hashSet = new LongOpenHashSet(docIdCount);
for (int i =0; i < docIdCount; i++) {
hashSet.add(accessor.getLong(column, docId.get(i)));
}
public IntOpenHashSet map(IntArray docId, int docIdCount, long[] uids, FieldAccessor accessor, FacetCountAccessor facetCountAccessor) {

SingleFieldAccessor singleFieldAccessor = accessor.getSingleFieldAccessor(column);
IntOpenHashSet intSet = new IntOpenHashSet();
if (!(accessor.getTermValueList(column) instanceof TermNumberList)) {
for (int i =0; i < docIdCount; i++) {
singleFieldAccessor.get(docId.get(i)).hashCode();
intSet.add(singleFieldAccessor.get(docId.get(i)).hashCode());
}
} else {
for (int i =0; i < docIdCount; i++) {
singleFieldAccessor.getInteger(docId.get(i));
intSet.add(singleFieldAccessor.get(docId.get(i)).hashCode());
}
}


return hashSet;
return intSet;
}

@Override
public List<LongOpenHashSet> combine(List<LongOpenHashSet> mapResults, CombinerStage combinerStage) {
public List<IntOpenHashSet> combine(List<IntOpenHashSet> mapResults, CombinerStage combinerStage) {
if (mapResults.isEmpty()) {
return mapResults;
}
LongOpenHashSet ret = mapResults.get(0);
IntOpenHashSet ret = mapResults.get(0);
for (int i = 1; i < mapResults.size(); i++) {
ret.addAll(mapResults.get(i));
}
Expand All @@ -52,11 +66,11 @@ public List<LongOpenHashSet> combine(List<LongOpenHashSet> mapResults, CombinerS
}

@Override
public Integer reduce(List<LongOpenHashSet> combineResults) {
public Integer reduce(List<IntOpenHashSet> combineResults) {
if (combineResults.isEmpty()) {
return 0;
}
LongOpenHashSet ret = combineResults.get(0);
IntOpenHashSet ret = combineResults.get(0);
for (int i = 1; i < combineResults.size(); i++) {
ret.addAll(combineResults.get(i));
}
Expand Down

This file was deleted.

Expand Up @@ -13,6 +13,7 @@
import com.senseidb.search.req.mapred.FieldAccessor;
import com.senseidb.search.req.mapred.IntArray;
import com.senseidb.search.req.mapred.SenseiMapReduce;
import com.senseidb.search.req.mapred.SingleFieldAccessor;
import com.senseidb.util.JSONUtil.FastJSONArray;
import com.senseidb.util.JSONUtil.FastJSONObject;

Expand All @@ -25,8 +26,9 @@ public MaxResult map(IntArray docIds, int docIdCount, long[] uids, FieldAccessor
double max = Double.MIN_VALUE;
double tmp = 0;
long uid = 0l;
SingleFieldAccessor singleFieldAccessor = accessor.getSingleFieldAccessor(column);
for (int i =0; i < docIdCount; i++) {
tmp = accessor.getDouble(column, docIds.get(i));
tmp = singleFieldAccessor.getDouble(docIds.get(i));
if (max < tmp) {
max = tmp;
if (uids != null && !(uids.length == 1 && uids[0] == Long.MIN_VALUE)) {
Expand Down
Expand Up @@ -11,6 +11,7 @@
import com.senseidb.search.req.mapred.FieldAccessor;
import com.senseidb.search.req.mapred.IntArray;
import com.senseidb.search.req.mapred.SenseiMapReduce;
import com.senseidb.search.req.mapred.SingleFieldAccessor;
import com.senseidb.util.JSONUtil.FastJSONArray;
import com.senseidb.util.JSONUtil.FastJSONObject;

Expand All @@ -23,8 +24,10 @@ public MinResult map(IntArray docIds, int docIdCount, long[] uids, FieldAccessor
double min = Double.MAX_VALUE;
double tmp = 0;
long uid = 0l;
SingleFieldAccessor singleFieldAccessor = accessor.getSingleFieldAccessor(column);

for (int i =0; i < docIdCount; i++) {
tmp = accessor.getDouble(column, docIds.get(i));
tmp = singleFieldAccessor.getDouble(docIds.get(i));
if (min > tmp) {
min = tmp;
if (uids != null && !(uids.length == 1 && uids[0] == Long.MIN_VALUE)) {
Expand Down
Expand Up @@ -7,11 +7,13 @@
import org.json.JSONException;
import org.json.JSONObject;

import com.browseengine.bobo.facets.data.TermNumberList;
import com.senseidb.search.req.mapred.CombinerStage;
import com.senseidb.search.req.mapred.FacetCountAccessor;
import com.senseidb.search.req.mapred.FieldAccessor;
import com.senseidb.search.req.mapred.IntArray;
import com.senseidb.search.req.mapred.SenseiMapReduce;
import com.senseidb.search.req.mapred.SingleFieldAccessor;
import com.senseidb.util.JSONUtil.FastJSONArray;
import com.senseidb.util.JSONUtil.FastJSONObject;

Expand All @@ -30,8 +32,12 @@ public void init(JSONObject params) {
@Override
public Double map(IntArray docIds, int docIdCount, long[] uids, FieldAccessor accessor, FacetCountAccessor facetCountAccessor) {
double ret = 0;
SingleFieldAccessor singleFieldAccessor = accessor.getSingleFieldAccessor(column);
if (!(accessor.getTermValueList(column) instanceof TermNumberList)) {
throw new IllegalStateException("SumMapReduce needs numeric column");
}
for (int i = 0; i < docIdCount; i++) {
ret += accessor.getDouble(column, docIds.get(i));
ret += singleFieldAccessor.getDouble(docIds.get(i));
}
return ret;
}
Expand Down
Expand Up @@ -6,8 +6,9 @@
import org.json.JSONObject;

import com.senseidb.search.req.mapred.FieldAccessor;
import com.senseidb.search.req.mapred.SingleFieldAccessor;
public interface AggregateFunction<T extends GroupedValue> extends Serializable {
public T produceSingleValue(FieldAccessor accessor, int docId);
public T produceSingleValue(SingleFieldAccessor accessor, int docId);
public Object toJson(HashMap<String, T> reduceResult);

}

0 comments on commit 853bff5

Please sign in to comment.