diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 1bc04bc6..cc55145e 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -768,18 +768,16 @@ public CompletableFuture handlePush(PushEntryRequest request) CompletableFuture future = new TimeoutFuture<>(1000); switch (request.getType()) { case APPEND: - if (dLedgerConfig.isEnableBatchPush()) { + if (request.isBatch()) { PreConditions.check(request.getBatchEntry() != null && request.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT); - long firstIndex = request.getFirstEntryIndex(); - writeRequestMap.put(firstIndex, new Pair<>(request, future)); } else { PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT); - long index = request.getEntry().getIndex(); - Pair> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future)); - if (old != null) { - logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo()); - future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode())); - } + } + long index = request.getFirstEntryIndex(); + Pair> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future)); + if (old != null) { + logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo()); + future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode())); } break; case COMMIT: @@ -899,49 +897,6 @@ private void handleDoBatchAppend(long writeIndex, PushEntryRequest request, } private void checkAppendFuture(long endIndex) { - long minFastForwardIndex = Long.MAX_VALUE; - for (Pair> pair : writeRequestMap.values()) { - long index = pair.getKey().getEntry().getIndex(); - //Fall behind - if (index <= endIndex) { - try { - DLedgerEntry local = dLedgerStore.get(index); - PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE); - pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode())); - logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex); - } catch (Throwable t) { - logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t); - pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode())); - } - writeRequestMap.remove(index); - continue; - } - //Just OK - if (index == endIndex + 1) { - //The next entry is coming, just return - return; - } - //Fast forward - TimeoutFuture future = (TimeoutFuture) pair.getValue(); - if (!future.isTimeOut()) { - continue; - } - if (index < minFastForwardIndex) { - minFastForwardIndex = index; - } - } - if (minFastForwardIndex == Long.MAX_VALUE) { - return; - } - Pair> pair = writeRequestMap.get(minFastForwardIndex); - if (pair == null) { - return; - } - logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex); - pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode())); - } - - private void checkBatchAppendFuture(long endIndex) { long minFastForwardIndex = Long.MAX_VALUE; for (Pair> pair : writeRequestMap.values()) { long firstEntryIndex = pair.getKey().getFirstEntryIndex(); @@ -949,7 +904,12 @@ private void checkBatchAppendFuture(long endIndex) { //Fall behind if (lastEntryIndex <= endIndex) { try { - for (DLedgerEntry dLedgerEntry : pair.getKey().getBatchEntry()) { + if (pair.getKey().isBatch()) { + for (DLedgerEntry dLedgerEntry : pair.getKey().getBatchEntry()) { + PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE); + } + } else { + DLedgerEntry dLedgerEntry = pair.getKey().getEntry(); PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE); } pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode())); @@ -996,11 +956,8 @@ private void checkAbnormalFuture(long endIndex) { if (writeRequestMap.isEmpty()) { return; } - if (dLedgerConfig.isEnableBatchPush()) { - checkBatchAppendFuture(endIndex); - } else { - checkAppendFuture(endIndex); - } + + checkAppendFuture(endIndex); } @Override @@ -1035,7 +992,7 @@ public void doWork() { return; } PushEntryRequest request = pair.getKey(); - if (dLedgerConfig.isEnableBatchPush()) { + if (request.isBatch()) { handleDoBatchAppend(nextIndex, request, pair.getValue()); } else { handleDoAppend(nextIndex, request, pair.getValue()); diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java b/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java index aa7b25b1..ccf562d9 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/PushEntryRequest.java @@ -66,6 +66,8 @@ public void addEntry(DLedgerEntry entry) { public long getFirstEntryIndex() { if (!batchEntry.isEmpty()) { return batchEntry.get(0).getIndex(); + } else if (entry != null) { + return entry.getIndex(); } else { return -1; } @@ -74,6 +76,8 @@ public long getFirstEntryIndex() { public long getLastEntryIndex() { if (!batchEntry.isEmpty()) { return batchEntry.get(batchEntry.size() - 1).getIndex(); + } else if (entry != null) { + return entry.getIndex(); } else { return -1; } @@ -96,6 +100,10 @@ public void clear() { totalSize = 0; } + public boolean isBatch() { + return !batchEntry.isEmpty(); + } + public enum Type { APPEND, COMMIT,