Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
dineshjoshi committed Feb 1, 2020
1 parent 603443b commit b9a58b3
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
Expand Up @@ -61,7 +61,7 @@ public enum ExceptionCode
valueToCode.put(code.value, code);
}

private ExceptionCode(int value)
ExceptionCode(int value)
{
this.value = value;
}
Expand Down
52 changes: 29 additions & 23 deletions src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
Expand Up @@ -18,9 +18,9 @@
package org.apache.cassandra.transport.messages;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.CodecException;
Expand Down Expand Up @@ -84,30 +84,21 @@ public ErrorMessage decode(ByteBuf body, ProtocolVersion version)
case READ_FAILURE:
{
ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
int received = body.readInt();
int blockFor = body.readInt();
final int received = body.readInt();
final int blockFor = body.readInt();
// The number of failures is also present in protocol v5, but used instead to specify the size of the failure map
int failure = body.readInt();
final int failures = body.readInt();

Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint = new ConcurrentHashMap<>();
if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
{
for (int i = 0; i < failure; i++)
{
InetAddress endpoint = CBUtil.readInetAddr(body);
RequestFailureReason failureReason = RequestFailureReason.fromCode(body.readUnsignedShort());
failureReasonByEndpoint.put(InetAddressAndPort.getByAddress(endpoint), failureReason);
}
}
final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint = failureReasonByEndPoint(body, version, failures);

if (code == ExceptionCode.WRITE_FAILURE)
{
WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body));
final WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body));
te = new WriteFailureException(cl, received, blockFor, writeType, failureReasonByEndpoint);
}
else
{
byte dataPresent = body.readByte();
final byte dataPresent = body.readByte();
te = new ReadFailureException(cl, received, blockFor, dataPresent != 0, failureReasonByEndpoint);
}
}
Expand All @@ -116,14 +107,14 @@ public ErrorMessage decode(ByteBuf body, ProtocolVersion version)
case READ_TIMEOUT:
{
ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
int received = body.readInt();
int blockFor = body.readInt();
final int received = body.readInt();
final int blockFor = body.readInt();
if (code == ExceptionCode.WRITE_TIMEOUT)
{
WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body));
final WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body));
if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && writeType == WriteType.CAS)
{
int contentions = body.readShort();
final int contentions = body.readShort();
te = new CasWriteTimeoutException(writeType, cl, received, blockFor, contentions);
}
else
Expand All @@ -133,7 +124,7 @@ public ErrorMessage decode(ByteBuf body, ProtocolVersion version)
}
else
{
byte dataPresent = body.readByte();
final byte dataPresent = body.readByte();
te = new ReadTimeoutException(cl, received, blockFor, dataPresent != 0);
}
}
Expand Down Expand Up @@ -336,6 +327,21 @@ public int encodedSize(ErrorMessage msg, ProtocolVersion version)
}
return size;
}

private Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndPoint(ByteBuf body, ProtocolVersion version, int failures)
{
final Map<InetAddressAndPort, RequestFailureReason> failureReasons = new HashMap<>();
if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
{
for (int i = 0; i < failures; i++)
{
final InetAddress endpoint = CBUtil.readInetAddr(body);
final RequestFailureReason failureReason = RequestFailureReason.fromCode(body.readUnsignedShort());
failureReasons.put(InetAddressAndPort.getByAddress(endpoint), failureReason);
}
}
return failureReasons;
}
};

private static TransportException getBackwardsCompatibleException(ErrorMessage msg, ProtocolVersion version)
Expand Down Expand Up @@ -363,12 +369,12 @@ private static TransportException getBackwardsCompatibleException(ErrorMessage m
case WRITE_TIMEOUT:
if (msg.error instanceof CasWriteTimeoutException)
{
CasWriteTimeoutException cwte = (CasWriteTimeoutException) msg.error;
final CasWriteTimeoutException cwte = (CasWriteTimeoutException) msg.error;
return new WriteTimeoutException(WriteType.CAS, cwte.consistency, cwte.received, cwte.blockFor);
}
break;
case CAS_WRITE_UNKNOWN:
CasWriteUnknownResultException cwue = (CasWriteUnknownResultException) msg.error;
final CasWriteUnknownResultException cwue = (CasWriteUnknownResultException) msg.error;
return new WriteTimeoutException(WriteType.CAS, cwue.consistency, cwue.received, cwue.blockFor);
}
}
Expand Down

0 comments on commit b9a58b3

Please sign in to comment.