Permalink
Browse files

Merge pull request #37 from searchify/maxqueuelength

Adding max_search_queue config file option to limit number of threads waiting to search
  • Loading branch information...
2 parents 08cbf30 + b6c8248 commit f2354fe9db43786126e304d12aae4322ae7b98b1 @santip santip committed Apr 19, 2012
@@ -113,6 +113,7 @@
private static final int DEFAULT_BASE_PORT = 7910;
private static final int DEFAULT_RTI_SIZE = 1000;
private static final int DEFAULT_BDB_CACHE = 100;
+ private static final int DEFAULT_MAX_SEARCH_QUEUE_LENGTH = 100;
public static enum SuggestValues { NO, QUERIES, DOCUMENTS};
public static enum StorageValues { NO, BDB, RAM, CASSANDRA };
@@ -663,7 +664,13 @@ public static void main(String[] args) throws IOException{
searcher = new DidYouMeanSearcher(searcher, dym);
}
- searcher = new TrafficLimitingSearcher(searcher);
+ int maxSearchQueueLength = DEFAULT_MAX_SEARCH_QUEUE_LENGTH;
+ if (configuration.containsKey("max_search_queue")) {
+ maxSearchQueueLength = ((Long) configuration.get("max_search_queue")).intValue();
+ logger.info("Using max_search_queue length: " + maxSearchQueueLength);
+ }
+
+ searcher = new TrafficLimitingSearcher(searcher, maxSearchQueueLength);
Runtime.getRuntime().addShutdownHook(new ShutdownThread(indexer));
new SearcherServer(searcher, ie.getParser(), ie.boostsManager, ie.scorer, basePort + 2).start();
@@ -27,46 +27,76 @@
public class TrafficLimitingSearcher extends AbstractDocumentSearcher {
- private static final Logger logger = Logger.getLogger(Execute.whoAmI());
+ private static final Logger logger = Logger.getLogger(Execute.whoAmI());
private static final int MAX_NUMBER_OF_PARALLEL_REQUESTS = 3;
+ private static final int DEFAULT_MAX_SEARCH_QUEUE_LENGTH = Integer.MAX_VALUE;
+ private final int maxSearchQueueLength;
private final DocumentSearcher delegate;
private final Semaphore semaphore;
public TrafficLimitingSearcher(DocumentSearcher searcher) {
+ this(searcher, DEFAULT_MAX_SEARCH_QUEUE_LENGTH);
+ }
+
+ /**
+ * @param searcher the delegate DocumentSearcher
+ * @param maxSearchQueueLength max allowed search queue length, or 0 for no waiters allowed
+ */
+ public TrafficLimitingSearcher(DocumentSearcher searcher, int maxSearchQueueLength) {
Preconditions.checkNotNull(searcher);
+ Preconditions.checkArgument(maxSearchQueueLength >= 0);
this.delegate = searcher;
- semaphore = new Semaphore(MAX_NUMBER_OF_PARALLEL_REQUESTS);
+ this.maxSearchQueueLength = maxSearchQueueLength;
+ semaphore = new Semaphore(MAX_NUMBER_OF_PARALLEL_REQUESTS, true);
}
-
@Override
public SearchResults search(Query query, int start, int limit, int scoringFunctionIndex, Map<String, String> extraParameters) throws InterruptedException {
- // call delegate searcher
+ // call delegate searcher
try {
+ int queueLen = semaphore.getQueueLength();
+ if (queueLen >= maxSearchQueueLength) {
+ logger.warn("Too many waiting to search, queue length = " + queueLen + ", returning without searching");
+ throw new InterruptedException("Too many concurrent searches");
+ }
+ if (queueLen > 0) {
+ logger.warn("Concurrent searches queue length is " + queueLen + ", will wait");
+ }
+ // consider adding a timeout to this call to semaphore.acquire()
+ semaphore.acquire();
try {
- if (!semaphore.tryAcquire()) {
- logger.warn("Too many concurrent searches. Will wait.");
- semaphore.acquire();
- }
- } catch (InterruptedException e) {}
- return this.delegate.search(query, start, limit, scoringFunctionIndex, extraParameters);
- } finally {
- semaphore.release();
+ return this.delegate.search(query, start, limit, scoringFunctionIndex, extraParameters);
+ } finally {
+ semaphore.release();
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Throwing InterruptedException: " + e.getMessage());
+ throw e;
}
}
@Override
public int countMatches(Query query) throws InterruptedException {
try {
+ int queueLen = semaphore.getQueueLength();
+ if (queueLen >= maxSearchQueueLength) {
+ logger.warn("Too many waiting to search, queue length = " + queueLen + ", returning without searching");
+ throw new InterruptedException("Too many concurrent searches");
+ }
+ if (queueLen > 0) {
+ logger.warn("Concurrent searches queue length is " + queueLen + ", will wait");
+ }
+ // consider adding a timeout to this call to semaphore.acquire()
+ semaphore.acquire();
try {
- if (!semaphore.tryAcquire()) {
- logger.warn("Too many concurrent searches. Will wait.");
- semaphore.acquire();
- }
- } catch (InterruptedException e) {}
- return this.delegate.countMatches(query);
- } finally {
- semaphore.release();
+ return this.delegate.countMatches(query);
+ } finally {
+ semaphore.release();
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Throwing InterruptedException: " + e.getMessage());
+ throw e;
}
}
}
+
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package com.flaptor.indextank.search;
+
+import com.flaptor.indextank.IndexTankTestCase;
+import com.flaptor.indextank.query.Query;
+import com.flaptor.indextank.query.TermQuery;
+import com.flaptor.util.TestInfo;
+import com.google.common.collect.Lists;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.flaptor.util.TestInfo.TestType.UNIT;
+
+public class TrafficLimitingSearcherTest extends IndexTankTestCase {
+ private TrafficLimitingSearcher sleepSearcher;
+ private TrafficLimitingSearcher fastSearcher;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ this.sleepSearcher = new TrafficLimitingSearcher(new SleepingSearcher(3000), 3);
+ this.fastSearcher = new TrafficLimitingSearcher(new SleepingSearcher(0), 3);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ private void sleep(int millis) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(millis);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @TestInfo(testType=UNIT)
+ public void testQueueTooLong() {
+ // start 3 that can run in parallel, and sleep to let them start
+ new Thread(new RunSearch(sleepSearcher)).start();
+ sleep(100);
+ new Thread(new RunSearch(sleepSearcher)).start();
+ sleep(100);
+ new Thread(new RunSearch(sleepSearcher)).start();
+ sleep(100);
+ // then start 3 to fill up the queue
+ new Thread(new RunSearch(sleepSearcher)).start();
+ sleep(100);
+ new Thread(new RunSearch(sleepSearcher)).start();
+ sleep(100);
+ new Thread(new RunSearch(sleepSearcher)).start();
+ sleep(100);
+ // now start one that should throw an exception because the queue is at max length
+ try {
+ final Query query = new Query(new TermQuery("text", "nada"), null, null);
+ sleepSearcher.search(query, 0, 10, 0);
+ fail("Should have thrown InterruptedException");
+ } catch (InterruptedException e) {
+ System.out.println("InterruptedException thrown, success");
+ }
+ }
+
+ @TestInfo(testType=UNIT)
+ public void testNonConcurrentSearching() throws InterruptedException {
+ // make sure serial searches don't cause an InterruptedException
+ for (int i = 0; i < 100; i++) {
+ final Query query = new Query(new TermQuery("text", "nada"), null, null);
+ fastSearcher.search(query, 0, 10, 0);
+ }
+ }
+
+ @TestInfo(testType=UNIT)
+ public void testConstructor() {
+ new TrafficLimitingSearcher(sleepSearcher, 0);
+ try {
+ new TrafficLimitingSearcher(sleepSearcher, -1);
+ fail("Constructor should not allow negative max queue length");
+ } catch (IllegalArgumentException iae) {
+ // pass
+ }
+ }
+
+ private static class RunSearch implements Runnable {
+ final DocumentSearcher searcher;
+
+ private RunSearch(DocumentSearcher searcher) {
+ this.searcher = searcher;
+ }
+
+ @Override
+ public void run() {
+ try {
+ final Query query = new Query(new TermQuery("text","hola"),null,null);
+ searcher.search(query, 0, 10, 0);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private class SleepingSearcher extends AbstractDocumentSearcher {
+ private int sleepTime = 3000;
+
+ private SleepingSearcher(int sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ @Override
+ public SearchResults search(Query query, int start, int limit, int scoringFunctionIndex, Map<String, String> extraParameters) throws InterruptedException {
+ sleep(sleepTime);
+ return new SearchResults(Lists.<SearchResult>newArrayList(), 0, null);
+ }
+
+ @Override
+ public int countMatches(Query query) throws InterruptedException {
+ sleep(sleepTime);
+ return 0;
+ }
+ }
+}
+

0 comments on commit f2354fe

Please sign in to comment.