Skip to content

Commit

Permalink
ISSUE apache#550: add readLastAddConfirmedAndEntry in ReadHandle for …
Browse files Browse the repository at this point in the history
…long poll read

Descriptions of the changes in this PR:
1, add a class LastAddConfirmedAndEntry and metnod readLastAddConfirmedAndEntry() in ReadHandle;
2, add implementation for readLastAddConfirmedAndEntry in LedgerHandler;
3, add testcase in BookKeeperApiTest;
4, remove un-used imports, break down long lines, fix wrong comments.

Author: Jia Zhai <zhaijia@apache.org>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>, Venkateswararao Jujjuri (JV) <None>

This closes apache#729 from zhaijack/issue-550, closes apache#550
  • Loading branch information
jiazhai authored and Philip Su committed Dec 8, 2017
1 parent 625334f commit 442e6d5
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
Expand Down Expand Up @@ -965,7 +965,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
/**
* Obtains asynchronously the last confirmed write from a quorum of bookies.
* It is similar as
* {@link #asyncTryReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, Object)},
* {@link #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, Object)},
* but it doesn't wait all the responses from the quorum. It would callback
* immediately if it received a LAC which is larger than current LAC.
*
Expand Down Expand Up @@ -1008,7 +1008,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
}

/**
* @{@inheritDoc }
* {@inheritDoc}
*/
@Override
public CompletableFuture<Long> tryReadLastAddConfirmed() {
Expand All @@ -1018,7 +1018,7 @@ public CompletableFuture<Long> tryReadLastAddConfirmed() {
}

/**
* @{@inheritDoc }
* {@inheritDoc}
*/
@Override
public CompletableFuture<Long> readLastAddConfirmed() {
Expand All @@ -1027,6 +1027,18 @@ public CompletableFuture<Long> readLastAddConfirmed() {
return result;
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long entryId,
long timeOutInMillis,
boolean parallel) {
FutureReadLastConfirmedAndEntry result = new FutureReadLastConfirmedAndEntry();
asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, result, null);
return result;
}

/**
* Asynchronous read next entry and the latest last add confirmed.
* If the next entryId is less than known last add confirmed, the call will read next entry directly.
Expand Down Expand Up @@ -1083,7 +1095,8 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
return;
}
// wait for entry <i>entryId</i>
ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb =
new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
AtomicBoolean completed = new AtomicBoolean(false);
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry) {
Expand All @@ -1098,7 +1111,13 @@ public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, Led
}
}
};
new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, timeOutInMillis, bk.getScheduler()).parallelRead(parallel).initiate();
new ReadLastConfirmedAndEntryOp(this,
innercb,
entryId - 1,
timeOutInMillis,
bk.getScheduler())
.parallelRead(parallel)
.initiate();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.apache.bookkeeper.client.LedgerHandle.LOG;
import org.apache.bookkeeper.client.api.Handle;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;

/**
* Utility for callbacks
*
*/
@Slf4j
class SyncCallbackUtils {

/**
Expand Down Expand Up @@ -182,10 +184,10 @@ static class LastAddConfirmedCallback implements AsyncCallback.AddLacCallback {
@Override
public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
if (rc != BKException.Code.OK) {
LOG.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
log.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Callback LAC Updated for: {} ", lh.getId());
if (log.isDebugEnabled()) {
log.debug("Callback LAC Updated for: {} ", lh.getId());
}
}
}
Expand Down Expand Up @@ -266,7 +268,8 @@ public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
}
}

static class FutureReadLastConfirmed extends CompletableFuture<Long> implements AsyncCallback.ReadLastConfirmedCallback {
static class FutureReadLastConfirmed extends CompletableFuture<Long>
implements AsyncCallback.ReadLastConfirmedCallback {

@Override
public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
Expand Down Expand Up @@ -312,4 +315,14 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
}
}

static class FutureReadLastConfirmedAndEntry
extends CompletableFuture<LastConfirmedAndEntry> implements AsyncCallback.ReadLastConfirmedAndEntryCallback {

@Override
public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
LastConfirmedAndEntry result = new LastConfirmedAndEntryImpl(lastConfirmed, entry);
finish(rc, result, this);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client.api;

/**
* This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
* It is used for readLastAddConfirmedAndEntry.
*/
public interface LastConfirmedAndEntry {

/**
* Gets LastAddConfirmed entryId.
*
* @return the LastAddConfirmed
*/
Long getLastAddConfirmed();

/**
* Whether this entity contains an entry.
*
* @return true if Entry not null
*/
boolean hasEntry();

/**
* Gets wanted LedgerEntry.
*
* @return the LedgerEntry
*/
LedgerEntry getEntry();

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,23 @@ public interface ReadHandle extends Handle {
*/
long getLength();

/**
* Asynchronous read specific entry and the latest last add confirmed.
* If the next entryId is less than known last add confirmed, the call will read next entry directly.
* If the next entryId is ahead of known last add confirmed, the call will issue a long poll read
* to wait for the next entry <i>entryId</i>.
*
* @param entryId
* next entry id to read
* @param timeOutInMillis
* timeout period to wait for the entry id to be available (for long poll only)
* if timeout for get the entry, it will return null entry.
* @param parallel
* whether to issue the long poll reads in parallel
* @return an handle to the result of the operation
*/
CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long entryId,
long timeOutInMillis,
boolean parallel);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client.impl;

import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntry;

/**
* This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
* It is used for readLastAddConfirmedAndEntry.
*/
public class LastConfirmedAndEntryImpl implements LastConfirmedAndEntry {

private final Long lac;
private final LedgerEntry entry;

public LastConfirmedAndEntryImpl(Long lac, LedgerEntry entry) {
this.lac = lac;
this.entry = entry;
}

/**
* {@inheritDoc}
*/
@Override
public Long getLastAddConfirmed() {
return lac;
}

/**
* {@inheritDoc}
*/
@Override
public boolean hasEntry() {
return entry != null;
}

/**
* {@inheritDoc}
*/
@Override
public LedgerEntry getEntry() {
return entry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ public void testOpenLedgerRead() throws Exception {
assertEquals(2, result(reader.tryReadLastAddConfirmed()).intValue());
checkEntries(result(reader.read(0, reader.getLastAddConfirmed())), data);
checkEntries(result(reader.readUnconfirmed(0, reader.getLastAddConfirmed())), data);

// test readLastAddConfirmedAndEntry
LastConfirmedAndEntry lastConfirmedAndEntry =
result(reader.readLastAddConfirmedAndEntry(0, 999, false));
assertEquals(2, lastConfirmedAndEntry.getLastAddConfirmed().intValue());
assertArrayEquals(data, lastConfirmedAndEntry.getEntry().getEntry());
}
}

Expand Down

0 comments on commit 442e6d5

Please sign in to comment.