Skip to content
This repository has been archived by the owner on Oct 30, 2020. It is now read-only.

Commit

Permalink
Merge divchenko's 'master' into 'master'.
Browse files Browse the repository at this point in the history
Conflicts:
	build.gradle
	gradle.properties
	sensei-core/src/main/java/com/senseidb/indexing/DefaultStreamingIndexingManager.java
	sensei-core/src/main/java/com/senseidb/search/node/SenseiBroker.java
	sensei-core/src/main/java/com/senseidb/search/node/broker/BrokerConfig.java
	sensei-core/src/main/java/com/senseidb/servlet/AbstractSenseiClientServlet.java
	sensei-core/src/test/java/com/senseidb/test/SenseiStarter.java
	sensei-federated-broker/src/main/java/com/senseidb/federated/broker/proxy/SenseiBrokerProxy.java
  • Loading branch information
nsabovic committed Sep 24, 2013
2 parents 5acf303 + 4294773 commit 041262a
Show file tree
Hide file tree
Showing 17 changed files with 225 additions and 67 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Expand Up @@ -41,8 +41,8 @@ project.ext.externalDependency = [
'kafka': 'kafka:kafka:0.8.0_0.30_9bd2a11486420a313f800-SNAPSHOT',
'zoieCore': 'com.linkedin.zoie:zoie-core:3.3.11',
'zoieJms': 'com.linkedin.zoie:zoie-jms:3.3.11',
'boboBrowse': 'com.linkedin.bobo:bobo-browse:3.1.15',
'norbert':'com.linkedin:norbert_2.8.1:0.6.20-SNAPSHOT',
'boboBrowse': 'com.linkedin.bobo:bobo-browse:3.1.18',
'norbert':'com.linkedin.norbert:norbert:0.6.34',

'antlr':'org.antlr:antlr:3.4',

Expand Down
Expand Up @@ -80,6 +80,10 @@ public class SenseiClientRequest {
* Flag indicating whether explanation information should be returned
*/
private boolean explain;
/**
* Flag indicating whether request trace information should be logged
*/
private boolean trace;
/**
* the field value used for routing
*/
Expand Down Expand Up @@ -129,6 +133,11 @@ public Builder explain(boolean explain) {
return this;
}

/**
 * Sets the trace flag on the request being built, indicating whether
 * request trace information should be logged.
 *
 * @param trace true to enable tracing for this request
 * @return this builder, for call chaining
 */
public Builder trace(boolean trace) {
  this.request.trace = trace;
  return this;
}

public Builder query(Query query) {
request.query = query;

Expand Down Expand Up @@ -279,6 +288,10 @@ public boolean isExplain() {
return explain;
}

/**
 * @return whether request trace information should be logged
 */
public boolean isTrace() {
  return this.trace;
}

/**
 * @return the field value used for routing
 */
public String getRouteParam() {
  return this.routeParam;
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1 +1 @@
version=1.5.17
version=1.5.25
Expand Up @@ -89,6 +89,7 @@ public interface SenseiConfParams {
public static final String SERVER_BROKER_MAXWAIT = "sensei.broker.maxWaittime";
public static final String SERVER_BROKER_TIMEOUT = "sensei.broker.timeout";
public static final String ALLOW_PARTIAL_MERGE = "sensei.broker.allowPartialMerge";
public static final String SERVER_BROKER_REQUEST_CUSTOMIZER_FACTORY = "sensei.broker.request.customizer.factory";


public static final String SENSEI_BROKER_POLL_INTERVAL = "sensei.broker.pollInterval";
Expand Down
Expand Up @@ -78,6 +78,9 @@ public class DefaultStreamingIndexingManager implements SenseiIndexingManager<JS
private Meter _providerBatchSizeMeter;
private Meter _eventMeter;
private Meter _updateBatchSizeMeter;
private Meter _indexSizeMeter;
private long _lastMeasureTime;
private static final long MEASURE_INTERVAL = 1000 * 60; // 1 minute
private Timer _indexingLatencyTimer;

private StreamDataProvider<JSONObject> _dataProvider;
Expand All @@ -94,9 +97,7 @@ public class DefaultStreamingIndexingManager implements SenseiIndexingManager<JS
private final Comparator<String> _versionComparator;
private final PluggableSearchEngineManager pluggableSearchEngineManager;
private SenseiPluginRegistry pluginRegistry;




public DefaultStreamingIndexingManager(SenseiSchema schema,Configuration senseiConfig,
SenseiPluginRegistry pluginRegistry, SenseiGateway<?> gateway, ShardingStrategy shardingStrategy, PluggableSearchEngineManager pluggableSearchEngineManager){
Expand Down Expand Up @@ -221,6 +222,9 @@ public void shutdown() {
if (_updateBatchSizeMeter != null) {
_updateBatchSizeMeter.stop();
}
if (_indexSizeMeter != null) {
_indexSizeMeter.stop();
}
if (_eventMeter != null) {
_eventMeter.stop();
}
Expand All @@ -235,6 +239,7 @@ public void start() throws Exception {
_providerBatchSizeMeter = registerMeter("provider-batch-size", "provide-batch-size");
_updateBatchSizeMeter = registerMeter("update-batch-size", "update-batch-size");
_eventMeter = registerMeter("indexing-events", "indexing-events");
_indexSizeMeter = registerMeter("index-size", "index-size");
_indexingLatencyTimer = registerTimer("indexing-latency");

_dataProvider.start();
Expand Down Expand Up @@ -406,6 +411,11 @@ public void consume(Collection<proj.zoie.api.DataConsumer.DataEvent<JSONObject>>
}
}

long indexSize = 0;
long now = System.currentTimeMillis();
boolean measureIndexSize = now - _lastMeasureTime > MEASURE_INTERVAL ? true : false;
_lastMeasureTime = now;

Iterator<Integer> it = _zoieSystemMap.keySet().iterator();
while(it.hasNext()){
int part_num = it.next();
Expand All @@ -430,8 +440,14 @@ else if (_currentVersion != null && !_currentVersion.equals(partDataSet.getLast(
}
dataConsumer.consume(partDataSet);
}

if (measureIndexSize)
indexSize += dataConsumer.getAdminMBean().getDiskIndexSizeBytes();
}
_dataCollectorMap.put(part_num, new LinkedList<DataEvent<JSONObject>>());

if (measureIndexSize)
_indexSizeMeter.mark(indexSize);
}
}
catch(Exception e){
Expand Down
113 changes: 88 additions & 25 deletions sensei-core/src/main/java/com/senseidb/search/node/SenseiBroker.java
Expand Up @@ -18,35 +18,28 @@
*/
package com.senseidb.search.node;

import com.linkedin.norbert.network.Serializer;
import com.senseidb.search.req.*;
import com.senseidb.metrics.MetricFactory;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;

import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.log4j.Logger;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.SortField;

import proj.zoie.api.indexing.AbstractZoieIndexable;

import com.browseengine.bobo.api.FacetSpec;
import com.linkedin.norbert.NorbertException;
import com.linkedin.norbert.javacompat.cluster.ClusterClient;
import com.linkedin.norbert.javacompat.cluster.Node;
import com.linkedin.norbert.javacompat.network.PartitionedNetworkClient;
import com.linkedin.norbert.javacompat.network.RequestBuilder;
import com.linkedin.norbert.network.ResponseIterator;
import com.linkedin.norbert.network.Serializer;
import com.senseidb.conf.SenseiSchema;
import com.senseidb.indexing.DefaultJsonSchemaInterpreter;
import com.senseidb.svc.impl.CoreSenseiServiceImpl;
import com.senseidb.metrics.MetricFactory;
import com.senseidb.search.req.*;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import org.apache.log4j.Logger;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.SortField;
import proj.zoie.api.indexing.AbstractZoieIndexable;

import java.util.*;
import java.util.concurrent.ExecutionException;


/**
Expand All @@ -62,15 +55,21 @@ public class SenseiBroker extends AbstractConsistentHashBroker<SenseiRequest, Se

private final boolean allowPartialMerge;
private final ClusterClient clusterClient;
private final SenseiRequestCustomizerFactory requestCustomizerFactory;
private final Counter numberOfNodesInTheCluster = MetricFactory.newCounter(new MetricName(SenseiBroker.class,
"numberOfNodesInTheCluster"));

public SenseiBroker(PartitionedNetworkClient<String> networkClient, ClusterClient clusterClient,
Serializer<SenseiRequest, SenseiResult> serializer, long timeoutMillis, boolean allowPartialMerge)
/**
 * Creates a broker that fans requests out over the given network client and
 * registers itself as a listener on the cluster client.
 *
 * @param networkClient            partitioned client used to send requests to replicas
 * @param clusterClient            cluster client; this broker is added as a listener to it
 * @param allowPartialMerge        whether partial merges of results are allowed
 * @param serializer               serializer for request/result wire format
 * @param timeoutMillis            request timeout in milliseconds
 * @param requestCustomizerFactory optional factory for per-request customizers; may be null
 *                                 (doCall handles a null factory)
 * @throws NorbertException on cluster/network initialization failure
 */
public SenseiBroker(PartitionedNetworkClient<String> networkClient,
ClusterClient clusterClient,
boolean allowPartialMerge,
Serializer<SenseiRequest, SenseiResult> serializer,
long timeoutMillis,
SenseiRequestCustomizerFactory requestCustomizerFactory)
throws NorbertException {
super(networkClient, serializer, timeoutMillis);
this.clusterClient = clusterClient;
this.allowPartialMerge = allowPartialMerge;
this.requestCustomizerFactory = requestCustomizerFactory;
// Register for cluster membership callbacks (node join/leave).
clusterClient.addListener(this);
logger.info("created broker instance " + networkClient + " " + clusterClient);
}
Expand Down Expand Up @@ -172,10 +171,69 @@ public SenseiResult getEmptyResultInstance()
{
return new SenseiResult();
}

/**
 * Sends the request to one replica per partition set and collects each node's
 * result into a list. Each outgoing request is a clone of {@code req}, scoped
 * to that node's partitions and optionally transformed by a
 * {@link SenseiRequestCustomizer} obtained from {@code requestCustomizerFactory}.
 * When tracing is requested or debug logging is on, per-node timing log lines
 * are built and emitted after all responses arrive.
 *
 * @param req the broker-level request to fan out (cloned per node, not mutated here)
 * @return one SenseiResult per response received; may be fewer than the number
 *         of requests sent if some nodes did not respond
 * @throws ExecutionException if the underlying call fails
 */
protected List<SenseiResult> doCall(final SenseiRequest req) throws ExecutionException
{
List<SenseiResult> resultList = new ArrayList<SenseiResult>();

// Timing log lines are collected only when the request asks for tracing OR
// debug logging is enabled; otherwise null so no per-node work is done.
final List<StringBuilder> timingLogLines = req.isTrace() || logger.isDebugEnabled() ? new LinkedList<StringBuilder>() : null;

// Resolve a per-request customizer up front; the factory is optional.
final SenseiRequestCustomizer customizer;
if (requestCustomizerFactory != null)
customizer = requestCustomizerFactory.getRequestCustomizer(req);
else
customizer = null;

ResponseIterator<SenseiResult> responseIterator =
buildIterator(_networkClient.sendRequestToOneReplica(getRouteParam(req), new RequestBuilder<Integer, SenseiRequest>() {
@Override
public SenseiRequest apply(Node node, Set<Integer> nodePartitions) {
// TODO: Cloning is yucky per http://www.artima.com/intv/bloch13.html
SenseiRequest clone = (SenseiRequest) (((SenseiRequest) req).clone());

clone.setPartitions(nodePartitions);
if (timingLogLines != null) {
// Tracing/debug enabled: record the request half of this node's log line.
timingLogLines.add(buildLogLineForRequest(node, clone));
}

SenseiRequest customizedRequest = customizeRequest(clone, customizer, nodePartitions);
return customizedRequest;
}
}, _serializer));

// Drain all node responses; blocks until each response (or timeout) arrives.
while(responseIterator.hasNext()) {
resultList.add(responseIterator.next());
}

if (timingLogLines != null) {
// Tracing/debug enabled: complete each timing log line with its result and emit it.
int i = 0;
for (StringBuilder logLine : timingLogLines) {
// we are assuming the request builder gets called in the same order as the response
// iterator is built, otherwise the loglines would be out of sync between req & res
if (i < resultList.size()) {
buildLogLineForResult(logLine, resultList.get(i++));
// Emit at debug when debug is on; otherwise (trace-only case) at info.
if (logger.isDebugEnabled())
logger.debug(logLine.toString());
else
logger.info(logLine.toString());
}
}
String numResponses = String.format("There are %d responses", resultList.size());
if (logger.isDebugEnabled())
logger.debug(numResponses);
else
logger.info(numResponses);
}

return resultList;
}

@Override
public SenseiRequest customizeRequest(SenseiRequest request)
{ // Rewrite offset and count.
public SenseiRequest customizeRequest(SenseiRequest request, SenseiRequestCustomizer customizer, Set<Integer> nodePartitions)
{
// Rewrite offset and count.
request.setCount(request.getOffset()+request.getCount());
request.setOffset(0);

Expand All @@ -193,6 +251,11 @@ public SenseiRequest customizeRequest(SenseiRequest request)
if (!request.isFetchStoredFields())
request.setFetchStoredFields(request.isFetchStoredValue());

if (customizer != null)
{
request = customizer.customize(request, nodePartitions);
}

// Rewrite select list to include sort and group by fields:
if (request.getSelectSet() != null)
{
Expand Down
Expand Up @@ -18,22 +18,20 @@
*/
package com.senseidb.search.node.broker;

import java.util.Comparator;

import com.linkedin.norbert.network.Serializer;
import com.senseidb.search.req.SenseiResult;
import org.apache.commons.configuration.Configuration;

import com.linkedin.norbert.javacompat.cluster.ZooKeeperClusterClient;
import com.linkedin.norbert.javacompat.network.NetworkClientConfig;
import com.linkedin.norbert.javacompat.network.PartitionedLoadBalancerFactory;
import com.linkedin.norbert.network.Serializer;
import com.senseidb.cluster.client.SenseiNetworkClient;
import com.senseidb.conf.SenseiConfParams;
import com.senseidb.search.node.SenseiBroker;
import com.senseidb.search.node.SenseiSysBroker;
import com.senseidb.search.req.SenseiRequest;
import com.senseidb.search.req.SenseiRequestCustomizerFactory;
import com.senseidb.search.req.SenseiResult;
import com.senseidb.servlet.SenseiConfigServletContextListener;
import com.senseidb.svc.api.SenseiException;
import org.apache.commons.configuration.Configuration;

import java.util.Comparator;

public class BrokerConfig {
protected String clusterName;
Expand All @@ -55,6 +53,9 @@ public class BrokerConfig {
private SenseiNetworkClient networkClient;
private SenseiBroker senseiBroker;
private SenseiSysBroker senseiSysBroker;
private long brokerTimeout;
private long brokerTimeout;
private SenseiRequestCustomizerFactory requestCustomizerFactory;
protected long brokerTimeout;


Expand Down
Expand Up @@ -65,6 +65,7 @@ public class SenseiRequest implements AbstractSenseiRequest, Cloneable
private Map<String,FacetHandlerInitializerParam> _facetInitParamMap;
private Set<Integer> _partitions;
private boolean _showExplanation;
private boolean _trace;
private static Random _rand = new Random(System.nanoTime());
private String _routeParam;
private String _groupBy; // TODO: Leave here for backward compatible reason, will remove it later.
Expand All @@ -76,7 +77,7 @@ public class SenseiRequest implements AbstractSenseiRequest, Cloneable
private transient Set<String> _selectSet;
private SenseiMapReduce mapReduceFunction;
private List<SenseiError> errors;

public SenseiRequest(){
_facetInitParamMap = new HashMap<String,FacetHandlerInitializerParam>();
_selections=new HashMap<String,BrowseSelection>();
Expand All @@ -86,6 +87,7 @@ public SenseiRequest(){
_fetchStoredValue = false;
_partitions = null;
_showExplanation = false;
_trace = false;
_routeParam = null;
_groupBy = null;
_groupByMulti = null;
Expand All @@ -103,7 +105,8 @@ public Set<String> getTermVectorsToFetch(){
/**
 * Sets the set of field names whose term vectors should be fetched.
 *
 * @param termVectorsToFetch field names for term-vector retrieval
 */
public void setTermVectorsToFetch(Set<String> termVectorsToFetch){
  this._termVectorsToFetch = termVectorsToFetch;
}
/**

/**
* Get the transaction ID.
* @return the transaction ID.
*/
Expand All @@ -124,11 +127,19 @@ public final void setTid(long tid)

public boolean isShowExplanation() {
return _showExplanation;
}
}

public void setShowExplanation(boolean showExplanation) {
public void setShowExplanation(boolean showExplanation) {
_showExplanation = showExplanation;
}
}

/**
 * @return whether request trace information should be logged
 */
public boolean isTrace() {
  return this._trace;
}

/**
 * Sets whether request trace information should be logged.
 *
 * @param trace true to enable tracing for this request
 */
public void setTrace(boolean trace) {
  this._trace = trace;
}

public void setPartitions(Set<Integer> partitions){
_partitions = partitions;
Expand Down Expand Up @@ -536,14 +547,19 @@ public SenseiRequest clone() {
for(Entry<String, FacetSpec> facetSpec : this.getFacetSpecs().entrySet()) {
cloneFacetSpecs.put(facetSpec.getKey(), facetSpec.getValue().clone());
}

clone.setFacetSpecs(cloneFacetSpecs);

Map<String, FacetHandlerInitializerParam> cloneFacetInit = new HashMap<String, FacetHandlerInitializerParam>();
for(Entry<String, FacetHandlerInitializerParam> facetInit : this.getFacetHandlerInitParamMap().entrySet()) {
cloneFacetInit.put(facetInit.getKey(), facetInit.getValue()); // TODO consider cloning values as well
}
clone.setFacetHandlerInitParamMap(cloneFacetInit);

clone.setQuery(this.getQuery());
clone.setOffset(this.getOffset());
clone.setCount(this.getCount());
clone.setFetchStoredFields(this.isFetchStoredFields());
clone.setFetchStoredValue(this.isFetchStoredValue());
clone.setFacetHandlerInitParamMap(this.getFacetHandlerInitParamMap());
clone.setPartitions(this.getPartitions());
clone.setShowExplanation(this.isShowExplanation());
clone.setRouteParam(this.getRouteParam());
Expand Down

0 comments on commit 041262a

Please sign in to comment.