Skip to content
This repository
Browse code

Add support for BigCouch update sequences. (EXPERIMENTAL)

  • Loading branch information...
commit 467a5494c112d5b301f33418a88d879b94702cb9 1 parent 85bf6fa
Robert Newson authored
10 pom.xml
@@ -38,6 +38,11 @@
38 38 <version>1.6</version>
39 39 </dependency>
40 40 <dependency>
  41 + <groupId>commons-codec</groupId>
  42 + <artifactId>commons-codec</artifactId>
  43 + <version>1.5</version>
  44 + </dependency>
  45 + <dependency>
41 46 <groupId>org.json</groupId>
42 47 <artifactId>json</artifactId>
43 48 <version>20090211</version>
@@ -73,6 +78,11 @@
73 78 <version>${tika-version}</version>
74 79 </dependency>
75 80 <dependency>
  81 + <groupId>org.erlang.otp</groupId>
  82 + <artifactId>jinterface</artifactId>
  83 + <version>1.5.3.2</version>
  84 + </dependency>
  85 + <dependency>
76 86 <groupId>org.slf4j</groupId>
77 87 <artifactId>slf4j-log4j12</artifactId>
78 88 <version>1.5.6</version>
47 src/main/java/com/github/rnewson/couchdb/lucene/DatabaseIndexer.java
@@ -63,6 +63,7 @@
63 63 import com.github.rnewson.couchdb.lucene.couchdb.CouchDocument;
64 64 import com.github.rnewson.couchdb.lucene.couchdb.Database;
65 65 import com.github.rnewson.couchdb.lucene.couchdb.DesignDocument;
  66 +import com.github.rnewson.couchdb.lucene.couchdb.UpdateSequence;
66 67 import com.github.rnewson.couchdb.lucene.couchdb.View;
67 68 import com.github.rnewson.couchdb.lucene.util.Analyzers;
68 69 import com.github.rnewson.couchdb.lucene.util.Constants;
@@ -79,7 +80,7 @@
79 80 private String etag;
80 81
81 82 private final Analyzer analyzer;
82   - private long pending_seq;
  83 + private UpdateSequence pending_seq;
83 84 private IndexReader reader;
84 85 private final IndexWriter writer;
85 86 private final Database database;
@@ -161,10 +162,10 @@ private void blockForLatest(final boolean staleOk) throws IOException, JSONExcep
161 162 if (staleOk) {
162 163 return;
163 164 }
164   - final long latest = database.getInfo().getUpdateSequence();
  165 + final UpdateSequence latest = database.getInfo().getUpdateSequence();
