From 74838cc0a1107676d6224bdee3abad0c03aa848a Mon Sep 17 00:00:00 2001 From: Francis Avila Date: Tue, 17 Dec 2019 09:21:43 -0600 Subject: [PATCH] Try to fix memcached corrupted uploads If a memcached server responds to an op before the write of the op is completed, we can no longer trust the parser read state of the server and we should both abandon the op and reconnect to the server. For example, we may be halfway through a length+blob write. If the server rejects the op before we finish writing, we cannot simply abandon the write and start writing the next op, as those bytes may end up being part of the payload of the failed op! This situation has been observed in production (i.e. corrupted memcached values with binary op headers in them) and this is a best-guess at how to prevent it, but it is difficult to reproduce without mocking a server which produces abrupt failures. --- .../java/net/spy/memcached/FillWriteBufferStatus.java | 9 ++++++++- .../java/net/spy/memcached/MemcachedConnection.java | 6 ++++++ .../spy/memcached/protocol/TCPMemcachedNodeImpl.java | 11 ++++++++++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/spy/memcached/FillWriteBufferStatus.java b/src/main/java/net/spy/memcached/FillWriteBufferStatus.java index 87b44e4d1..6f235a6a4 100644 --- a/src/main/java/net/spy/memcached/FillWriteBufferStatus.java +++ b/src/main/java/net/spy/memcached/FillWriteBufferStatus.java @@ -52,7 +52,11 @@ public enum FillWriteBufferStatus { */ OP_STATUS_IS_WRITE_QUEUED, OP_STATUS_IS_READING, - OP_STATUS_IS_RETRY + OP_STATUS_IS_RETRY, + /** + * The server completed a write operation but the + */ + OP_STATUS_IS_INTERRUPTED_BY_COMPLETION ; public static FillWriteBufferStatus forOperationState(final OperationState opState) { @@ -74,4 +78,7 @@ public static FillWriteBufferStatus forOperationState(final OperationState opSta public boolean isSuccess() { return this == SUCCESS; } + public boolean needsReconnect() { + return this == OP_STATUS_IS_INTERRUPTED_BY_COMPLETION; + } } diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 5b0aff4fa..037e7b3fd 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -810,6 +810,9 @@ public void complete() { */ private void handleWrites(final MemcachedNode node) throws IOException { FillWriteBufferStatus bufferFilledStatus = node.fillWriteBuffer(shouldOptimize); + if (bufferFilledStatus.needsReconnect()) { + throw new IOException("Reconnecting because write was interrupted by completion"); + } boolean canWriteMore = node.getBytesRemainingToWrite() > 0; while (canWriteMore && bufferFilledStatus.isSuccess()) { int wrote = node.writeSome(); @@ -817,6 +820,9 @@ private void handleWrites(final MemcachedNode node) throws IOException { bufferFilledStatus = node.fillWriteBuffer(shouldOptimize); canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0; } + if (bufferFilledStatus.needsReconnect()) { + throw new IOException("Reconnecting because write was interrupted by completion"); + } } /** diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index 8c6368804..498e400f1 100644 --- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -199,8 +199,17 @@ public final FillWriteBufferStatus fillWriteBuffer(boolean shouldOptimize) { final OperationState oState = o.getState(); ByteBuffer obuf = o.getBuffer(); // This cases may happen due to race condition. See FillWriteBufferNPETest. + // If we received a response to an operation before we finished sending + // the full payload we can no longer trust that the server and client + // are synchronized. For example, we may be in the middle of sending + // bytes of a declared length and not sent all the bytes yet, + // so the server may interpret the next op as part of the payload + // of the previous op. + // The only safe thing to do is abandon the current write op and + // close and reestablish the connection. if (oState != OperationState.WRITING || obuf == null) { - return logCleanUpAndReturnStatus(FillWriteBufferStatus.forOperationState(oState), o, obuf); + logCleanUpAndReturnStatus(FillWriteBufferStatus.forOperationState(oState), o, obuf); + return FillWriteBufferStatus.OP_STATUS_IS_INTERRUPTED_BY_COMPLETION; } int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());