Permalink
Browse files

Merge branch 'json-api' of https://github.com/wonlay/sensei into

json-api

Conflicts:
	sensei-core/src/main/java/com/sensei/indexing/api/DefaultStreamingIndexingManager.java
  • Loading branch information...
2 parents 52d8d53 + a637ebe commit b8e08538dfcc25e59898e3317eaca1cd426c2981 vzhabiuk committed Dec 14, 2011
View
0 lib/kafka-0.7.6.jar 100755 → 100644
No changes.
@@ -26,9 +26,16 @@
import com.sensei.indexing.api.MetaType;
public class SenseiSchema {
- public static final String SRC_DATA_FIELD_NAME = "__SRC_DATA__";
+ public static final String SRC_DATA_FIELD_NAME = "__SRC_DATA__";
public static final String SRC_DATA_COMPRESSED_FIELD_NAME = "stored";
+ public static final String EVENT_TYPE_FIELD = "_type";
+ public static final String EVENT_FIELD = "event";
+ public static final String EVENT_TYPE_ADD = "add";
+ public static final String EVENT_TYPE_UPDATE = "update";
+ public static final String EVENT_TYPE_DELETE = "delete";
+ public static final String EVENT_TYPE_SKIP = "skip";
+
private static Logger logger = Logger.getLogger(SenseiSchema.class);
private String _uidField;
@@ -1,5 +1,6 @@
package com.sensei.indexing.api;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
@@ -11,6 +12,7 @@
import java.util.Set;
import java.util.StringTokenizer;
import java.util.Map.Entry;
+import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.configuration.ConfigurationException;
@@ -150,6 +152,47 @@ public Object extract(String val) {
});
}
+ /** Gzip-compresses {@code src} and returns the compressed bytes; returns null when {@code src} is null. */
+ public static byte[] compress(byte[] src) throws Exception
+ {
+ byte[] data = null;
+ if (src != null)
+ {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ GZIPOutputStream gzipStream = new GZIPOutputStream(bout);
+ // NOTE(review): gzipStream is not closed if write() below throws (no finally / try-with-resources).
+ gzipStream.write(src);
+ gzipStream.flush();
+ gzipStream.close(); // close before reading bout so the remaining compressed data has been written out
+ bout.flush();
+
+ data = bout.toByteArray();
+ }
+
+ return data;
+ }
+ /** Gunzips {@code src} and returns the decompressed bytes; returns null when {@code src} is null. */
+ public static byte[] decompress(byte[] src) throws Exception
+ {
+ byte[] data = null;
+ if (src != null)
+ {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ byte[] buf = new byte[1024]; // 1k buffer
+ ByteArrayInputStream bin = new ByteArrayInputStream(src);
+ GZIPInputStream gzipStream = new GZIPInputStream(bin); // NOTE(review): this stream is never closed in this method
+
+ int len;
+ while ((len = gzipStream.read(buf)) > 0) { // read() returns -1 at end of stream, which ends the loop
+ bout.write(buf, 0, len);
+ }
+ bout.flush();
+
+ data = bout.toByteArray();
+ }
+
+ return data;
+ }
public void setCustomIndexingPipeline(CustomIndexingPipeline customIndexingPipeline){
_customIndexingPipeline = customIndexingPipeline;
@@ -269,37 +312,69 @@ public long getUID() {
}
@Override
- public boolean isDeleted() {
- return filtered.optBoolean(_delField);
+ public boolean isDeleted() // true when the event is a delete; never throws, any failure is logged and reported as false
+ {
+ try
+ {
+ String type = filtered.optString(SenseiSchema.EVENT_TYPE_FIELD, null); // null when the event carries no "_type"
+ if (type == null)
+ return filtered.optBoolean(_delField); // no event type: fall back to the boolean stored under _delField
+ else
+ return SenseiSchema.EVENT_TYPE_DELETE.equalsIgnoreCase(type); // case-insensitive match against "delete"
+ }
+ catch(Exception e)
+ {
+ logger.error(e.getMessage(), e);
+ return false;
+ }
}
@Override
- public boolean isSkip() {
- return filtered.optBoolean(_skipField);
+ public boolean isSkip() // true when the event should be skipped; never throws, any failure is logged and reported as false
+ {
+ try
+ {
+ String type = filtered.optString(SenseiSchema.EVENT_TYPE_FIELD, null); // null when the event carries no "_type"
+ if (type == null)
+ return filtered.optBoolean(_skipField); // no event type: fall back to the boolean stored under _skipField
+ else
+ return SenseiSchema.EVENT_TYPE_SKIP.equalsIgnoreCase(type); // case-insensitive match against "skip"
+ }
+ catch(Exception e)
+ {
+ logger.error(e.getMessage(), e);
+ return false;
+ }
}
@Override
- public byte[] getStoreValue() {
+ public byte[] getStoreValue()
+ {
byte[] data = null;
- if (src!=null){
- String strData = src.toString();
- if (strData!=null){
- try{
- data = strData.getBytes("UTF-8");
- if (_compressSrcData) {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- GZIPOutputStream gzipStream = new GZIPOutputStream(bout);
-
- gzipStream.write(data);
- gzipStream.flush();
- gzipStream.close();
- bout.flush();
+ if (src != null)
+ {
+ Object type = src.remove(SenseiSchema.EVENT_TYPE_FIELD);
+ try
+ {
+ if (_compressSrcData)
+ data = compress(src.toString().getBytes("UTF-8"));
+ else
+ data = src.toString().getBytes("UTF-8");
+ }
+ catch (Exception e)
+ {
+ logger.error(e.getMessage(), e);
+ }
- data = bout.toByteArray();
- }
+ if (type != null)
+ {
+ try
+ {
+ src.put(SenseiSchema.EVENT_TYPE_FIELD, type);
}
- catch(Exception e){
- logger.error(e.getMessage(),e);
+ catch(Exception e)
+ {
+ logger.error("Should never happen", e);
}
}
}
@@ -5,6 +5,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -20,6 +21,7 @@
import proj.zoie.api.DataProvider;
import proj.zoie.api.Zoie;
import proj.zoie.api.ZoieException;
+import proj.zoie.api.ZoieIndexReader;
import proj.zoie.impl.indexing.StreamDataProvider;
import proj.zoie.mbean.DataProviderAdmin;
import proj.zoie.mbean.DataProviderAdminMBean;
@@ -132,7 +134,7 @@ public void initialize(
_zoieSystemMap = zoieSystemMap;
_dataProvider = buildDataProvider();
-
+
Iterator<Integer> it = zoieSystemMap.keySet().iterator();
while(it.hasNext()){
int part = it.next();
@@ -213,12 +215,96 @@ public DataDispatcher(int maxPartitionId,String uidField){
_currentVersion = null;
}
+ private JSONObject rewriteData(JSONObject obj, int partNum) // returns the event to index for obj, or null when an update event cannot be resolved
+ {
+ String type = obj.optString(SenseiSchema.EVENT_TYPE_FIELD, null);
+ // The payload may be nested under the "event" key; otherwise obj itself is treated as the event.
+ JSONObject event = obj.optJSONObject(SenseiSchema.EVENT_FIELD);
+ if (event == null)
+ event = obj;
+ else if (type != null)
+ {
+ try
+ {
+ event.put(SenseiSchema.EVENT_TYPE_FIELD, type); // copy the outer "_type" onto the nested event
+ }
+ catch(Exception e)
+ {
+ logger.error("Should never happen", e);
+ }
+ }
+ // Update events are merged over the stored source of the existing document with the same uid.
+ if (SenseiSchema.EVENT_TYPE_UPDATE.equalsIgnoreCase(type))
+ {
+ Zoie<BoboIndexReader, JSONObject> zoie = _zoieSystemMap.get(partNum);
+ List<ZoieIndexReader<BoboIndexReader>> readers;
+ try
+ {
+ readers = zoie.getIndexReaders();
+ }
+ catch(Exception e)
+ {
+ logger.error(e.getMessage(), e);
+ return null;
+ }
+
+ if (readers == null)
+ {
+ logger.error("Cannot found original doc for and update event: " + obj); // NOTE(review): message text has typos ("Cannot found", "for and update")
+ return null;
+ }
+ try
+ {
+ byte[] src = null;
+ long uid = event.getLong(_senseiSchema.getUidField());
+ for (ZoieIndexReader<BoboIndexReader> reader : readers) // take the stored value from the first reader that has one
+ {
+ src = reader.getStoredValue(uid);
+ if (src != null)
+ break;
+ }
+ byte[] data = null;
+
+ if (_senseiSchema.isCompressSrcData())
+ data = DefaultJsonSchemaInterpreter.decompress(src); // decompress(null) returns null, handled by the check below
+ else
+ data = src;
+
+ if (data == null)
+ {
+ logger.error("Cannot found original doc for and update event: " + obj);
+ return null;
+ }
+
+ JSONObject newEvent = new JSONObject(new String(data, "UTF-8"));
+ Iterator<String> keys = event.keys();
+ while(keys.hasNext())
+ {
+ String key = keys.next();
+ newEvent.put(key, event.get(key)); // incoming values overwrite the stored ones key by key
+ }
+ event = newEvent;
+ }
+ catch (Exception e)
+ {
+ logger.error(e.getMessage(), e);
+ return null;
+ }
+ finally
+ {
+ zoie.returnIndexReaders(readers); // readers are handed back on every exit path of the try above
+ }
+ }
+
+ return event;
+ }
+
@Override
public void consume(Collection<proj.zoie.api.DataConsumer.DataEvent<JSONObject>> data) throws ZoieException
{
UpdateBatchSizeMeter.mark(data.size());
- ProviderBatchSizeMeter.mark(DefaultStreamingIndexingManager.this._dataProvider.getBatchSize());
- EventMeter.mark(DefaultStreamingIndexingManager.this._dataProvider.getEventCount());
+ ProviderBatchSizeMeter.mark(_dataProvider.getBatchSize());
+ EventMeter.mark(_dataProvider.getEventCount());
try{
for(DataEvent<JSONObject> dataEvt : data){
@@ -230,27 +316,33 @@ public void consume(Collection<proj.zoie.api.DataConsumer.DataEvent<JSONObject>>
_currentVersion = dataEvt.getVersion();
int routeToPart = _shardingStrategy.caculateShard(_maxPartitionId, obj);
- if(DefaultStreamingIndexingManager.this._dataCollectorMap.containsKey(routeToPart)){
- Collection<DataEvent<JSONObject>> partDataSet = DefaultStreamingIndexingManager.this._dataCollectorMap.get(routeToPart);
- if (partDataSet!=null){
+ Collection<DataEvent<JSONObject>> partDataSet = _dataCollectorMap.get(routeToPart);
+ if (partDataSet != null)
+ {
+ JSONObject rewrited = rewriteData(obj, routeToPart);
+ if (rewrited != null)
+ {
+ if (rewrited != obj)
+ dataEvt = new DataEvent<JSONObject>(rewrited, dataEvt.getVersion());
partDataSet.add(dataEvt);
}
}
}
- Iterator<Integer> it = DefaultStreamingIndexingManager.this._zoieSystemMap.keySet().iterator();
+ Iterator<Integer> it = _zoieSystemMap.keySet().iterator();
while(it.hasNext()){
int part_num = it.next();
- Zoie<BoboIndexReader,JSONObject> dataConsumer = DefaultStreamingIndexingManager.this._zoieSystemMap.get(part_num);
+ Zoie<BoboIndexReader,JSONObject> dataConsumer = _zoieSystemMap.get(part_num);
if (dataConsumer!=null){
LinkedList<DataEvent<JSONObject>> partDataSet =
- (LinkedList<DataEvent<JSONObject>>) DefaultStreamingIndexingManager.this._dataCollectorMap.get(part_num);
+ (LinkedList<DataEvent<JSONObject>>) _dataCollectorMap.get(part_num);
if (partDataSet != null)
{
if (partDataSet.size() == 0)
{
JSONObject markerObj = new JSONObject();
- markerObj.put(DefaultStreamingIndexingManager.this._senseiSchema.getSkipField(), "true");
+ //markerObj.put(_senseiSchema.getSkipField(), "true");
+ markerObj.put(SenseiSchema.EVENT_TYPE_FIELD, SenseiSchema.EVENT_TYPE_SKIP);
markerObj.put(_uidField, 0L); // Add a dummy uid
partDataSet.add(new DataEvent<JSONObject>(markerObj, _currentVersion));
}
@@ -262,7 +354,7 @@ else if (_currentVersion != null && !_currentVersion.equals(partDataSet.getLast(
dataConsumer.consume(partDataSet);
}
}
- DefaultStreamingIndexingManager.this._dataCollectorMap.put(part_num, new LinkedList<DataEvent<JSONObject>>());
+ _dataCollectorMap.put(part_num, new LinkedList<DataEvent<JSONObject>>());
}
}
catch(Exception e){
@@ -278,7 +370,7 @@ public String getVersion()
@Override
public Comparator<String> getVersionComparator() {
- return DefaultStreamingIndexingManager.this._versionComparator;
+ return _versionComparator; // same field as before; only the explicit DefaultStreamingIndexingManager.this qualifier was dropped
}
}
}
@@ -3,6 +3,8 @@
import org.json.JSONObject;
import org.json.JSONException;
+import com.sensei.conf.SenseiSchema;
+
public interface ShardingStrategy {
int caculateShard(int maxShardId,JSONObject dataObj) throws JSONException;
@@ -18,7 +20,13 @@ public FieldModShardingStrategy(String field)
@Override
public int caculateShard(int maxShardId,JSONObject dataObj) throws JSONException
{
- return (int)(dataObj.getLong(_field) % maxShardId);
+ JSONObject event = dataObj.optJSONObject(SenseiSchema.EVENT_FIELD); // the sharding field may sit inside a nested "event" object
+ long uid;
+ if (event == null)
+ uid = dataObj.getLong(_field); // flat object: read the field from dataObj itself
+ else
+ uid = event.getLong(_field);
+ return (int)(uid % maxShardId); // NOTE(review): Java % keeps the sign of uid, so a negative uid gives a negative shard; confirm uids are non-negative
}
}
}
Oops, something went wrong.

0 comments on commit b8e0853

Please sign in to comment.