165 166 synchronized (this) {
166 167 long timeout = getSearchTimeout();
167   - while (pending_seq < latest) {
  168 + while (pending_seq.isEarlierThan(latest)) {
168 169 try {
169 170 final long start = System.currentTimeMillis();
170 171 wait(timeout);
@@ -179,8 +180,8 @@ private void blockForLatest(final boolean staleOk) throws IOException, JSONExcep
179 180 }
180 181 }
181 182
182   - private synchronized void setPendingSequence(final long newSequence) {
183   - pending_seq = newSequence;
  183 + private synchronized void setPendingSequence(final UpdateSequence seq) {
  184 + pending_seq = seq;
184 185 notifyAll();
185 186 }
186 187
@@ -223,7 +224,7 @@ private static long now() {
223 224
224 225 private final Database database;
225 226
226   - private long ddoc_seq;
  227 + private UpdateSequence ddoc_seq;
227 228
228 229 private long lastCommit;
229 230
@@ -237,7 +238,7 @@ private static long now() {
237 238
238 239 private final File root;
239 240
240   - private long since;
  241 + private UpdateSequence since;
241 242
242 243 private final Map<View, IndexState> states = Collections
243 244 .synchronizedMap(new HashMap<View, IndexState>());
@@ -319,7 +320,7 @@ public Void handleResponse(final HttpResponse response)
319 320 break loop;
320 321 }
321 322
322   - final long seq = json.getLong("seq");
  323 + final UpdateSequence seq = new UpdateSequence(json.getString("seq"));
323 324 final String id = json.getString("id");
324 325 CouchDocument doc;
325 326 if (!json.isNull("doc")) {
@@ -341,7 +342,7 @@ public Void handleResponse(final HttpResponse response)
341 342 }
342 343
343 344 if (id.startsWith("_design")) {
344   - if (seq > ddoc_seq) {
  345 + if (ddoc_seq.isEarlierThan(seq)) {
345 346 logger.info("Exiting due to design document change.");
346 347 break loop;
347 348 }
@@ -358,7 +359,7 @@ public Void handleResponse(final HttpResponse response)
358 359 final View view = entry.getKey();
359 360 final IndexState state = entry.getValue();
360 361
361   - if (seq > state.pending_seq) {
  362 + if (state.pending_seq.isEarlierThan(seq)) {
362 363 final Document[] docs;
363 364 try {
364 365 docs = state.converter.convert(doc, view
@@ -667,9 +668,9 @@ private void commitAll() throws IOException {
667 668 final View view = entry.getKey();
668 669 final IndexState state = entry.getValue();
669 670
670   - if (state.pending_seq > getUpdateSequence(state.writer)) {
  671 + if (getUpdateSequence(state.writer).isEarlierThan(state.pending_seq)) {
671 672 final Map<String, String> userData = new HashMap<String, String>();
672   - userData.put("last_seq", Long.toString(state.pending_seq));
  673 + userData.put("last_seq", state.pending_seq.toString());
673 674 state.writer.commit(userData);
674 675 logger.info(view + " now at update_seq " + state.pending_seq);
675 676 }
@@ -703,22 +704,22 @@ private IndexState getState(final HttpServletRequest req,
703 704 return result;
704 705 }
705 706
706   - private long getUpdateSequence(final Directory dir) throws IOException {
  707 + private UpdateSequence getUpdateSequence(final Directory dir) throws IOException {
707 708 if (!IndexReader.indexExists(dir)) {
708   - return 0L;
  709 + return UpdateSequence.BOTTOM;
709 710 }
710 711 return getUpdateSequence(IndexReader.getCommitUserData(dir));
711 712 }
712 713
713   - private long getUpdateSequence(final IndexWriter writer) throws IOException {
  714 + private UpdateSequence getUpdateSequence(final IndexWriter writer) throws IOException {
714 715 return getUpdateSequence(writer.getDirectory());
715 716 }
716 717
717   - private long getUpdateSequence(final Map<String, String> userData) {
  718 + private UpdateSequence getUpdateSequence(final Map<String, String> userData) {
718 719 if (userData != null && userData.containsKey("last_seq")) {
719   - return Long.parseLong(userData.get("last_seq"));
  720 + return new UpdateSequence(userData.get("last_seq"));
720 721 }
721   - return 0L;
  722 + return UpdateSequence.BOTTOM;
722 723 }
723 724
724 725 private void init() throws IOException, JSONException {
@@ -729,7 +730,7 @@ private void init() throws IOException, JSONException {
729 730 context.setOptimizationLevel(9);
730 731
731 732 this.ddoc_seq = database.getInfo().getUpdateSequence();
732   - this.since = -1L;
  733 + this.since = null;
733 734
734 735 for (final DesignDocument ddoc : database.getAllDesignDocuments()) {
735 736 for (final Entry<String, View> entry : ddoc.getAllViews()
@@ -741,11 +742,13 @@ private void init() throws IOException, JSONException {
741 742 if (!states.containsKey(view)) {
742 743 final Directory dir = FSDirectory.open(viewDir(view, true),
743 744 new SingleInstanceLockFactory());
744   - final long seq = getUpdateSequence(dir);
745   - if (since == -1) {
  745 + final UpdateSequence seq = getUpdateSequence(dir);
  746 + if (since == null) {
  747 + since = seq;
  748 + }
  749 + if (seq.isEarlierThan(since)) {
746 750 since = seq;
747 751 }
748   - since = Math.min(since, seq);
749 752 logger.debug(dir + " bumped since to " + since);
750 753
751 754 final DocumentConverter converter = new DocumentConverter(
2  src/main/java/com/github/rnewson/couchdb/lucene/couchdb/Database.java
@@ -105,7 +105,7 @@ public DatabaseInfo getInfo() throws IOException, JSONException {
105 105 return httpClient.execute(get, handler);
106 106 }
107 107
108   - public HttpUriRequest getChangesRequest(final long since)
  108 + public HttpUriRequest getChangesRequest(final UpdateSequence since)
109 109 throws IOException {
110 110 return new HttpGet(
111 111 url
4 src/main/java/com/github/rnewson/couchdb/lucene/couchdb/DatabaseInfo.java
@@ -11,8 +11,8 @@ public DatabaseInfo(final JSONObject json) {
11 11 this.json = json;
12 12 }
13 13
14   - public long getUpdateSequence() throws JSONException {
15   - return json.getLong("update_seq");
  14 + public UpdateSequence getUpdateSequence() throws JSONException {
  15 + return new UpdateSequence(json.getString("update_seq"));
16 16 }
17 17
18 18 public String getName() throws JSONException {
84 src/main/java/com/github/rnewson/couchdb/lucene/couchdb/UpdateSequence.java
... ... @@ -0,0 +1,84 @@
  1 +package com.github.rnewson.couchdb.lucene.couchdb;
  2 +
  3 +import java.util.HashMap;
  4 +import java.util.Iterator;
  5 +import java.util.Map;
  6 +import java.util.Map.Entry;
  7 +
  8 +import org.apache.commons.codec.binary.Base64;
  9 +
  10 +import com.ericsson.otp.erlang.OtpErlangDecodeException;
  11 +import com.ericsson.otp.erlang.OtpErlangList;
  12 +import com.ericsson.otp.erlang.OtpErlangLong;
  13 +import com.ericsson.otp.erlang.OtpErlangObject;
  14 +import com.ericsson.otp.erlang.OtpErlangTuple;
  15 +import com.ericsson.otp.erlang.OtpInputStream;
  16 +
  17 +public final class UpdateSequence {
  18 +
  19 + public static final UpdateSequence BOTTOM = new UpdateSequence("0");
  20 +
  21 + private long seq;
  22 + private Map<String, Long> vector;
  23 + private final String asString;
  24 +
  25 + public UpdateSequence(final String seq) {
  26 + this.asString = seq;
  27 +
  28 + if (seq.matches("[0-9]+")) {
  29 + this.seq = Long.parseLong(seq);
  30 + return;
  31 + }
  32 +
  33 + if (seq.matches("[0-9]+-[0-9a-zA-Z_-]+")) {
  34 + final String packedSeqs = seq.split("-", 2)[1];
  35 + final byte[] bytes = new Base64(true).decode(packedSeqs);
  36 + final OtpInputStream stream = new OtpInputStream(bytes);
  37 + try {
  38 + final OtpErlangList list = (OtpErlangList) stream.read_any();
  39 + this.vector = new HashMap<String, Long>();
  40 + for (int i = 0, arity = list.arity(); i < arity; i++) {
  41 + final OtpErlangTuple tuple = (OtpErlangTuple) list
  42 + .elementAt(i);
  43 + final OtpErlangObject node = tuple.elementAt(0);
  44 + final OtpErlangObject range = tuple.elementAt(1);
  45 + final OtpErlangLong node_seq = (OtpErlangLong) tuple
  46 + .elementAt(2);
  47 + vector.put(node + "-" + range, node_seq.longValue());
  48 + }
  49 + } catch (final OtpErlangDecodeException e) {
  50 + throw new IllegalArgumentException(seq + " not valid.");
  51 + }
  52 + return;
  53 + }
  54 +
  55 + throw new IllegalArgumentException(seq + " not recognized.");
  56 + }
  57 +
  58 + public boolean isEarlierThan(final UpdateSequence other) {
  59 + if (this == BOTTOM) {
  60 + return true;
  61 + }
  62 +
  63 + if (vector == null && other.vector == null) {
  64 + return this.seq < other.seq;
  65 + } else if (vector != null && other.vector != null) {
  66 + final Iterator<Entry<String, Long>> it = this.vector.entrySet()
  67 + .iterator();
  68 + while (it.hasNext()) {
  69 + final Entry<String, Long> entry = it.next();
  70 + final Long otherValue = other.vector.get(entry.getKey());
  71 + if (otherValue != null && otherValue >= entry.getValue()) {
  72 + return false;
  73 + }
  74 + }
  75 + return true;
  76 + }
  77 + throw new IllegalArgumentException(other + " is not compatible.");
  78 + }
  79 +
  80 + public String toString() {
  81 + return asString;
  82 + }
  83 +
  84 +}

0 comments on commit 467a549

Please sign in to comment.
Something went wrong with that request. Please try again.