This repository has been archived by the owner on Oct 30, 2020. It is now read-only.

Commit bc98239
Merged from javasoze/master and fixed the facet handler fail-to-load error.
wonlay committed Mar 19, 2012
2 parents: 89eb637 + 7ab1513
Showing 29 changed files with 1,058 additions and 235 deletions.
86 changes: 0 additions & 86 deletions .classpath

This file was deleted.

15 changes: 13 additions & 2 deletions sensei-core/src/main/java/com/senseidb/conf/SchemaConverter.java
@@ -134,7 +134,18 @@ static public JSONObject convert(Document schemaDoc)
   facetObj.put("type", facet.getAttribute("type"));
   String depends = facet.getAttribute("depends");
   if (depends!=null){
-    facetObj.put("depends", depends);
+    String[] dependsList = depends.split(",");
+    JSONArray dependsArr = new JSONArray();
+    for (String dependName : dependsList)
+    {
+      if (dependName != null)
+      {
+        dependName = dependName.trim();
+        if (dependName.length() != 0)
+          dependsArr.put(dependName);
+      }
+    }
+    facetObj.put("depends", dependsArr);
   }
   String column = facet.getAttribute("column");
   if (column!=null && column.length() > 0){
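For illustration, a minimal standalone sketch of the new conversion above; the class name and the sample attribute value are invented, and org.json is assumed on the classpath:

import org.json.JSONArray;
import org.json.JSONObject;

public class DependsConversionSketch {
  public static void main(String[] args) throws Exception {
    String depends = "category, price, ";   // sample schema.xml attribute value
    JSONArray dependsArr = new JSONArray();
    for (String dependName : depends.split(",")) {
      dependName = dependName.trim();
      if (dependName.length() != 0) {       // drop empty segments from stray commas
        dependsArr.put(dependName);
      }
    }
    JSONObject facetObj = new JSONObject();
    facetObj.put("depends", dependsArr);
    System.out.println(facetObj);           // {"depends":["category","price"]}
  }
}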
@@ -170,7 +181,7 @@ static public JSONObject convert(Document schemaDoc)

   public static void main(String[] args) throws Exception
   {
-    File xmlSchema = new File("../example/cars/conf/schema.xml");
+    File xmlSchema = new File("../example/tweets/conf/schema.xml");
     if (!xmlSchema.exists()){
       throw new ConfigurationException("schema not file");
     }
@@ -341,14 +341,13 @@ public static SenseiSystemInfo buildFacets(JSONObject schemaObj, SenseiPluginRegistry pluginRegistry)
       String type = facet.getString("type");
       String fieldName = facet.optString("column",name);
       Set<String> dependSet = new HashSet<String>();
-      String depends= facet.optString("depends",null);
-      if (depends != null) {
-        for (String dep :depends.split("[,; ]")) {
-          dep = dep.trim();
-          if (!dep.equals("")) {
-            dependSet.add(dep);
-          }
-        }
+      JSONArray dependsArray = facet.optJSONArray("depends");
+
+      if (dependsArray != null) {
+        int depCount = dependsArray.length();
+        for (int k=0;k<depCount;++k){
+          dependSet.add(dependsArray.getString(k));
+        }
       }

       SenseiSystemInfo.SenseiFacetInfo facetInfo = new SenseiSystemInfo.SenseiFacetInfo(name);
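On the read side, buildFacets now consumes the depends array directly instead of re-splitting a delimited string. A small sketch of the round trip; the facet JSON literal is a made-up example:

import java.util.HashSet;
import java.util.Set;
import org.json.JSONArray;
import org.json.JSONObject;

public class DependsReadSketch {
  public static void main(String[] args) throws Exception {
    // shape produced by SchemaConverter after this commit (sample values)
    JSONObject facet = new JSONObject("{\"name\":\"color\",\"type\":\"simple\",\"depends\":[\"category\",\"price\"]}");
    Set<String> dependSet = new HashSet<String>();
    JSONArray dependsArray = facet.optJSONArray("depends");
    if (dependsArray != null) {
      for (int k = 0; k < dependsArray.length(); ++k) {
        dependSet.add(dependsArray.getString(k));
      }
    }
    System.out.println(dependSet);   // [category, price] in some order
  }
}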
@@ -15,7 +15,7 @@ sensei.cluster.url=localhost:2181
 sensei.cluster.timeout=30000

 # sensei indexing parameters
-sensei.index.directory = index/test
+sensei.index.directory = /tmp/sensei-index/test

 sensei.index.batchSize = 100000
 sensei.index.batchDelay = 30000
@@ -15,7 +15,7 @@ sensei.cluster.url=localhost:2181
 sensei.cluster.timeout=30000

 # sensei indexing parameters
-sensei.index.directory = index/test
+sensei.index.directory = /tmp/sensei-index/test

 sensei.index.batchSize = 100000
 sensei.index.batchDelay = 30000
8 changes: 8 additions & 0 deletions sensei-gateways/pom.xml
@@ -146,5 +146,13 @@
     <artifactId>avro</artifactId>
     <version>1.4.0</version>
   </dependency>
+
+  <dependency>
+    <groupId>org.apache.derby</groupId>
+    <artifactId>derby</artifactId>
+    <version>10.8.2.2</version>
+    <scope>test</scope>
+  </dependency>
+
</dependencies>
</project>
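The new Derby dependency is test-scoped; a plausible use is an in-memory database for exercising the JDBC gateway without an external server. A minimal sketch using plain JDBC, with an invented table:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DerbyInMemorySketch {
  public static void main(String[] args) throws Exception {
    // "memory:" keeps the database in RAM; create=true creates it on first connect
    Connection conn = DriverManager.getConnection("jdbc:derby:memory:senseiTest;create=true");
    Statement st = conn.createStatement();
    st.executeUpdate("CREATE TABLE events (id INT PRIMARY KEY, json VARCHAR(1024))");
    st.executeUpdate("INSERT INTO events VALUES (1, '{\"id\":1}')");
    ResultSet rs = st.executeQuery("SELECT json FROM events ORDER BY id");
    while (rs.next()) {
      System.out.println(rs.getString(1));
    }
    conn.close();
  }
}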
@@ -17,24 +17,24 @@
import proj.zoie.dataprovider.jdbc.JDBCStreamDataProvider;
import proj.zoie.dataprovider.jdbc.PreparedStatementBuilder;
import proj.zoie.impl.indexing.StreamDataProvider;
import proj.zoie.impl.indexing.ZoieConfig;

import com.senseidb.gateway.SenseiGateway;
import com.senseidb.indexing.DataSourceFilter;
import com.senseidb.indexing.ShardingStrategy;
import com.senseidb.plugin.SenseiPluginRegistry;

public class JdbcDataProviderBuilder extends SenseiGateway<ResultSet>{

private SenseiPluginRegistry pluginRegistry;
private Comparator<String> _versionComparator;

@Override
public void start() {
_versionComparator = pluginRegistry.getBeanByName("versionComparator", Comparator.class);
if (_versionComparator == null) _versionComparator = ZoieConfig.DEFAULT_VERSION_COMPARATOR;
}


@Override
public StreamDataProvider<JSONObject> buildDataProvider(final DataSourceFilter<ResultSet> dataFilter,
String oldSinceKey,
ShardingStrategy shardingStrategy,
@@ -45,15 +45,15 @@ public StreamDataProvider<JSONObject> buildDataProvider(final DataSourceFilter<ResultSet> dataFilter,
final String username = config.get("jdbc.username");
final String password = config.get("jdbc.password");
final String driver = config.get("jdbc.driver");
final String adaptor = config.get("jdbc.adaptor");

final SenseiJDBCAdaptor senseiAdaptor =
pluginRegistry.getBeanByName(adaptor, SenseiJDBCAdaptor.class) != null ?
pluginRegistry.getBeanByName(adaptor, SenseiJDBCAdaptor.class) :
pluginRegistry.getBeanByFullPrefix(adaptor, SenseiJDBCAdaptor.class);

if (senseiAdaptor==null){
-      throw new ConfigurationException("adaptor not found: "+adaptor);
+      throw new ConfigurationException("adaptor not found: " + adaptor);
}


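The builder reads its settings from the gateway configuration. An illustrative snippet in the style of the sensei.properties files above; only the username, password, driver, and adaptor keys appear in this hunk, so the url key and all values here are assumptions:

# hypothetical JDBC gateway settings
jdbc.url = jdbc:mysql://localhost:3306/sensei
jdbc.username = sensei
jdbc.password = secret
jdbc.driver = com.mysql.jdbc.Driver
jdbc.adaptor = myJdbcAdaptor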
34 changes: 34 additions & 0 deletions sensei-gateways/src/main/java/com/senseidb/gateway/kafka/AvroDataSourceFilter.java
@@ -0,0 +1,34 @@
package com.senseidb.gateway.kafka;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.json.JSONObject;

import com.senseidb.indexing.DataSourceFilter;

public class AvroDataSourceFilter<D> extends DataSourceFilter<DataPacket> {

  private final Class<D> _cls;
  private BinaryDecoder binDecoder;   // reused across calls to avoid re-allocation
  private final SpecificDatumReader<D> reader;
  private final DataSourceFilter<D> _dataMapper;

  public AvroDataSourceFilter(Class<D> cls, DataSourceFilter<D> dataMapper){
    _cls = cls;
    binDecoder = null;
    reader = new SpecificDatumReader<D>(_cls);
    _dataMapper = dataMapper;
    if (_dataMapper == null){
      throw new IllegalArgumentException("source filter is null");
    }
  }

  @Override
  protected JSONObject doFilter(DataPacket packet) throws Exception {
    // decode the packet's byte range into an Avro record, then delegate the JSON mapping
    binDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(packet.data, packet.offset, packet.size, binDecoder);
    D avroObj = _cls.newInstance();
    reader.read(avroObj, binDecoder);
    return _dataMapper.filter(avroObj);
  }
}
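A usage sketch for this filter; TweetAvro and TweetMapper are hypothetical stand-ins for an Avro-generated record class and a DataSourceFilter that maps it to JSON:

// hypothetical types: TweetAvro is an Avro-generated class, TweetMapper maps it to a JSONObject
DataSourceFilter<TweetAvro> mapper = new TweetMapper();
AvroDataSourceFilter<TweetAvro> avroFilter =
    new AvroDataSourceFilter<TweetAvro>(TweetAvro.class, mapper);

byte[] payload = fetchMessageBytes();   // hypothetical source of raw Avro bytes, e.g. a Kafka message
JSONObject json = avroFilter.filter(new DataPacket(payload, 0, payload.length));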
14 changes: 14 additions & 0 deletions sensei-gateways/src/main/java/com/senseidb/gateway/kafka/DataPacket.java
@@ -0,0 +1,14 @@
package com.senseidb.gateway.kafka;

// immutable holder for a byte range within a raw message buffer
public final class DataPacket {

  public final byte[] data;
  public final int offset;
  public final int size;

  public DataPacket(byte[] data, int offset, int size){
    this.data = data;
    this.offset = offset;
    this.size = size;
  }
}
17 changes: 17 additions & 0 deletions sensei-gateways/src/main/java/com/senseidb/gateway/kafka/DefaultJsonDataSourceFilter.java
@@ -0,0 +1,17 @@
package com.senseidb.gateway.kafka;

import java.nio.charset.Charset;

import org.json.JSONObject;

import com.senseidb.indexing.DataSourceFilter;

public class DefaultJsonDataSourceFilter extends DataSourceFilter<DataPacket> {
  public final static Charset UTF8 = Charset.forName("UTF-8");

  @Override
  protected JSONObject doFilter(DataPacket packet) throws Exception {
    String jsonString = new String(packet.data, packet.offset, packet.size, UTF8);
    return new JSONObject(jsonString);
  }
}
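The default filter simply decodes the packet's byte range as UTF-8 and parses the result as JSON. A quick sketch (assumes a surrounding method that throws Exception):

byte[] bytes = "{\"id\":1,\"contents\":\"hello\"}".getBytes(DefaultJsonDataSourceFilter.UTF8);
DataPacket packet = new DataPacket(bytes, 0, bytes.length);
JSONObject json = new DefaultJsonDataSourceFilter().filter(packet);
System.out.println(json.getInt("id"));   // prints 1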

This file was deleted.

@@ -12,15 +12,12 @@
 import com.senseidb.indexing.DataSourceFilter;
 import com.senseidb.indexing.ShardingStrategy;

-public class KafkaDataProviderBuilder extends SenseiGateway<byte[]>{
+public class KafkaDataProviderBuilder extends SenseiGateway<DataPacket>{

   private final Comparator<String> _versionComparator = ZoieConfig.DEFAULT_VERSION_COMPARATOR;

   @Override
-  public StreamDataProvider<JSONObject> buildDataProvider(final DataSourceFilter<byte[]> dataFilter,
+  public StreamDataProvider<JSONObject> buildDataProvider(DataSourceFilter<DataPacket> dataFilter,
       String oldSinceKey,
       ShardingStrategy shardingStrategy,
       Set<Integer> partitions) throws Exception
@@ -32,11 +29,31 @@ public StreamDataProvider<JSONObject> buildDataProvider(final DataSourceFilter<byte[]> dataFilter,
     int timeout = timeoutStr != null ? Integer.parseInt(timeoutStr) : 10000;
     int batchsize = Integer.parseInt(config.get("kafka.batchsize"));

-    long offset = oldSinceKey == null ? 0L : Long.parseLong(oldSinceKey);
-    KafkaJsonStreamDataProvider provider = new KafkaJsonStreamDataProvider(_versionComparator,zookeeperUrl,timeout,batchsize,consumerGroupId,topic,offset);
-    if (dataFilter!=null){
-      provider.setFilter(dataFilter);
-    }
+    long offset = oldSinceKey == null ? 0L : Long.parseLong(oldSinceKey);
+
+    if (dataFilter==null){
+      String type = config.get("kafka.msg.type");
+      if (type == null){
+        type = "json";
+      }
+
+      if ("json".equals(type)){
+        dataFilter = new DefaultJsonDataSourceFilter();
+      }
+      else if ("avro".equals(type)){
+        String msgClsString = config.get("kafka.msg.avro.class");
+        String dataMapperClassString = config.get("kafka.msg.avro.datamapper");
+        Class cls = Class.forName(msgClsString);
+        Class dataMapperClass = Class.forName(dataMapperClassString);
+        DataSourceFilter dataMapper = (DataSourceFilter)dataMapperClass.newInstance();
+        dataFilter = new AvroDataSourceFilter(cls, dataMapper);
+      }
+      else{
+        throw new IllegalArgumentException("invalid msg type: "+type);
+      }
+    }
+
+    KafkaStreamDataProvider provider = new KafkaStreamDataProvider(_versionComparator,zookeeperUrl,timeout,batchsize,consumerGroupId,topic,offset,dataFilter);
     return provider;
   }

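With this change the gateway falls back to a configured filter whenever none is injected. An illustrative properties snippet; kafka.msg.type, kafka.msg.avro.class, kafka.msg.avro.datamapper, and kafka.batchsize are the keys read by the code shown, while the Avro class names and values are invented:

# JSON payloads (also the default when kafka.msg.type is unset)
kafka.msg.type = json
kafka.batchsize = 10000

# Avro payloads instead: name the record class and a DataSourceFilter for it
# kafka.msg.type = avro
# kafka.msg.avro.class = com.example.TweetAvro
# kafka.msg.avro.datamapper = com.example.TweetAvroMapper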
