Skip to content

Commit

Permalink
Fix MQTTFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill380 committed Sep 28, 2016
1 parent 4323069 commit ba30b2a
Showing 1 changed file with 23 additions and 29 deletions.
Expand Up @@ -32,7 +32,7 @@
* *
* @author Andrey Panasenko * @author Andrey Panasenko
*/ */
abstract public class MqttFrame { public abstract class MqttFrame {


public static final Logger LOG = LoggerFactory //NOSONAR public static final Logger LOG = LoggerFactory //NOSONAR
.getLogger(MqttFrame.class); .getLogger(MqttFrame.class);
Expand All @@ -48,16 +48,12 @@ abstract public class MqttFrame {
* to clone all fileds. * to clone all fileds.
*/ */
private MessageType messageType; private MessageType messageType;
/**
*
*/
protected MqttFrame() {



protected MqttFrame() {
} }


/**
* @param old te old
*/
protected MqttFrame(MqttFrame old) { protected MqttFrame(MqttFrame old) {
this.messageType = old.getMessageType(); this.messageType = old.getMessageType();
this.buffer = old.getBuffer(); this.buffer = old.getBuffer();
Expand All @@ -67,16 +63,11 @@ protected MqttFrame(MqttFrame old) {
this.currentState = old.currentState; this.currentState = old.currentState;
} }


/**
* @return the messageType
*/
public MessageType getMessageType() { public MessageType getMessageType() {
return messageType; return messageType;
} }


/**
* @param messageType the messageType to set
*/
protected void setMessageType(MessageType messageType) { protected void setMessageType(MessageType messageType) {
this.messageType = messageType; this.messageType = messageType;
} }
Expand All @@ -102,12 +93,12 @@ public ByteBuffer getFrame() {
} }


/** /**
* Pack message into mqtt frame * Pack message into mqtt frame.
*/ */
abstract protected void pack(); protected abstract void pack();


/** /**
* Return remaining length of mqtt frame, necessary for ByteBuffer size calculation * Return remaining length of mqtt frame, necessary for ByteBuffer size calculation.
* *
* @return remaining length of mqtt frame * @return remaining length of mqtt frame
*/ */
Expand All @@ -116,21 +107,21 @@ protected int getRemainingLegth() {
} }


/** /**
* Decode message from mqttFrame ByteBuffer * Decode message from mqttFrame ByteBuffer.
* *
* @throws KaaTcpProtocolException the kaa tcp protocol exception * @throws KaaTcpProtocolException the kaa tcp protocol exception
*/ */
abstract protected void decode() throws KaaTcpProtocolException; protected abstract void decode() throws KaaTcpProtocolException;


/** /**
* Check if this Mqtt frame should be last frame on connection and connection should be closed. * Check if this Mqtt frame should be last frame on connection and connection should be closed.
* *
* @return boolean 'true' if connection should be closed after frame transmition. * @return boolean 'true' if connection should be closed after frame transmition.
*/ */
abstract public boolean isNeedCloseConnection(); public abstract boolean isNeedCloseConnection();


/** /**
* Fill mqtt frame fixed header * Fill mqtt frame fixed header.
* *
* @param remainingLegth the remaining legth * @param remainingLegth the remaining legth
* @param dst the dst * @param dst the dst
Expand All @@ -152,7 +143,8 @@ private int fillFixedHeader(int remainingLegth, byte[] dst) {
} }
dst[size] = digit; dst[size] = digit;
++size; ++size;
} while (remainingLegth > 0); }
while (remainingLegth > 0);
return size; return size;
} }


Expand All @@ -169,11 +161,11 @@ private void onFrameDone() throws KaaTcpProtocolException {
frameDecodeComplete = true; frameDecodeComplete = true;
} }


private void processByte(byte b) throws KaaTcpProtocolException { private void processByte(byte value) throws KaaTcpProtocolException {
if (currentState.equals(FrameParsingState.PROCESSING_LENGTH)) { if (currentState.equals(FrameParsingState.PROCESSING_LENGTH)) {
remainingLength += ((b & 0xFF) & 127) * multiplier; remainingLength += ((value & 0xFF) & 127) * multiplier;
multiplier *= 128; multiplier *= 128;
if (((b & 0xFF) & 128) == 0) { if (((value & 0xFF) & 128) == 0) {
LOG.trace("Frame ({}): payload length = {}", getMessageType(), remainingLength); LOG.trace("Frame ({}): payload length = {}", getMessageType(), remainingLength);
if (remainingLength != 0) { if (remainingLength != 0) {
buffer = ByteBuffer.allocate(remainingLength); buffer = ByteBuffer.allocate(remainingLength);
Expand All @@ -186,7 +178,7 @@ private void processByte(byte b) throws KaaTcpProtocolException {
} }


/** /**
* Push bytes of frame * Push bytes of frame.
* *
* @param bytes the bytes array * @param bytes the bytes array
* @param position the position in buffer * @param position the position in buffer
Expand All @@ -201,11 +193,13 @@ public int push(byte[] bytes, int position) throws KaaTcpProtocolException {
} }
while (pos < bytes.length && !frameDecodeComplete) { while (pos < bytes.length && !frameDecodeComplete) {
if (currentState.equals(FrameParsingState.PROCESSING_PAYLOAD)) { if (currentState.equals(FrameParsingState.PROCESSING_PAYLOAD)) {
int bytesToCopy = (remainingLength > bytes.length - pos) ? bytes.length - pos : remainingLength; int bytesToCopy = (remainingLength > bytes.length - pos) ? bytes.length - pos :
remainingLength;
buffer.put(bytes, pos, bytesToCopy); buffer.put(bytes, pos, bytesToCopy);
pos += bytesToCopy; pos += bytesToCopy;
remainingLength -= bytesToCopy; remainingLength -= bytesToCopy;
LOG.trace("Frame ({}): copied {} bytes of payload. {} bytes left", getMessageType(), bytesToCopy, remainingLength); LOG.trace("Frame ({}): copied {} bytes of payload. {} bytes left", getMessageType(),
bytesToCopy, remainingLength);
if (remainingLength == 0) { if (remainingLength == 0) {
onFrameDone(); onFrameDone();
} }
Expand All @@ -218,7 +212,7 @@ public int push(byte[] bytes, int position) throws KaaTcpProtocolException {
} }


/** /**
* Test if Mqtt frame decode complete * Test if Mqtt frame decode complete.
* *
* @return boolean 'true' if decode complete * @return boolean 'true' if decode complete
*/ */
Expand Down

0 comments on commit ba30b2a

Please sign in to comment.