Permalink
Browse files

added sharding strategy support

  • Loading branch information...
1 parent 76d7f9f commit 461786d6bea47ed54f687065e57f4687309b1434 @javasoze javasoze committed May 13, 2011
@@ -27,7 +27,6 @@
private static Logger logger = Logger.getLogger(SenseiSchema.class);
private String _uidField;
- private String _shardByField;
private String _deleteField;
private String _skipField;
private String _srcDataStore;
@@ -52,9 +51,6 @@ public String getUidField(){
return _uidField;
}
- public String getShardByField(){
- return _shardByField;
- }
public String getDeleteField(){
return _deleteField;
@@ -95,9 +91,6 @@ public static SenseiSchema build(Document schemaDoc) throws ConfigurationExcepti
Element tableElem = (Element) tables.item(0);
schema._uidField = tableElem.getAttribute("uid");
- schema._shardByField = tableElem.getAttribute("shard-by");
- if (schema._shardByField==null || schema._shardByField.length()==0)
- schema._shardByField = schema._uidField;
schema._deleteField = tableElem.getAttribute("delete-field");
if (schema._deleteField==null) schema._deleteField="";
schema._skipField = tableElem.getAttribute("skip-field");
@@ -47,6 +47,8 @@
private static final String BATCH_SIZE = "batchSize";
+ private static final String SHARDING_STRATEGY = "shardingStrategy";
+
private StreamDataProvider<JSONObject> _dataProvider;
private String _oldestSinceKey;
@@ -58,6 +60,7 @@
private Map<Integer, Zoie<BoboIndexReader, JSONObject>> _zoieSystemMap;
private final LinkedHashMap<Integer, Collection<DataEvent<JSONObject>>> _dataCollectorMap;
private final Comparator<String> _versionComparator;
+ private final ShardingStrategy _shardingStrategy;
public DefaultStreamingIndexingManager(SenseiSchema schema,Configuration senseiConfig, ApplicationContext pluginContext, Comparator<String> versionComparator){
_dataProvider = null;
@@ -70,6 +73,19 @@ public DefaultStreamingIndexingManager(SenseiSchema schema,Configuration senseiC
_registeredMBeans = new LinkedList<ObjectName>();
_dataCollectorMap = new LinkedHashMap<Integer, Collection<DataEvent<JSONObject>>>();
_versionComparator = versionComparator;
+
+ String shardingStrategyName = _myconfig.getString(SHARDING_STRATEGY);
+
+ ShardingStrategy strategy = null;
+
+ if (shardingStrategyName!=null){
+ strategy = (ShardingStrategy)pluginContext.getBean(shardingStrategyName);
+ }
+ if (strategy == null){
+ strategy = new ShardingStrategy.UidModShardingStrategy();
+ }
+
+ _shardingStrategy = strategy;
}
public void updateOldestSinceKey(String sinceKey){
@@ -217,9 +233,11 @@ public void consume(Collection<proj.zoie.api.DataConsumer.DataEvent<JSONObject>>
}
_currentVersion = dataEvt.getVersion();
- long shardBy = obj.getLong(_senseiSchema.getShardByField());
- int routeToPart = (int)(shardBy % _maxPartitionId);
- if(shardBy>=0 && DefaultStreamingIndexingManager.this._dataCollectorMap.containsKey(routeToPart)){
+ long uid = obj.getLong(_senseiSchema.getUidField());
+
+
+ int routeToPart = _shardingStrategy.caculateShard(_maxPartitionId, uid, obj);
+ if(DefaultStreamingIndexingManager.this._dataCollectorMap.containsKey(routeToPart)){
Collection<DataEvent<JSONObject>> partDataSet = DefaultStreamingIndexingManager.this._dataCollectorMap.get(routeToPart);
if (partDataSet!=null){
if (obj == obj)
@@ -0,0 +1,15 @@
+package com.sensei.indexing.api;
+
+import org.json.JSONObject;
+
+public interface ShardingStrategy {
+ int caculateShard(int maxShardId,long uid,JSONObject dataObj);
+
+ public static class UidModShardingStrategy implements ShardingStrategy{
+
+ @Override
+ public int caculateShard(int maxShardId, long uid, JSONObject dataObj) {
+ return (int)(uid % maxShardId);
+ }
+ }
+}

0 comments on commit 461786d

Please sign in to comment.