Permalink
Browse files

added new config option: THROTTLE_EVENTS_PER_SECOND

git-svn-id: http://developer.marklogic.com/svn/recordloader/trunk@680 e04f4502-82db-0310-b1af-f799f365da79
  • Loading branch information...
Mike
Mike committed May 3, 2007
1 parent 2034aef commit ffe45639fb911bfb786eb7b9678b9312bbe771dd
View
@@ -231,6 +231,11 @@ <h2>Available properties:</h2>
Note that when using standard input, this value is ignored.</td>
</tr>
+<tr><td>THROTTLE_EVENTS_PER_SECOND</td><td>0</td>
+<td>If greater than zero, all threads will be throttled
+ to the given number of inserts per second.
+ A value of zero or less disables throttling.</td>
+</tr>
+
<tr><td>XML_REPAIR_LEVEL</td><td>NONE</td>
<td>To what degree should XPP3 and MarkLogic Server
compensate for invalid XML?
@@ -1,15 +1,15 @@
# simple, fast MedlineCitationSet loader
-THREADS=1
-#CONNECTION_STRING=user:password@hostname:portnumber
-INPUT_PATH=/space/medline
+THREADS=2
+CONNECTION_STRING=xcc://admin:admin@localhost:9000/training
+INPUT_PATH=/home/mblakele/space/medline
ID_NAME=PMID
# files have root MedlineCitationSet, child MedlineCitation
# so we need not set RECORD_NAME
-OUTPUT_COLLECTIONS=medline
+OUTPUT_COLLECTIONS=medline,2007-02-08
URI_PREFIX=/medline/
URI_SUFFIX=.xml
# place all documents in a new namespace
-DEFAULT_NAMESPACE=
+#DEFAULT_NAMESPACE=
# if this is interrupted, we can resume at this MedlineID value
#START_ID=66000002
#SKIP_EXISTING=true
@@ -63,7 +63,7 @@
private static final String SIMPLE_NAME = RecordLoader.class
.getSimpleName();
- public static final String VERSION = "2007-01-19.1";
+ public static final String VERSION = "2007-05-02.1";
public static final String NAME = RecordLoader.class.getName();
@@ -188,6 +188,10 @@
*/
static final String THREADS_KEY = "THREADS";
+ static final String THROTTLE_KEY = "THROTTLE_EVENTS_PER_SECOND";
+
+ static final String THROTTLE_DEFAULT = "0";
+
/**
*
*/
@@ -308,6 +312,8 @@
private int capacity = DEFAULT_CAPACITY;
+ private double throttledEventsPerSecond;
+
/**
* @param _props
*/
@@ -425,6 +431,12 @@ private void configureOptions() {
zipInputPattern = props.getProperty(ZIP_INPUT_PATTERN_KEY,
ZIP_INPUT_PATTERN_DEFAULT);
logger.fine(ZIP_INPUT_PATTERN_KEY + " = " + zipInputPattern);
+
+ throttledEventsPerSecond = Double.parseDouble(props.getProperty(
+ THROTTLE_KEY, THROTTLE_DEFAULT));
+ if (isThrottled()) {
+ logger.info("throttle = " + throttledEventsPerSecond);
+ }
}
private void configureCollections() {
@@ -723,4 +735,15 @@ public boolean isFileBasedId() {
return idNodeName.equals(ID_NAME_FILENAME);
}
+ /**
+ * Reports whether throttling is enabled.
+ *
+ * @return true when the configured events-per-second limit is
+ * greater than zero, false otherwise
+ */
+ public boolean isThrottled() {
+ return 0 < throttledEventsPerSecond;
+ }
+
+ /**
+ * @return the throttle limit in events per second, as parsed from
+ * the THROTTLE_EVENTS_PER_SECOND property
+ */
+ public double getThrottledEventsPerSecond() {
+ return this.throttledEventsPerSecond;
+ }
+
}
@@ -31,7 +31,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.logging.Level;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
@@ -86,8 +85,6 @@
private String currentFileBasename = null;
- private long totalSkipped = 0;
-
private Map collectionMap;
private Configuration config;
@@ -139,6 +136,7 @@ public Loader(Monitor _monitor, URI _uri, Configuration _config)
* @see java.util.concurrent.Callable#call()
*/
public Object call() throws Exception {
+
session = conn.newSession();
logger.fine(Configuration.ID_NAME_KEY + "=" + idName);
@@ -614,10 +612,7 @@ private boolean checkExistingUri(String uri) throws XccException,
}
// ok, must be skipExisting...
// count it and log the message
- totalSkipped++;
- logger.log((totalSkipped % 500 == 0) ? Level.INFO
- : Level.FINE, "skipping " + totalSkipped
- + " existing uri " + uri);
+ monitor.incrementSkipped("existing uri " + uri);
return true;
}
}
@@ -658,8 +653,7 @@ private boolean checkStartId(String id) {
// is this my cow?
if (!startId.equals(id)) {
// don't bother to open the stream: skip this record
- logger.info("skipping record " + (++totalSkipped)
- + " with id " + id + " != " + startId);
+ monitor.incrementSkipped("id " + id + " != " + startId);
return true;
}
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import java.util.zip.ZipFile;
import com.marklogic.ps.SimpleLogger;
@@ -53,6 +54,8 @@
private Configuration config;
+ private int totalSkipped = 0;
+
private Monitor() {
// avoid no-argument constructors
}
@@ -180,6 +183,25 @@ public synchronized void add(String _uri, TimedEvent _event) {
lastUri = _uri;
}
timer.add(_event);
+
+ // optional throttling
+ if (config.isThrottled()) {
+ logger.finer("throttling rate " + timer.getEventsPerSecond());
+ long sleepMillis;
+ while (config.getThrottledEventsPerSecond() < timer
+ .getEventsPerSecond()) {
+ sleepMillis = (long) (1000 / config
+ .getThrottledEventsPerSecond() - 1000 / timer
+ .getEventsPerSecond());
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ logger.logException("interrupted", e);
+ }
+ }
+ logger.fine("throttled rate " + timer.getEventsPerSecond());
+ }
+
}
/**
@@ -216,4 +238,13 @@ public void setConfig(Configuration _config) {
config = _config;
}
+ /**
+ * Counts one skipped record and logs it. Every 500th skip is
+ * logged at INFO; all others are logged at FINE.
+ *
+ * Synchronized because the counter lives on the shared Monitor
+ * and may be updated by several loader threads.
+ *
+ * @param message reason for the skip, appended to the log line
+ */
+ public synchronized void incrementSkipped(String message) {
+ // count first, so the logged number is the running total
+ // and the modulo test actually advances
+ totalSkipped++;
+ logger.log((totalSkipped % 500 == 0) ? Level.INFO
+ : Level.FINE, "skipping " + totalSkipped
+ + " " + message);
+ }
+
}
View
@@ -1,7 +1,9 @@
#!/bin/sh
#
-CP=../lib/recordloader.jar
+BASE=`dirname $0`
+
+CP=$BASE/../lib/recordloader.jar
CP=$CP:$HOME/lib/java/xcc.jar
CP=$CP:$HOME/lib/java/xpp3.jar
@@ -16,6 +18,12 @@ for a in $*; do
fi
done
-$JAVA_HOME/bin/java -cp $CP $VMARGS com.marklogic.ps.RecordLoader $FILES
+if [ -d "$JAVA_HOME" ]; then
+ JAVA=$JAVA_HOME/bin/java
+else
+ JAVA=java
+fi
+
+$JAVA -cp $CP $VMARGS com.marklogic.ps.RecordLoader $FILES
# end recordloader.sh

0 comments on commit ffe4563

Please sign in to comment.