Browse files

improve synchronization loop.

  • Loading branch information...
1 parent e99fb95 commit 1870dbf926e023838c9f45f17ca8a874c74e1051 Robert Newson committed Aug 14, 2009
View
4 src/main/java/com/github/rnewson/couchdb/lucene/Config.java
@@ -51,9 +51,7 @@
static final String DB_PASSWORD = System.getProperty("couchdb.password");
- static final int COMMIT_MIN = Integer.getInteger("couchdb.lucene.commit.min", 5 * 1000);
-
- static final int COMMIT_MAX = Integer.getInteger("couchdb.lucene.commit.max", 5 * 60 * 1000);
+ static final int COMMIT_MIN = Integer.getInteger("couchdb.lucene.commit.min", 5000);
static final boolean LUCENE_DEBUG = Boolean.getBoolean("couchdb.lucene.debug");
View
107 src/main/java/com/github/rnewson/couchdb/lucene/Index.java
@@ -18,6 +18,7 @@
import static com.github.rnewson.couchdb.lucene.Utils.docQuery;
import static com.github.rnewson.couchdb.lucene.Utils.token;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.io.File;
@@ -49,7 +50,9 @@
static class Indexer implements Runnable {
- private boolean isStale = true;
+ private long staleAt = now();
+
+ private long freshAt = staleAt;
private final Directory dir;
@@ -58,45 +61,35 @@ public Indexer(final Directory dir) {
}
public synchronized boolean isStale() {
- return isStale;
+ return staleAt >= freshAt;
}
- public synchronized void setStale(final boolean isStale) {
- this.isStale = isStale;
+ public synchronized void setStale() {
+ staleAt = now() + leniency();
}
- public synchronized boolean setStale(final boolean expected, final boolean update) {
- if (isStale == expected) {
- isStale = update;
- return true;
- }
- return false;
+ public synchronized void setFresh() {
+ freshAt = now() + leniency();
+ }
+
+ private long leniency() {
+ return MILLISECONDS.toNanos(Config.COMMIT_MIN);
+ }
+
+ private long now() {
+ return System.nanoTime();
}
public void run() {
while (true) {
- if (!isStale()) {
- sleep();
- } else {
- final long commitBy = System.currentTimeMillis() + Config.COMMIT_MAX;
- boolean quiet = false;
- while (!quiet && System.currentTimeMillis() < commitBy) {
- setStale(false);
- sleep();
- quiet = !isStale();
- }
-
- /*
- * Either no update has occurred in the last COMMIT_MIN
- * interval or continual updates have occurred for
- * COMMIT_MAX interval. Either way, index all changes and
- * commit.
- */
+ if (isStale()) {
try {
updateIndex();
} catch (final Throwable t) {
Utils.LOG.warn("Exception while updating index.", t);
}
+ } else {
+ sleep();
}
}
}
@@ -137,7 +130,6 @@ private synchronized void updateIndex() throws IOException {
final String[] dbnames = DB.getAllDatabases();
Arrays.sort(dbnames);
- boolean commit = false;
boolean expunge = false;
final IndexWriter writer = newWriter();
final Progress progress = new Progress();
@@ -158,7 +150,6 @@ private synchronized void updateIndex() throws IOException {
Utils.LOG.info("Database '" + term.text()
+ "' has been deleted, removing all documents from index.");
deleteDatabase(term.text(), progress, writer);
- commit = true;
expunge = true;
}
} while (terms.next());
@@ -200,7 +191,7 @@ private synchronized void updateIndex() throws IOException {
final Rhino rhino = new Rhino(dbname, defaults, fun);
try {
- commit |= updateDatabase(writer, analyzer, sig, dbname, viewname, progress, rhino);
+ updateDatabase(writer, analyzer, sig, dbname, viewname, progress, rhino);
} finally {
rhino.close();
}
@@ -219,46 +210,39 @@ private synchronized void updateIndex() throws IOException {
}
} catch (final Exception e) {
Utils.LOG.error("Error updating index.", e);
- commit = false;
} finally {
- if (commit) {
- progress.save(writer);
- if (expunge) {
- writer.expungeDeletes();
- }
- writer.close();
+ progress.save(writer);
+ if (expunge) {
+ writer.expungeDeletes();
+ }
+ writer.close();
- final IndexReader reader = IndexReader.open(dir);
- try {
- Utils.LOG.info("Committed changes to index (" + reader.numDocs() + " documents in index, "
- + reader.numDeletedDocs() + " deletes).");
- } finally {
- reader.close();
- }
- } else {
- writer.rollback();
+ final IndexReader reader = IndexReader.open(dir);
+ try {
+ Utils.LOG.info("Committed changes to index (" + reader.numDocs() + " documents in index, "
+ + reader.numDeletedDocs() + " deletes).");
+ } finally {
+ reader.close();
}
+ setFresh();
}
}
- private boolean updateDatabase(final IndexWriter writer, final Analyzer analyzer, final String new_sig,
+ private void updateDatabase(final IndexWriter writer, final Analyzer analyzer, final String new_sig,
final String dbname, final String viewname, final Progress progress, final Rhino rhino)
throws HttpException, IOException {
assert rhino != null;
- final long start = System.nanoTime();
+ final long start = now();
final long target_seq = DB.getInfo(dbname).getLong("update_seq");
final String cur_sig = progress.getSignature(viewname);
- boolean result = false;
-
// Reindex the database if sequence is 0 or signature changed.
if (progress.getSeq(viewname) == 0 || cur_sig.equals(new_sig) == false) {
Utils.LOG.info("Indexing " + viewname + " from scratch.");
deleteView(viewname, progress, writer);
progress.update(viewname, new_sig, 0);
- result = true;
}
long update_seq = progress.getSeq(viewname);
@@ -267,7 +251,6 @@ private boolean updateDatabase(final IndexWriter writer, final Analyzer analyzer
if (!obj.has("rows")) {
Utils.LOG.warn("no rows found (" + obj + ").");
- return false;
}
// Process all rows
@@ -287,35 +270,28 @@ private boolean updateDatabase(final IndexWriter writer, final Analyzer analyzer
docs[j].add(token(Config.DB, dbname, false));
docs[j].add(token(Config.VIEW, viewname, false));
docs[j].add(token(Config.ID, docid, true));
-
+
if (Utils.LOG.isTraceEnabled()) {
Utils.LOG.trace("Adding " + docs[j]);
}
writer.addDocument(docs[j], analyzer);
}
-
- result = true;
}
// Deleted document.
if (value != null && value.optBoolean("deleted")) {
writer.deleteDocuments(docQuery(viewname, row.getString("id")));
- result = true;
}
update_seq = row.getLong("key");
}
}
- if (result) {
- progress.update(viewname, new_sig, update_seq);
+ progress.update(viewname, new_sig, update_seq);
- final long duration = System.nanoTime() - start;
- Utils.LOG.info(String.format("%s: index is now at update_seq %,d (took %s).", viewname, update_seq,
- DurationFormatUtils.formatDurationHMS(NANOSECONDS.toMillis(duration))));
- }
-
- return result;
+ final long duration = now() - start;
+ Utils.LOG.debug(String.format("%s: index is now at update_seq %,d (took %s).", viewname, update_seq,
+ DurationFormatUtils.formatDurationHMS(NANOSECONDS.toMillis(duration))));
}
private void deleteView(final String viewname, final Progress progress, final IndexWriter writer)
@@ -368,6 +344,7 @@ public static void main(final String[] args) throws Exception {
Utils.LOG.info("indexer started.");
final Indexer indexer = new Indexer(d);
+ indexer.updateIndex();
final Thread thread = new Thread(indexer, "index");
thread.setDaemon(true);
thread.start();
@@ -377,7 +354,7 @@ public static void main(final String[] args) throws Exception {
final String line = scanner.nextLine();
final JSONObject obj = JSONObject.fromObject(line);
if (obj.has("type") && obj.has("db")) {
- indexer.setStale(true);
+ indexer.setStale();
}
}
Utils.LOG.info("indexer stopped.");

0 comments on commit 1870dbf

Please sign in to comment.