diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java index 2e196c0bf5ee..176695197151 100644 --- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java +++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java @@ -61,7 +61,7 @@ public enum ExceptionCode valueToCode.put(code.value, code); } - private ExceptionCode(int value) + ExceptionCode(int value) { this.value = value; } diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 82f8bc73ee17..371ac245faac 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -18,9 +18,9 @@ package org.apache.cassandra.transport.messages; import java.net.InetAddress; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.CodecException; @@ -84,30 +84,21 @@ public ErrorMessage decode(ByteBuf body, ProtocolVersion version) case READ_FAILURE: { ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); - int received = body.readInt(); - int blockFor = body.readInt(); + final int received = body.readInt(); + final int blockFor = body.readInt(); // The number of failures is also present in protocol v5, but used instead to specify the size of the failure map - int failure = body.readInt(); + final int failures = body.readInt(); - Map failureReasonByEndpoint = new ConcurrentHashMap<>(); - if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) - { - for (int i = 0; i < failure; i++) - { - InetAddress endpoint = CBUtil.readInetAddr(body); - RequestFailureReason failureReason = RequestFailureReason.fromCode(body.readUnsignedShort()); - failureReasonByEndpoint.put(InetAddressAndPort.getByAddress(endpoint), failureReason); - } - } + final Map failureReasonByEndpoint = failureReasonByEndPoint(body, version, failures); if (code == ExceptionCode.WRITE_FAILURE) { - WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); + final WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); te = new WriteFailureException(cl, received, blockFor, writeType, failureReasonByEndpoint); } else { - byte dataPresent = body.readByte(); + final byte dataPresent = body.readByte(); te = new ReadFailureException(cl, received, blockFor, dataPresent != 0, failureReasonByEndpoint); } } @@ -116,14 +107,14 @@ public ErrorMessage decode(ByteBuf body, ProtocolVersion version) case READ_TIMEOUT: { ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); - int received = body.readInt(); - int blockFor = body.readInt(); + final int received = body.readInt(); + final int blockFor = body.readInt(); if (code == ExceptionCode.WRITE_TIMEOUT) { - WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); + final WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && writeType == WriteType.CAS) { - int contentions = body.readShort(); + final int contentions = body.readShort(); te = new CasWriteTimeoutException(writeType, cl, received, blockFor, contentions); } else @@ -133,7 +124,7 @@ public ErrorMessage decode(ByteBuf body, ProtocolVersion version) } else { - byte dataPresent = body.readByte(); + final byte dataPresent = body.readByte(); te = new ReadTimeoutException(cl, received, blockFor, dataPresent != 0); } } @@ -336,6 +327,21 @@ public int encodedSize(ErrorMessage msg, ProtocolVersion version) } return size; } + + private Map failureReasonByEndPoint(ByteBuf body, ProtocolVersion version, int failures) + { + final Map failureReasons = new HashMap<>(); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + { + for (int i = 0; i < failures; i++) + { + final InetAddress endpoint = CBUtil.readInetAddr(body); + final RequestFailureReason failureReason = RequestFailureReason.fromCode(body.readUnsignedShort()); + failureReasons.put(InetAddressAndPort.getByAddress(endpoint), failureReason); + } + } + return failureReasons; + } }; private static TransportException getBackwardsCompatibleException(ErrorMessage msg, ProtocolVersion version) @@ -363,12 +369,12 @@ private static TransportException getBackwardsCompatibleException(ErrorMessage m case WRITE_TIMEOUT: if (msg.error instanceof CasWriteTimeoutException) { - CasWriteTimeoutException cwte = (CasWriteTimeoutException) msg.error; + final CasWriteTimeoutException cwte = (CasWriteTimeoutException) msg.error; return new WriteTimeoutException(WriteType.CAS, cwte.consistency, cwte.received, cwte.blockFor); } break; case CAS_WRITE_UNKNOWN: - CasWriteUnknownResultException cwue = (CasWriteUnknownResultException) msg.error; + final CasWriteUnknownResultException cwue = (CasWriteUnknownResultException) msg.error; return new WriteTimeoutException(WriteType.CAS, cwue.consistency, cwue.received, cwue.blockFor); } }