Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -768,18 +768,16 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);
switch (request.getType()) {
case APPEND:
if (dLedgerConfig.isEnableBatchPush()) {
if (request.isBatch()) {
PreConditions.check(request.getBatchEntry() != null && request.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
long firstIndex = request.getFirstEntryIndex();
writeRequestMap.put(firstIndex, new Pair<>(request, future));
} else {
PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
long index = request.getEntry().getIndex();
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
if (old != null) {
logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
}
}
long index = request.getFirstEntryIndex();
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
if (old != null) {
logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
}
break;
case COMMIT:
Expand Down Expand Up @@ -899,57 +897,19 @@ private void handleDoBatchAppend(long writeIndex, PushEntryRequest request,
}

/**
 * Sanity-checks every pending single-entry APPEND request in
 * {@code writeRequestMap} against the follower's current ledger end index.
 *
 * Three cases per pending request (keyed by its entry index):
 * <ul>
 *   <li>{@code index <= endIndex} (fall behind): the entry should already be
 *       stored locally — verify it matches the store, complete the future
 *       with SUCCESS (or INCONSISTENT_STATE on mismatch/error), and drop the
 *       request from the map;</li>
 *   <li>{@code index == endIndex + 1} (just ok): the next expected entry is
 *       already queued, nothing to do — return immediately;</li>
 *   <li>{@code index > endIndex + 1} (fast forward): the leader pushed ahead
 *       of us; if such a request has already timed out, remember the
 *       smallest timed-out index.</li>
 * </ul>
 * After the scan, the request at the smallest timed-out fast-forward index
 * (if any) is completed with INCONSISTENT_STATE.
 * NOTE(review): presumably the leader reacts to INCONSISTENT_STATE by
 * falling back to a compare/truncate cycle — confirm against the leader-side
 * dispatcher, which is not visible here.
 *
 * @param endIndex the index of the last entry currently stored in the local ledger
 */
private void checkAppendFuture(long endIndex) {
    long minFastForwardIndex = Long.MAX_VALUE;
    for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
        long index = pair.getKey().getEntry().getIndex();
        //Fall behind: this index was already written locally, so the push is a duplicate.
        if (index <= endIndex) {
            try {
                // The pushed entry must equal what is already stored at this index;
                // otherwise the replica state is inconsistent with the leader.
                DLedgerEntry local = dLedgerStore.get(index);
                PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
                pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
                logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex);
            } catch (Throwable t) {
                logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t);
                pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
            // Safe during iteration: writeRequestMap is a concurrent map
            // (putIfAbsent is used elsewhere) — NOTE(review): confirm its
            // declared type, which is outside this view.
            writeRequestMap.remove(index);
            continue;
        }
        //Just OK: the request for the next expected index is pending, so no gap exists.
        if (index == endIndex + 1) {
            //The next entry is coming, just return
            return;
        }
        //Fast forward: the request is ahead of the local ledger; only act on it
        //once its future has timed out waiting for the preceding entries.
        TimeoutFuture<PushEntryResponse> future = (TimeoutFuture<PushEntryResponse>) pair.getValue();
        if (!future.isTimeOut()) {
            continue;
        }
        if (index < minFastForwardIndex) {
            minFastForwardIndex = index;
        }
    }
    // No timed-out fast-forward request found — nothing to report.
    if (minFastForwardIndex == Long.MAX_VALUE) {
        return;
    }
    // The request may have been completed/removed concurrently since the scan.
    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex);
    if (pair == null) {
        return;
    }
    logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
    pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
}

private void checkBatchAppendFuture(long endIndex) {
long minFastForwardIndex = Long.MAX_VALUE;
for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
long firstEntryIndex = pair.getKey().getFirstEntryIndex();
long lastEntryIndex = pair.getKey().getLastEntryIndex();
//Fall behind
if (lastEntryIndex <= endIndex) {
try {
for (DLedgerEntry dLedgerEntry : pair.getKey().getBatchEntry()) {
if (pair.getKey().isBatch()) {
for (DLedgerEntry dLedgerEntry : pair.getKey().getBatchEntry()) {
PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
}
} else {
DLedgerEntry dLedgerEntry = pair.getKey().getEntry();
PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
}
pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
Expand Down Expand Up @@ -996,11 +956,8 @@ private void checkAbnormalFuture(long endIndex) {
if (writeRequestMap.isEmpty()) {
return;
}
if (dLedgerConfig.isEnableBatchPush()) {
checkBatchAppendFuture(endIndex);
} else {
checkAppendFuture(endIndex);
}

checkAppendFuture(endIndex);
}

@Override
Expand Down Expand Up @@ -1035,7 +992,7 @@ public void doWork() {
return;
}
PushEntryRequest request = pair.getKey();
if (dLedgerConfig.isEnableBatchPush()) {
if (request.isBatch()) {
handleDoBatchAppend(nextIndex, request, pair.getValue());
} else {
handleDoAppend(nextIndex, request, pair.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public void addEntry(DLedgerEntry entry) {
public long getFirstEntryIndex() {
if (!batchEntry.isEmpty()) {
return batchEntry.get(0).getIndex();
} else if (entry != null) {
return entry.getIndex();
} else {
return -1;
}
Expand All @@ -74,6 +76,8 @@ public long getFirstEntryIndex() {
public long getLastEntryIndex() {
if (!batchEntry.isEmpty()) {
return batchEntry.get(batchEntry.size() - 1).getIndex();
} else if (entry != null) {
return entry.getIndex();
} else {
return -1;
}
Expand All @@ -96,6 +100,10 @@ public void clear() {
totalSize = 0;
}

/**
 * Tells whether this push request carries a batch of entries.
 *
 * @return {@code true} when at least one entry has been added to the batch
 *         list; {@code false} for a single-entry (or empty) request
 */
public boolean isBatch() {
    final boolean noBatchedEntries = batchEntry.isEmpty();
    return !noBatchedEntries;
}

public enum Type {
APPEND,
COMMIT,
Expand Down