Skip to content

Commit

Permalink
Try to fix memcached corrupted uploads
Browse files Browse the repository at this point in the history
If a memcached server responds to an op before the
write of the op is completed, we can no longer trust
the parser read state of the server and we should
both abandon the op and reconnect to the server.

For example, we may be halfway through a length+blob
write. If the server rejects the op before we finish
writing, we cannot simply abandon the write and start
writing the next op, as those bytes may end up being
part of the payload of the failed op!

This situation has been observed in production (i.e.
corrupted memcached values with binary op headers in them)
and this is a best-guess at how to prevent it, but it is
difficult to reproduce without mocking a server which
produces abrupt failures.
  • Loading branch information
favila committed Dec 17, 2019
1 parent e280bef commit 74838cc
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
9 changes: 8 additions & 1 deletion src/main/java/net/spy/memcached/FillWriteBufferStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public enum FillWriteBufferStatus {
*/
OP_STATUS_IS_WRITE_QUEUED,
OP_STATUS_IS_READING,
OP_STATUS_IS_RETRY
OP_STATUS_IS_RETRY,
/**
* The server completed a write operation but the
*/
OP_STATUS_IS_INTERRUPTED_BY_COMPLETION
;

public static FillWriteBufferStatus forOperationState(final OperationState opState) {
Expand All @@ -74,4 +78,7 @@ public static FillWriteBufferStatus forOperationState(final OperationState opSta
public boolean isSuccess() {
return this == SUCCESS;
}
public boolean needsReconnect() {
return this == OP_STATUS_IS_INTERRUPTED_BY_COMPLETION;
}
}
6 changes: 6 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -810,13 +810,19 @@ public void complete() {
*/
private void handleWrites(final MemcachedNode node) throws IOException {
FillWriteBufferStatus bufferFilledStatus = node.fillWriteBuffer(shouldOptimize);
if (bufferFilledStatus.needsReconnect()) {
throw new IOException("Reconnecting because write was interrupted by completion");
}
boolean canWriteMore = node.getBytesRemainingToWrite() > 0;
while (canWriteMore && bufferFilledStatus.isSuccess()) {
int wrote = node.writeSome();
metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
bufferFilledStatus = node.fillWriteBuffer(shouldOptimize);
canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;
}
if (bufferFilledStatus.needsReconnect()) {
throw new IOException("Reconnecting because write was interrupted by completion");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,17 @@ public final FillWriteBufferStatus fillWriteBuffer(boolean shouldOptimize) {
final OperationState oState = o.getState();
ByteBuffer obuf = o.getBuffer();
// This cases may happen due to race condition. See FillWriteBufferNPETest.
// If we received a response to an operation before we finished sending
// the full payload we can no longer trust that the server and client
// are synchronized. For example, we may be in the middle of sending
// bytes of a declared length and not sent all the bytes yet,
// so the server may interpret the next op as part of the payload
// of the previous op.
// The only safe thing to do is abandon the current write op and
// close and reestablish the connection.
if (oState != OperationState.WRITING || obuf == null) {
return logCleanUpAndReturnStatus(FillWriteBufferStatus.forOperationState(oState), o, obuf);
logCleanUpAndReturnStatus(FillWriteBufferStatus.forOperationState(oState), o, obuf);
return FillWriteBufferStatus.OP_STATUS_IS_INTERRUPTED_BY_COMPLETION;
}

int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());
Expand Down

0 comments on commit 74838cc

Please sign in to comment.