Permalink
Browse files

Merge pull request #5 from dgomezferro/master

Fix state recovery from BK
  • Loading branch information...
fpj committed Apr 13, 2012
2 parents 4c822f7 + 90440d3 commit 79560783cb94551bd113bba99a666b2ed628891e
@@ -213,7 +213,7 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entri
LedgerEntry le = entries.nextElement();
lp.execute(ByteBuffer.wrap(le.getEntry()));
- if(le.getEntryId() == lh.getLastAddConfirmed()){
+ if(le.getEntryId() == 0){
((BookKeeperStateBuilder.Context) ctx).setState(lp.getState());
}
}
@@ -344,8 +344,8 @@ public void openComplete(int rc, LedgerHandle lh, Object ctx){
((BookKeeperStateBuilder.Context) ctx).setState(null);
} else {
long counter = lh.getLastAddConfirmed();
- while(counter > 0){
- long nextBatch = Math.max(counter - BKREADBATCHSIZE, 0);
+ while(counter >= 0){
+ long nextBatch = Math.max(counter - BKREADBATCHSIZE + 1, 0);
lh.asyncReadEntries(nextBatch, counter, new LoggerExecutor(), ctx);
counter -= BKREADBATCHSIZE;
}
@@ -16,7 +16,10 @@
package com.yahoo.omid.tso;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.apache.commons.logging.Log;
@@ -70,8 +73,40 @@ public void testCommit() throws Exception {
assertTrue(cr2.committed);
assertTrue(cr2.commitTimestamp > tr2.timestamp);
assertEquals(tr2.timestamp, cr2.startTimestamp);
- assertTrue(tr2.timestamp > tr1.timestamp);
+ assertThat(tr2.timestamp, is(greaterThan(tr1.timestamp)));
}
+
+ @Test
+ public void testBigLog() throws Exception {
+ clientHandler.sendMessage(new TimestampRequest());
+ clientHandler.receiveBootstrap();
+ clientHandler.receiveMessage(TimestampResponse.class);
+
+ clientHandler.sendMessage(new TimestampRequest());
+ for (int i = 0; i < 10000; ++i) {
+ Object msg;
+ while (!((msg = clientHandler.receiveMessage()) instanceof TimestampResponse))
+ // iterate until we get a TimestampResponse
+ ;
+
+ TimestampResponse tr1 = (TimestampResponse) msg;
+
+ clientHandler.sendMessage(new CommitRequest(tr1.timestamp, new RowKey[] { r1, r2 }));
+ clientHandler.receiveMessage(CommitResponse.class);
+ clientHandler.sendMessage(new TimestampRequest());
+ }
+ clientHandler.clearMessages();
+
+ LOG.info("Going to shut down TSO");
+ teardownTSO();
+
+ LOG.info("Going to restart TSO");
+ setupTSO();
+
+ clientHandler.sendMessage(new TimestampRequest());
+ clientHandler.receiveBootstrap();
+ clientHandler.receiveMessage(TimestampResponse.class);
+ }
}

0 comments on commit 7956078

Please sign in to comment.