Permalink
Browse files

throttle code

  • Loading branch information...
1 parent 1a3dc66 commit 59b2916c18e995f7004234b183a6ab5d3c89597a Michael Blakeley committed Jun 22, 2010
View
@@ -8,6 +8,9 @@ cd $BASE
pwd
BASE=`basename $PWD`
PAGES=$BASE-gh-pages
+
+git status
+
echo releasing $BASE
(cd src && ant jar) \
&& cp lib/$BASE.jar ../$PAGES/ \
@@ -1,5 +1,5 @@
/*
- * Copyright (c)2005-2008 Mark Logic Corporation
+ * Copyright (c)2005-2010 Mark Logic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,18 +29,18 @@
*/
public class Timer {
- private static final int BYTES_PER_KILOBYTE = 1024;
+ public static final int BYTES_PER_KILOBYTE = 1024;
- private static final int MILLISECONDS_PER_SECOND = 1000;
+ public static final int MILLISECONDS_PER_SECOND = 1000;
- private static final int NANOSECONDS_PER_MICROSECOND = MILLISECONDS_PER_SECOND;
+ public static final int NANOSECONDS_PER_MICROSECOND = MILLISECONDS_PER_SECOND;
- private static final int MICROSECONDS_PER_MILLISECOND = MILLISECONDS_PER_SECOND;
+ public static final int MICROSECONDS_PER_MILLISECOND = MILLISECONDS_PER_SECOND;
- private static final int NANOSECONDS_PER_MILLISECOND = NANOSECONDS_PER_MICROSECOND
+ public static final int NANOSECONDS_PER_MILLISECOND = NANOSECONDS_PER_MICROSECOND
* MICROSECONDS_PER_MILLISECOND;
- private static final int NANOSECONDS_PER_SECOND = NANOSECONDS_PER_MILLISECOND
+ public static final int NANOSECONDS_PER_SECOND = NANOSECONDS_PER_MILLISECOND
* MILLISECONDS_PER_SECOND;
private long errors = 0;
@@ -101,6 +101,10 @@ public long getBytes() {
return bytes;
}
+ public double getBytesPerSecond() {
+ return bytes / getDurationSeconds();
+ }
+
/**
* @return
*/
@@ -143,6 +143,14 @@
public static final String THREADS_DEFAULT = "1";
+ public static final String THROTTLE_EVENTS_KEY = "THROTTLE_EVENTS_PER_SECOND";
+
+ public static final String THROTTLE_EVENTS_DEFAULT = "0";
+
+ public static final String THROTTLE_BYTES_KEY = "THROTTLE_BYTES_PER_SECOND";
+
+ public static final String THROTTLE_BYTES_DEFAULT = "0";
+
public static final String URI_PREFIX_KEY = "URI_PREFIX";
public static final String URI_SUFFIX_KEY = "URI_SUFFIX";
@@ -183,6 +191,10 @@
protected Long startPosition;
+ protected double throttledEventsPerSecond;
+
+ protected int throttledBytesPerSecond;
+
protected String uriPrefix;
protected String[] outputCollections;
@@ -311,6 +323,8 @@ private void configure() throws XccException, IOException,
configureOutput();
configureTimestamp(properties.getProperty(INPUT_TIMESTAMP_KEY));
+
+ configureThrottling();
// miscellaneous
String startPositionString = properties
@@ -405,6 +419,52 @@ private void configureOutput() throws IOException,
}
}
+ /**
+ *
+ */
+ void configureThrottling() {
+ throttledEventsPerSecond = Double.parseDouble(properties
+ .getProperty(THROTTLE_EVENTS_KEY));
+
+ throttledBytesPerSecond = Integer.parseInt(properties
+ .getProperty(THROTTLE_BYTES_KEY));
+ }
+
+ /**
+ * @param _timestampString
+ * @throws RequestException
+ *
+ */
+ private void configureTimestamp(String _timestampString)
+ throws RequestException {
+ if (null != _timestampString) {
+ Session sess = newInputSession();
+ if (null == sess) {
+ logger.warning("ignoring "
+ + Configuration.INPUT_TIMESTAMP_KEY + "="
+ + _timestampString + " because "
+ + Configuration.INPUT_CONNECTION_STRING_KEY
+ + " is not set.");
+ } else if (_timestampString.startsWith("#")) {
+ // handle special values
+ if (Configuration.INPUT_TIMESTAMP_AUTO
+ .equals(_timestampString)) {
+ // fetch the current timestamp
+ timestamp = sess.getCurrentServerPointInTime();
+ } else {
+ logger.warning("ignoring unknown timestamp "
+ + _timestampString);
+ }
+ } else {
+ timestamp = new BigInteger(_timestampString);
+ }
+
+ if (null != timestamp) {
+ logger.info("using timestamp " + timestamp);
+ }
+ }
+ }
+
/**
* @param _connectionString
* @return
@@ -722,41 +782,6 @@ public String getInputModule() {
return properties.getProperty(INPUT_MODULE_URI_KEY);
}
- /**
- * @param _timestampString
- * @throws RequestException
- *
- */
- private void configureTimestamp(String _timestampString)
- throws RequestException {
- if (null != _timestampString) {
- Session sess = newInputSession();
- if (null == sess) {
- logger.warning("ignoring "
- + Configuration.INPUT_TIMESTAMP_KEY + "="
- + _timestampString + " because "
- + Configuration.INPUT_CONNECTION_STRING_KEY
- + " is not set.");
- } else if (_timestampString.startsWith("#")) {
- // handle special values
- if (Configuration.INPUT_TIMESTAMP_AUTO
- .equals(_timestampString)) {
- // fetch the current timestamp
- timestamp = sess.getCurrentServerPointInTime();
- } else {
- logger.warning("ignoring unknown timestamp "
- + _timestampString);
- }
- } else {
- timestamp = new BigInteger(_timestampString);
- }
-
- if (null != timestamp) {
- logger.info("using timestamp " + timestamp);
- }
- }
- }
-
/**
* @return
*/
@@ -808,4 +833,25 @@ public String getUriSuffix() {
return properties.getProperty(URI_SUFFIX_KEY);
}
+ /**
+ * @return
+ */
+ public boolean isThrottled() {
+ return (throttledEventsPerSecond > 0 || throttledBytesPerSecond > 0);
+ }
+
+ /**
+ * @return
+ */
+ public int getThrottledBytesPerSecond() {
+ return throttledBytesPerSecond;
+ }
+
+ /**
+ * @return
+ */
+ public double getThrottledEventsPerSecond() {
+ return throttledEventsPerSecond;
+ }
+
}
@@ -58,17 +58,20 @@
protected Object taskCountMutex = new Object();
+ protected Configuration config;
+
/**
- * @param _logger
+ * @param _config
* @param _pool
* @param _cs
* @param _fatalErrors
*/
- public Monitor(SimpleLogger _logger, ThreadPoolExecutor _pool,
+ public Monitor(Configuration _config, ThreadPoolExecutor _pool,
CompletionService<TimedEvent[]> _cs, boolean _fatalErrors) {
+ config = _config;
completionService = _cs;
pool = _pool;
- logger = _logger;
+ logger = _config.getLogger();
fatalErrors = _fatalErrors;
}
@@ -129,6 +132,9 @@ protected void monitor() throws ExecutionException {
// try to avoid thread starvation
yield();
+ // TODO check throttle
+ checkThrottle();
+
// check completed tasks
// sometimes this goes so fast that we never leave the loop,
// so progress is never displayed... so limit the number of loops.
@@ -243,4 +249,57 @@ public void setFinalTaskCount(long _count) {
taskCountFinal = true;
}
}
+
+ /**
+ *
+ */
+ private void checkThrottle() {
+ // optional throttling
+ if (!config.isThrottled()) {
+ return;
+ }
+
+ long sleepMillis;
+ double throttledEventsPerSecond = config
+ .getThrottledEventsPerSecond();
+ boolean isEvents = (throttledEventsPerSecond > 0);
+ int throttledBytesPerSecond = isEvents ? 0 : config
+ .getThrottledBytesPerSecond();
+ logger.fine("throttling "
+ + (isEvents
+ // events
+ ? (timer.getEventsPerSecond() + " tps to "
+ + throttledEventsPerSecond + " tps")
+ // bytes
+ : (timer.getBytesPerSecond() + " B/sec to "
+ + throttledBytesPerSecond + " B/sec")));
+ // call the methods every time
+ while ((throttledEventsPerSecond > 0 && (throttledEventsPerSecond < timer
+ .getEventsPerSecond()))
+ || (throttledBytesPerSecond > 0 && (throttledBytesPerSecond < timer
+ .getBytesPerSecond()))) {
+ if (isEvents) {
+ sleepMillis = (long) Math
+ .ceil(Timer.MILLISECONDS_PER_SECOND
+ * ((timer.getEventCount() / throttledEventsPerSecond) - timer
+ .getDurationSeconds()));
+ } else {
+ sleepMillis = (long) Math
+ .ceil(Timer.MILLISECONDS_PER_SECOND
+ * ((timer.getBytes() / throttledBytesPerSecond) - timer
+ .getDurationSeconds()));
+ }
+ sleepMillis = Math.max(sleepMillis, 1);
+ logger.finer("sleeping " + sleepMillis);
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ logger.logException("interrupted", e);
+ }
+ }
+ logger.fine("throttled to "
+ + (isEvents ? (timer.getEventsPerSecond() + " tps")
+ : (timer.getBytesPerSecond() + " B/sec")));
+ }
+
}
@@ -26,12 +26,12 @@
import com.marklogic.xcc.Version;
/**
- * @author Michael Blakeley <michael.blakeley@marklogic.com>
+ * @author Michael Blakeley
*
*/
public class XQSync {
- public static String VERSION = "2010-06-16.1";
+ public static String VERSION = "2010-06-22.1";
private static String versionMessage = "version " + VERSION + " on "
+ System.getProperty("java.version") + " ("
@@ -151,7 +151,7 @@ public void run() {
// to attempt to avoid starvation, run the monitor with higher
// priority than the thread pool will have.
- monitor = new Monitor(logger, pool, completionService,
+ monitor = new Monitor(configuration, pool, completionService,
configuration.isFatalErrors());
monitor.setPriority(1 + Thread.NORM_PRIORITY);
monitor.start();
@@ -275,7 +275,7 @@ private long queueFromInputPackage(String _path) throws IOException,
SyncException {
logger.info(_path);
File file = new File(_path);
-
+
if (!file.exists()) {
throw new IOException("missing expected input package path: "
+ _path);

0 comments on commit 59b2916

Please sign in to comment.