From 31a0fe3aa4e57400dd1dd731a060b97f983f750a Mon Sep 17 00:00:00 2001 From: Aaron Date: Wed, 14 Mar 2018 11:52:19 -0700 Subject: [PATCH 1/4] Multi-part messages. Improved debug. --- .../iot/dsa/dslink/io/DSByteBuffer.java | 31 +- .../protocol/requester/DSOutboundStub.java | 2 + .../requester/DSOutboundSubscriptions.java | 4 +- .../protocol/responder/DSInboundInvoke.java | 4 +- .../dsa/dslink/protocol/v2/AckMessage.java | 2 +- .../dslink/protocol/v2/DS2LinkConnection.java | 4 +- .../dsa/dslink/protocol/v2/DS2Message.java | 45 ++- .../dslink/protocol/v2/DS2MessageReader.java | 71 ++-- .../dslink/protocol/v2/DS2MessageWriter.java | 71 ++-- .../dsa/dslink/protocol/v2/DS2Session.java | 103 ++---- .../dslink/protocol/v2/MessageConstants.java | 31 +- .../dslink/protocol/v2/MultipartReader.java | 79 +++++ .../dslink/protocol/v2/MultipartWriter.java | 89 ++++++ .../v2/requester/DS2OutboundInvokeStub.java | 24 +- .../v2/requester/DS2OutboundListStub.java | 23 +- .../v2/requester/DS2OutboundSetStub.java | 27 +- .../requester/DS2OutboundSubscriptions.java | 8 +- .../v2/responder/DS2InboundInvoke.java | 28 +- .../protocol/v2/responder/DS2InboundList.java | 34 +- .../v2/responder/DS2InboundSubscription.java | 2 +- .../protocol/v2/responder/DS2Responder.java | 16 +- .../protocol/v2/responder/ErrorMessage.java | 22 +- .../iot/dsa/dslink/test/TestTransport.java | 19 +- .../transport/BufferedBinaryTransport.java | 302 ++++++++++++++++++ .../iot/dsa/dslink/transport/DSTransport.java | 37 +-- .../dslink/transport/PushBinaryTransport.java | 183 ----------- .../dsa/dslink/transport/SocketTransport.java | 175 +--------- .../transport/StreamBinaryTransport.java | 254 +++++++++++++++ .../org/iot/dsa/logging/AsyncLogHandler.java | 7 +- .../dslink/websocket/WsBinaryTransport.java | 52 ++- .../dsa/dslink/websocket/WsTextTransport.java | 14 +- 31 files changed, 1095 insertions(+), 668 deletions(-) create mode 100644 dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartReader.java create mode 100644 dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartWriter.java create mode 100644 dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java delete mode 100644 dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/PushBinaryTransport.java create mode 100644 dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/io/DSByteBuffer.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/io/DSByteBuffer.java index be86e7df..295d3125 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/io/DSByteBuffer.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/io/DSByteBuffer.java @@ -1,7 +1,6 @@ package com.acuity.iot.dsa.dslink.io; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; @@ -269,21 +268,14 @@ public DSByteBuffer put(int dest, byte[] msg, int off, int len) { } public int put(InputStream in, int len) { - int count = 0; + growBuffer(offset + len); try { - int ch; - while (count < len) { - ch = in.read(); - if (ch < 0) { - return count; - } - put((byte) ch); - count++; - } - } catch (IOException x) { + len = in.read(buffer, offset, len); + } catch (Exception x) { DSException.throwRuntime(x); } - return count; + length += len; + return len; } /** @@ -554,6 +546,19 @@ public void sendTo(DSBinaryTransport transport, boolean isLast) { length = 0; } + /** + * Push bytes from the internal buffer to the given. + */ + public void sendTo(DSByteBuffer buf, int len) { + buf.put(buffer, offset, len); + length -= len; + if (length == 0) { + offset = 0; + } else { + offset += len; + } + } + /** * Push bytes from the internal buffer to the stream. */ diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java index b57d2f32..d5a97983 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java @@ -79,6 +79,7 @@ public void handleClose() { getRequester().removeRequest(getRequestId()); } + /* public void handleError(DSElement details) { if (!open) { return; @@ -121,6 +122,7 @@ public void handleError(DSElement details) { getRequester().error(getRequester().getPath(), x); } } + */ public void handleError(ErrorType type, String message) { if (!open) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java index 750ad519..db5b715c 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java @@ -4,6 +4,7 @@ import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import com.acuity.iot.dsa.dslink.protocol.requester.DSOutboundSubscribeStubs.State; +import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -217,8 +218,7 @@ public void write(MessageWriter writer) { if (!pendingSubscribe.isEmpty()) { doBeginSubscribe(writer); Iterator it = pendingSubscribe.iterator(); - //while (it.hasNext() && !session.shouldEndMessage()) { - while (it.hasNext()) { //todo + while (it.hasNext() && !session.shouldEndMessage()) { DSOutboundSubscribeStubs stubs = it.next(); if (!stubs.hasSid()) { synchronized (pathMap) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java index c1218af9..ec259a1c 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java @@ -79,7 +79,7 @@ public void close() { } state = STATE_CLOSE_PENDING; enqueueResponse(); - fine(fine() ? getPath() + " list closed locally" : null); + fine(fine() ? getPath() + " invoke closed locally" : null); } @Override @@ -90,7 +90,7 @@ public void close(Exception reason) { closeReason = reason; state = STATE_CLOSE_PENDING; enqueueResponse(); - fine(fine() ? getPath() + " list closed locally" : null); + fine(fine() ? getPath() + " invoke closed locally" : null); } private synchronized Update dequeueUpdate() { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/AckMessage.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/AckMessage.java index e1a7b730..658681ad 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/AckMessage.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/AckMessage.java @@ -18,7 +18,7 @@ public AckMessage(DS2Session session) { public void write(MessageWriter writer) { DS2MessageWriter out = (DS2MessageWriter) writer; out.init(-1, -1); - out.setMethod(MessageConstants.MSG_ACK); + out.setMethod(MSG_ACK); out.getBody().putInt(session.getNextAck(), false); out.write(session.getTransport()); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java index d77c4fd5..39f4ed46 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java @@ -249,7 +249,7 @@ private void sendF0() { String dsId = link.getDsId(); DSKeys dsKeys = link.getKeys(); DS2MessageWriter writer = new DS2MessageWriter(); - writer.setMethod((byte) 0xf0); + writer.setMethod(0xf0); DSByteBuffer buffer = writer.getBody(); buffer.put((byte) 2).put((byte) 0); //dsa version writer.writeString(dsId, buffer); @@ -260,7 +260,7 @@ private void sendF0() { private void sendF2() throws Exception { DS2MessageWriter writer = new DS2MessageWriter(); - writer.setMethod((byte) 0xf2); + writer.setMethod(0xf2); DSByteBuffer buffer = writer.getBody(); String token = getLink().getConfig().getToken(); if (token == null) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java index 0520198b..b336c3bf 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java @@ -1,5 +1,7 @@ package com.acuity.iot.dsa.dslink.protocol.v2; +import java.util.Map; +import org.iot.dsa.logging.DSLogger; import org.iot.dsa.node.DSBytes; /** @@ -7,19 +9,15 @@ * * @author Aaron Hansen */ -class DS2Message implements MessageConstants { +abstract class DS2Message extends DSLogger implements MessageConstants { - private StringBuilder debug; - - public StringBuilder getDebug() { - return debug; - } + private StringBuilder logBuffer; protected StringBuilder debugMethod(int arg, StringBuilder buf) { if (buf == null) { buf = new StringBuilder(); } - switch (arg & 0xFF) { + switch (arg) { case MSG_SUBSCRIBE_REQ: buf.append("Sub req"); break; @@ -69,13 +67,13 @@ protected StringBuilder debugMethod(int arg, StringBuilder buf) { buf.append("Handshake 4"); break; default: - buf.append("?? 0x"); + buf.append("0x"); DSBytes.toHex((byte) arg, buf); } return buf; } - protected StringBuilder debugHeader(int arg, StringBuilder buf) { + private StringBuilder debugHeader(int arg, StringBuilder buf) { if (buf == null) { buf = new StringBuilder(); } @@ -132,18 +130,37 @@ protected StringBuilder debugHeader(int arg, StringBuilder buf) { buf.append("Sourth Path"); break; default: - buf.append("?? 0x"); + buf.append("0x"); DSBytes.toHex((byte) arg, buf); } return buf; } - public boolean isDebug() { - return debug != null; + protected void debugHeaders(Map headers, StringBuilder buf) { + for (Map.Entry e : headers.entrySet()) { + buf.append(", "); + debugHeader(e.getKey(), buf); + if (e.getValue() != NO_HEADER_VAL) { + buf.append("="); + buf.append(e.getValue()); + } + } + } + + protected abstract void getDebug(StringBuilder buf); + + private StringBuilder getLogBuffer() { + if (logBuffer == null) { + logBuffer = new StringBuilder(); + } + return logBuffer; } - public void setDebug(StringBuilder debug) { - this.debug = debug; + protected void printDebug() { + StringBuilder buf = getLogBuffer(); + getDebug(buf); + debug(buf.toString()); + buf.setLength(0); } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java index 33027585..3cfd3473 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.v2; +import com.acuity.iot.dsa.dslink.io.DSByteBuffer; import com.acuity.iot.dsa.dslink.io.msgpack.MsgpackReader; import java.io.IOException; import java.io.InputStream; @@ -44,22 +45,17 @@ public DS2MessageReader() { // Methods // ------- - public void debugSummary() { - StringBuilder buf = getDebug(); + @Override + protected void getDebug(StringBuilder buf) { buf.append("RECV "); debugMethod(getMethod(), buf); - if (requestId >= 0) { + if (requestId > 0) { buf.append(", ").append("Rid ").append(requestId); } - if (ackId >= 0) { + if (ackId > 0) { buf.append(", ").append("Ack ").append(ackId); } - for (Map.Entry e : headers.entrySet()) { - buf.append(", "); - debugHeader(e.getKey(), buf); - buf.append("="); - buf.append(e.getValue()); - } + debugHeaders(headers, buf); } public int getAckId() { @@ -145,7 +141,19 @@ private ByteBuffer getStringBuffer(int len) { return strBuffer; } - public DS2MessageReader init(InputStream in) { + void init(int requestId, + int method, + DSByteBuffer body, + Map headers) { + this.requestId = requestId; + this.method = method; + input = body; + bodyLength = body.length(); + this.headers.clear(); + this.headers.putAll(headers); + } + + public void init(InputStream in) { try { input = in; getBodyReader().reset(); @@ -168,13 +176,12 @@ public DS2MessageReader init(InputStream in) { parseDynamicHeaders(in, hlen, headers); } } - if (isDebug()) { - debugSummary(); + if (debug()) { + printDebug(); } } catch (IOException x) { DSException.throwRuntime(x); } - return this; } public boolean isAck() { @@ -185,6 +192,11 @@ public boolean isPing() { return method == MSG_PING; } + public boolean isMultipart() { + Integer page = (Integer) headers.get(HDR_PAGE_ID); + return page != null; + } + public boolean isRequest() { switch (method) { case MSG_CLOSE: @@ -208,35 +220,34 @@ void parseDynamicHeaders(InputStream in, int len, Map headers) Object val; while (len > 0) { code = in.read() & 0xFF; - val = null; len--; switch (code) { - case 0x00: - case 0x08: - case 0x12: - case 0x32: + case HDR_STATUS: + case HDR_ALIAS_COUNT: + case HDR_QOS: + case HDR_MAX_PERMISSION: val = (byte) in.read(); len--; break; - case 0x01: - case 0x02: - case 0x14: - case 0x15: + case HDR_SEQ_ID: + case HDR_PAGE_ID: + case HDR_QUEUE_SIZE: + case HDR_QUEUE_DURATION: val = DSBytes.readInt(in, false); len -= 4; break; - case 0x21: - case 0x41: - case 0x60: - case 0x80: - case 0x81: + case HDR_PUB_PATH: + case HDR_ATTRIBUTE_FIELD: + case HDR_PERMISSION_TOKEN: + case HDR_TARGET_PATH: + case HDR_SOURCE_PATH: short slen = DSBytes.readShort(in, false); len -= 2; - len -= slen; val = readString(in, slen); + len -= slen; break; default: - val = (byte) code; + val = NO_HEADER_VAL; } headers.put(code, val); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageWriter.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageWriter.java index 19d7efd2..1bd1d6f3 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageWriter.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageWriter.java @@ -8,6 +8,8 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharsetEncoder; +import java.util.HashMap; +import java.util.Map; import org.iot.dsa.node.DSString; /** @@ -26,6 +28,7 @@ public class DS2MessageWriter extends DS2Message implements MessageWriter { private DSByteBuffer body; private CharBuffer charBuffer; private DSByteBuffer header; + private Map headers = new HashMap(); private int method; private int requestId = -1; private ByteBuffer strBuffer; @@ -49,16 +52,18 @@ public DS2MessageWriter() { /** * Encodes the key into the header buffer. */ - public DS2MessageWriter addHeader(byte key) { - header.put(key); + public DS2MessageWriter addHeader(int key) { + headers.put(key, NO_HEADER_VAL); + header.put((byte) key); return this; } /** * Encodes the key value pair into the header buffer. */ - public DS2MessageWriter addByteHeader(byte key, byte value) { - header.put(key); + public DS2MessageWriter addByteHeader(int key, byte value) { + headers.put(key, value); + header.put((byte) key); header.put(value); return this; } @@ -66,8 +71,9 @@ public DS2MessageWriter addByteHeader(byte key, byte value) { /** * Encodes the key value pair into the header buffer. */ - public DS2MessageWriter addIntHeader(byte key, int value) { - header.put(key); + public DS2MessageWriter addIntHeader(int key, int value) { + headers.put(key, value); + header.put((byte) key); header.putInt(value, false); return this; } @@ -75,34 +81,20 @@ public DS2MessageWriter addIntHeader(byte key, int value) { /** * Encodes the key value pair into the header buffer. */ - public DS2MessageWriter addStringHeader(byte key, String value) { - header.put(key); + public DS2MessageWriter addStringHeader(int key, String value) { + headers.put(key, value); + header.put((byte) key); writeString(value, header); return this; } - private void debugSummary() { - StringBuilder debug = getDebug(); - StringBuilder buf = debug; - boolean insert = buf.length() > 0; - if (insert) { - buf = new StringBuilder(); - } - buf.append("SEND "); - debugMethod(method, buf); - buf.append(", Rid ").append(requestId < 0 ? 0 : requestId); - buf.append(", Ack ").append(ackId < 0 ? 0 : ackId); - if (insert) { - debug.insert(0, buf); - } - } private void finishHeader() { int hlen = header.length(); int blen = body.length(); header.replaceInt(0, hlen + blen, false); header.replaceShort(4, (short) hlen, false); - header.replace(6, (byte) method); + header.replace(6, (byte) (method & 0xFF)); } public DSByteBuffer getBody() { @@ -139,8 +131,17 @@ private CharBuffer getCharBuffer(CharSequence arg) { return charBuffer; } - public int getHeaderLength() { - return header.length(); + @Override + protected void getDebug(StringBuilder buf) { + buf.append("SEND "); + debugMethod(method, buf); + if (requestId > 0) { + buf.append(", ").append("Rid ").append(requestId); + } + if (ackId > 0) { + buf.append(", ").append("Ack ").append(ackId); + } + debugHeaders(headers, buf); } /** @@ -181,6 +182,7 @@ public DS2MessageWriter init(int requestId, int ackId) { this.requestId = requestId; this.ackId = ackId; body.clear(); + headers.clear(); writer.reset(); header.clear(); header.skip(7); @@ -197,6 +199,17 @@ public DS2MessageWriter init(int requestId, int ackId) { return this; } + public MultipartWriter makeMultipart() { + DSByteBuffer tmp = body; + body = new DSByteBuffer(); + writer = new MsgpackWriter(body); + return new MultipartWriter(requestId, method, headers, tmp); + } + + public boolean requiresMultipart() { + return getBodyLength() > MAX_BODY; + } + public DS2MessageWriter setMethod(int method) { this.method = method; return this; @@ -218,11 +231,11 @@ public byte[] toByteArray() { */ public DS2MessageWriter write(DSBinaryTransport out) { finishHeader(); - if (isDebug()) { - debugSummary(); - } header.sendTo(out, false); body.sendTo(out, true); + if (debug()) { + printDebug(); + } return this; } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java index af8dddf4..3b426003 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java @@ -6,6 +6,8 @@ import com.acuity.iot.dsa.dslink.protocol.v2.responder.DS2Responder; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import com.acuity.iot.dsa.dslink.transport.DSTransport; +import java.util.HashMap; +import java.util.Map; import org.iot.dsa.dslink.DSIRequester; import org.iot.dsa.node.DSBytes; import org.iot.dsa.node.DSInfo; @@ -30,16 +32,11 @@ public class DS2Session extends DSSession implements MessageConstants { // Fields /////////////////////////////////////////////////////////////////////////// - private boolean debugRecv = false; - private StringBuilder debugRecvTranport = null; - private StringBuilder debugRecvMessage = null; - private boolean debugSend = false; - private StringBuilder debugSendTranport = null; - private StringBuilder debugSendMessage = null; private DSInfo lastAckRecv = getInfo(LAST_ACK_RECV); private long lastMessageSent; private DS2MessageReader messageReader; private DS2MessageWriter messageWriter; + private Map multiparts = new HashMap(); private boolean requestsNext = false; private DS2Requester requester = new DS2Requester(this); private DS2Responder responder = new DS2Responder(this); @@ -68,32 +65,24 @@ protected void declareDefaults() { protected void doRecvMessage() { DSBinaryTransport transport = getTransport(); DS2MessageReader reader = getMessageReader(); - boolean debug = debug(); - if (debug != debugRecv) { - if (debug) { - debugRecvMessage = new StringBuilder(); - debugRecvTranport = new StringBuilder(); - reader.setDebug(debugRecvMessage); - transport.setDebugIn(debugRecvTranport); - } else { - debugRecvMessage = null; - debugRecvTranport = null; - reader.setDebug(null); - transport.setDebugIn(null); - } - debugRecv = debug; - } - if (debug) { - debugRecvMessage.setLength(0); - debugRecvTranport.setLength(0); - debugRecvTranport.append("Bytes read\n"); - } transport.beginRecvMessage(); reader.init(transport.getInput()); int ack = reader.getAckId(); if (ack > 0) { put(lastAckRecv, DSInt.valueOf(ack)); } + if (reader.isMultipart()) { + MultipartReader multi = multiparts.get(reader.getRequestId()); + if (multi == null) { + multi = new MultipartReader(reader); + multiparts.put(reader.getRequestId(), multi); + return; + } else if (multi.update(reader)) { + return; + } + multiparts.remove(reader.getRequestId()); + reader = multi.makeReader(); + } if (reader.isRequest()) { responder.handleRequest(reader); setNextAck(reader.getRequestId()); @@ -104,47 +93,18 @@ protected void doRecvMessage() { } else if (reader.isResponse()) { requester.handleResponse(reader); setNextAck(reader.getRequestId()); + } else { + error("Unknown method: " + reader.getMethod()); } - if (debug) { - debug(debugRecvMessage); - debug(debugRecvTranport); - debugRecvMessage.setLength(0); - debugRecvTranport.setLength(0); - } + transport.endRecvMessage(); } @Override protected void doSendMessage() { DSTransport transport = getTransport(); - boolean debug = debug(); - if (debug != debugSend) { - if (debug) { - debugSendMessage = new StringBuilder(); - debugSendTranport = new StringBuilder(); - getMessageWriter().setDebug(debugSendMessage); - transport.setDebugOut(debugSendTranport); - } else { - debugSendMessage = null; - debugSendTranport = null; - getMessageWriter().setDebug(null); - transport.setDebugOut(null); - } - debugSend = debug; - } - if (debug) { - debugSendMessage.setLength(0); - debugSendTranport.setLength(0); - debugSendTranport.append("Bytes sent\n"); - } if (this.hasSomethingToSend()) { transport.beginSendMessage(); - boolean sent = send(requestsNext = !requestsNext); //alternate reqs and resps - if (sent && debug) { - debug(debugSendMessage); - debug(debugSendTranport); - debugSendMessage.setLength(0); - debugSendTranport.setLength(0); - } + send(requestsNext = !requestsNext); //alternate reqs and resps transport.endSendMessage(); } } @@ -204,18 +164,6 @@ public void onConnect() { super.onConnect(); messageReader = null; messageWriter = null; - if (debugRecv) { - debugRecvMessage = new StringBuilder(); - debugRecvTranport = new StringBuilder(); - getMessageReader().setDebug(debugRecvMessage); - getTransport().setDebugIn(debugRecvTranport); - } - if (debugSend) { - debugSendMessage = new StringBuilder(); - debugSendTranport = new StringBuilder(); - getMessageWriter().setDebug(debugSendMessage); - getTransport().setDebugOut(debugSendTranport); - } requester.onConnect(); responder.onConnect(); } @@ -240,7 +188,7 @@ public void onDisconnect() { * @param requests Determines which queue to use; True for outgoing requests, false for * responses. */ - private boolean send(boolean requests) { + private void send(boolean requests) { boolean hasSomething = false; if (requests) { hasSomething = hasOutgoingRequests(); @@ -249,7 +197,11 @@ private boolean send(boolean requests) { } OutboundMessage msg = null; if (hasSomething) { - msg = requests ? dequeueOutgoingRequest() : dequeueOutgoingResponse(); + if (requests) { + msg = dequeueOutgoingRequest(); + } else { + msg = dequeueOutgoingResponse(); + } } else if (hasPingToSend()) { msg = new PingMessage(this); } else if (hasAckToSend()) { @@ -257,10 +209,9 @@ private boolean send(boolean requests) { } if (msg != null) { lastMessageSent = System.currentTimeMillis(); - msg.write(getMessageWriter()); - return true; + DS2MessageWriter out = getMessageWriter(); + msg.write(out); } - return false; } /** diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java index 2d1b9a00..677dfb9a 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java @@ -9,8 +9,9 @@ */ public interface MessageConstants { - int MAX_HEADER = 1024 * 48; - int MAX_BODY = 16320; + int MAX_BODY = 1024 * 48; + int MAX_HEADER = 16320; + Object NO_HEADER_VAL = new Object(); int HDR_STATUS = 0x0; int HDR_SEQ_ID = 0x01; @@ -28,25 +29,25 @@ public interface MessageConstants { int HDR_MAX_PERMISSION = 0x32; int HDR_ATTRIBUTE_FIELD = 0x41; int HDR_PERMISSION_TOKEN = 0x60; - int HDR_TARGET_PATH = (0x80 & 0xFF); - int HDR_SOURCE_PATH = (0x81 & 0xFF); + int HDR_TARGET_PATH = 0x80; + int HDR_SOURCE_PATH = 0x81; int MSG_SUBSCRIBE_REQ = 0x01; - int MSG_SUBSCRIBE_RES = (0x81 & 0xFF); + int MSG_SUBSCRIBE_RES = 0x81; int MSG_LIST_REQ = 0x02; - int MSG_LIST_RES = (0x82 & 0xFF); + int MSG_LIST_RES = 0x82; int MSG_INVOKE_REQ = 0x03; - int MSG_INVOKE_RES = (0x83 & 0xFF); + int MSG_INVOKE_RES = 0x83; int MSG_SET_REQ = 0x04; - int MSG_SET_RES = (0x84 & 0xFF); + int MSG_SET_RES = 0x84; int MSG_OBSERVE_REQ = 0x0A; int MSG_CLOSE = 0x0F; - int MSG_ACK = (0xF8 & 0xFF); - int MSG_PING = (0xF9 & 0xFF); - int MSG_HANDSHAKE_1 = (0xF0 & 0xFF); - int MSG_HANDSHAKE_2 = (0xF1 & 0xFF); - int MSG_HANDSHAKE_3 = (0xF2 & 0xFF); - int MSG_HANDSHAKE_4 = (0xF3 & 0xFF); + int MSG_ACK = 0xF8; + int MSG_PING = 0xF9; + int MSG_HANDSHAKE_1 = 0xF0; + int MSG_HANDSHAKE_2 = 0xF1; + int MSG_HANDSHAKE_3 = 0xF2; + int MSG_HANDSHAKE_4 = 0xF3; byte STS_OK = 0; byte STS_INITIALIZING = 0x01; @@ -61,6 +62,6 @@ public interface MessageConstants { byte STS_BUSY = 0x48; byte STS_INTERNAL_ERR = 0x50; byte STS_ALIAS_LOOP = 0x61; - byte STS_INVALID_AUTH = (byte) (0xF9 & 0xFF); + byte STS_INVALID_AUTH = (byte) 0xF9; } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartReader.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartReader.java new file mode 100644 index 00000000..91e53acb --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartReader.java @@ -0,0 +1,79 @@ +package com.acuity.iot.dsa.dslink.protocol.v2; + +import com.acuity.iot.dsa.dslink.io.DSByteBuffer; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +/** + * Used to combine multiple inbound messages into one when the body of the entire message exceeds + * the max possible size. + * + * @author Aaron Hansen + */ +public class MultipartReader implements MessageConstants { + + // Fields + // ------ + + private DSByteBuffer body = new DSByteBuffer(); + private int currentPage = 0; + private Map headers = new HashMap(); + private int lastPage; + private int method; + private int requestId = -1; + private Byte status; + + // Constructors + // ------------ + + MultipartReader(DS2MessageReader reader) { + this.requestId = reader.getRequestId(); + this.method = reader.getMethod(); + this.headers.putAll(reader.getHeaders()); + int page = (Integer) headers.get(HDR_PAGE_ID); + if (page >= 0) { + throw new IllegalArgumentException("Invalid page id: " + page); + } + page = -page; + lastPage = page - 1; + } + + // Methods + // ------- + + /** + * Call this once update returns false. + */ + public DS2MessageReader makeReader() { + DS2MessageReader reader = new DS2MessageReader(); + reader.init(requestId, method, body, headers); + return reader; + } + + private MultipartReader putBody(InputStream in, int len) { + body.put(in, len); + return this; + } + + private MultipartReader putHeaders(Map headers) { + this.headers.putAll(headers); + status = (Byte) this.headers.get(HDR_STATUS); + return this; + } + + /** + * Call for each part, will return true when there are more parts expected. + */ + public boolean update(DS2MessageReader reader) { + int page = (Integer) headers.get(HDR_PAGE_ID); + if (page < currentPage) { + throw new IllegalStateException("Out of order page"); + } + currentPage = page; + putHeaders(reader.getHeaders()); + putBody(reader.getBody(), reader.getBodyLength()); + return currentPage < lastPage; + } + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartWriter.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartWriter.java new file mode 100644 index 00000000..37d2f883 --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MultipartWriter.java @@ -0,0 +1,89 @@ +package com.acuity.iot.dsa.dslink.protocol.v2; + +import com.acuity.iot.dsa.dslink.io.DSByteBuffer; +import java.util.Iterator; +import java.util.Map; + +/** + * Used to write multipart outbound messages when the body of a single message is larger than + * the max possible body size. + * + * @author Aaron Hansen + */ +public class MultipartWriter implements MessageConstants { + + // Fields + // ------ + + private DSByteBuffer body; + private Map headers; + private int method; + private int page = 0; + private int requestId = -1; + private Byte status; + + // Constructors + // ------------ + + MultipartWriter(int requestId, + int method, + Map headers, + DSByteBuffer body) { + this.requestId = requestId; + this.method = method; + this.headers = headers; + this.body = body; + status = (Byte) headers.get(HDR_STATUS); + if (status != null) { + headers.put(HDR_STATUS, STS_OK); + } + page = -((body.length() / MAX_BODY) + 1); + } + + // Methods + // ------- + + private void writeHeaders(DS2MessageWriter writer) { + Object val; + Map.Entry me; + Iterator> it = headers.entrySet().iterator(); + while (it.hasNext()) { + me = it.next(); + val = me.getValue(); + if (val == NO_HEADER_VAL) { + writer.addHeader(me.getKey()); + } else if (val instanceof Byte) { + writer.addByteHeader(me.getKey(), (Byte) val); + } else if (val instanceof Integer) { + writer.addIntHeader(me.getKey(), (Integer) val); + } else if (val instanceof String) { + writer.addStringHeader(me.getKey(), (String) val); + } + } + } + + /** + * Call for each part, will return true when there are more part remain. + */ + public boolean update(DS2MessageWriter writer, int ackId) { + writer.init(requestId, ackId); + writer.setMethod(method); + headers.put(HDR_PAGE_ID, Integer.valueOf(page)); + int len = body.length(); + if (len < MessageConstants.MAX_BODY) { + if (status != null) { + headers.put(HDR_STATUS, status); + } + } + writeHeaders(writer); + len = Math.min(MAX_BODY, len); + body.sendTo(writer.getBody(), len); + if (page < 0) { + page = 1; + } else { + page++; + } + return body.length() > 0; + } + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundInvokeStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundInvokeStub.java index e96922e2..52edf331 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundInvokeStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundInvokeStub.java @@ -6,6 +6,7 @@ import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageReader; import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; import com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants; +import com.acuity.iot.dsa.dslink.protocol.v2.MultipartWriter; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import org.iot.dsa.dslink.requester.OutboundInvokeHandler; import org.iot.dsa.node.DSMap; @@ -13,6 +14,8 @@ public class DS2OutboundInvokeStub extends DSOutboundInvokeStub implements DS2OutboundStub, MessageConstants { + MultipartWriter multipart; + protected DS2OutboundInvokeStub(DSRequester requester, Integer requestId, String path, @@ -29,17 +32,28 @@ public void handleResponse(DS2MessageReader response) { @Override public void write(MessageWriter writer) { - //if has multipart remaining send that DS2MessageWriter out = (DS2MessageWriter) writer; - out.init(getRequestId(), getSession().getNextAck()); + if (multipart != null) { + if (multipart.update(out, getSession().getNextAck())) { + getRequester().sendRequest(this); + } + return; + } + int ack = getSession().getNextAck(); + out.init(getRequestId(), ack); out.setMethod(MSG_INVOKE_REQ); - out.addStringHeader((byte) HDR_TARGET_PATH, getPath()); + out.addStringHeader(HDR_TARGET_PATH, getPath()); DSMap params = getParams(); if (params != null) { out.getWriter().value(params); } - out.write((DSBinaryTransport) getRequester().getTransport()); - //if multipart + if (out.requiresMultipart()) { + multipart = out.makeMultipart(); + multipart.update(out, ack); + getRequester().sendRequest(this); + } else { + out.write((DSBinaryTransport) getRequester().getTransport()); + } } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundListStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundListStub.java index b8dbbb48..da7df9df 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundListStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundListStub.java @@ -7,6 +7,7 @@ import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageReader; import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; import com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants; +import com.acuity.iot.dsa.dslink.protocol.v2.MultipartWriter; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import java.io.IOException; import java.io.InputStream; @@ -19,6 +20,7 @@ public class DS2OutboundListStub extends DSOutboundListStub implements DS2OutboundStub, MessageConstants { + private MultipartWriter multipart; private byte state = STS_INITIALIZING; protected DS2OutboundListStub(DSRequester requester, @@ -64,13 +66,24 @@ public void handleResponse(DS2MessageReader response) { @Override public void write(MessageWriter writer) { - //if has multipart remaining send that DS2MessageWriter out = (DS2MessageWriter) writer; - out.init(getRequestId(), getSession().getNextAck()); + if (multipart != null) { + if (multipart.update(out, getSession().getNextAck())) { + getRequester().sendRequest(this); + } + return; + } + int ack = getSession().getNextAck(); + out.init(getRequestId(), ack); out.setMethod(MSG_LIST_REQ); - out.addStringHeader((byte) HDR_TARGET_PATH, getPath()); - out.write((DSBinaryTransport) getRequester().getTransport()); - //if multipart + out.addStringHeader(HDR_TARGET_PATH, getPath()); + if (out.requiresMultipart()) { + multipart = out.makeMultipart(); + multipart.update(out, ack); + getRequester().sendRequest(this); + } else { + out.write((DSBinaryTransport) getRequester().getTransport()); + } } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSetStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSetStub.java index 6dc1ae41..158698d6 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSetStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSetStub.java @@ -6,6 +6,7 @@ import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageReader; import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; import com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants; +import com.acuity.iot.dsa.dslink.protocol.v2.MultipartWriter; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import org.iot.dsa.dslink.requester.OutboundRequestHandler; import org.iot.dsa.node.DSIValue; @@ -14,6 +15,8 @@ public class DS2OutboundSetStub extends DSOutboundSetStub implements DS2OutboundStub, MessageConstants { + private MultipartWriter multipart; + protected DS2OutboundSetStub(DSRequester requester, Integer requestId, String path, @@ -28,9 +31,15 @@ public void handleResponse(DS2MessageReader response) { @Override public void write(MessageWriter writer) { - //if has multipart remaining send that DS2MessageWriter out = (DS2MessageWriter) writer; - out.init(getRequestId(), getSession().getNextAck()); + if (multipart != null) { + if (multipart.update(out, getSession().getNextAck())) { + getRequester().sendRequest(this); + } + return; + } + int ack = getSession().getNextAck(); + out.init(getRequestId(), ack); out.setMethod(MSG_SET_REQ); String path = getPath(); int idx = 1 + path.lastIndexOf('/'); @@ -40,13 +49,19 @@ public void write(MessageWriter writer) { idx = elems.length - 1; String attr = elems[idx]; path = DSPath.encodePath(path.charAt(0) == '/', elems, idx); - out.addStringHeader((byte) HDR_ATTRIBUTE_FIELD, attr); + out.addStringHeader(HDR_ATTRIBUTE_FIELD, attr); } } - out.addStringHeader((byte) HDR_TARGET_PATH, path); + out.addStringHeader(HDR_TARGET_PATH, path); out.getBody().put((byte) 0, (byte) 0); out.getWriter().value(getValue().toElement()); - out.write((DSBinaryTransport) getRequester().getTransport()); - //if multipart + if (out.requiresMultipart()) { + multipart = out.makeMultipart(); + multipart.update(out, ack); + getRequester().sendRequest(this); + } else { + out.write((DSBinaryTransport) getRequester().getTransport()); + } + } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSubscriptions.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSubscriptions.java index 9fcf177f..97a9b46e 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSubscriptions.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundSubscriptions.java @@ -4,6 +4,7 @@ import com.acuity.iot.dsa.dslink.protocol.requester.DSOutboundSubscriptions; import com.acuity.iot.dsa.dslink.protocol.requester.DSRequester; import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; +import com.acuity.iot.dsa.dslink.protocol.v2.DS2Session; import com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; @@ -45,10 +46,11 @@ protected void doEndMessage(MessageWriter writer) { @Override protected void doWriteSubscribe(MessageWriter writer, String path, Integer sid, int qos) { DS2MessageWriter ds2 = (DS2MessageWriter) writer; - ds2.init(sid, getRequester().getSession().getNextAck()); + DS2Session session = (DS2Session) getRequester().getSession(); + ds2.init(sid, session.getNextAck()); ds2.setMethod(MSG_SUBSCRIBE_REQ); - ds2.addStringHeader((byte) HDR_TARGET_PATH, path); - ds2.addByteHeader((byte) HDR_QOS, (byte) qos); + ds2.addStringHeader(HDR_TARGET_PATH, path); + ds2.addByteHeader(HDR_QOS, (byte) qos); ds2.write((DSBinaryTransport) getRequester().getTransport()); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java index 96ce4721..6ada81b5 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java @@ -4,6 +4,7 @@ import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundInvoke; import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; import com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants; +import com.acuity.iot.dsa.dslink.protocol.v2.MultipartWriter; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import org.iot.dsa.node.DSMap; import org.iot.dsa.security.DSPermission; @@ -15,19 +16,32 @@ */ class DS2InboundInvoke extends DSInboundInvoke implements MessageConstants { + MultipartWriter multipart; + DS2InboundInvoke(DSMap parameters, DSPermission permission) { super(parameters, permission); } @Override public void write(MessageWriter writer) { - //if has remaining multipart, send that DS2MessageWriter out = (DS2MessageWriter) writer; - out.init(getRequestId(), getSession().getNextAck()); - out.setMethod((byte) MSG_INVOKE_RES); + if (multipart != null) { + if (multipart.update(out, getSession().getNextAck())) { + getResponder().sendResponse(this); + } + return; + } + int ack = getSession().getNextAck(); + out.init(getRequestId(), ack); + out.setMethod(MSG_INVOKE_RES); super.write(writer); - out.write((DSBinaryTransport) getResponder().getTransport()); - //if has multipart + if (out.requiresMultipart()) { + multipart = out.makeMultipart(); + multipart.update(out, ack); + getResponder().sendResponse(this); + } else { + out.write((DSBinaryTransport) getResponder().getTransport()); + } } @Override @@ -38,13 +52,13 @@ protected void writeBegin(MessageWriter writer) { @Override protected void writeClose(MessageWriter writer) { DS2MessageWriter out = (DS2MessageWriter) writer; - out.addByteHeader((byte) HDR_STATUS, STS_CLOSED); + out.addByteHeader(HDR_STATUS, STS_CLOSED); } @Override protected void writeOpen(MessageWriter writer) { DS2MessageWriter out = (DS2MessageWriter) writer; - out.addByteHeader((byte) HDR_STATUS, STS_OK); + out.addByteHeader(HDR_STATUS, STS_OK); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java index 5c4774d7..9bc054e3 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java @@ -6,6 +6,7 @@ import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundList; import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; import com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants; +import com.acuity.iot.dsa.dslink.protocol.v2.MultipartWriter; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import org.iot.dsa.node.DSElement; import org.iot.dsa.node.DSPath; @@ -17,6 +18,8 @@ */ class DS2InboundList extends DSInboundList implements MessageConstants { + MultipartWriter multipart; + @Override protected void beginMessage(MessageWriter writer) { } @@ -28,9 +31,11 @@ protected void beginUpdates(MessageWriter writer) { @Override protected void encode(String key, DSElement value, MessageWriter writer) { DS2MessageWriter out = (DS2MessageWriter) writer; + /* if (out.isDebug()) { out.getDebug().append('\n').append(key).append(" : ").append(value); } + */ DSByteBuffer buf = out.getBody(); MsgpackWriter mp = out.getWriter(); buf.skip(2); @@ -49,9 +54,11 @@ protected void encode(String key, DSElement value, MessageWriter writer) { @Override protected void encode(String key, String value, MessageWriter writer) { DS2MessageWriter out = (DS2MessageWriter) writer; + /* if (out.isDebug()) { out.getDebug().append('\n').append(key).append(" : ").append(value); } + */ DSByteBuffer buf = out.getBody(); MsgpackWriter mp = out.getWriter(); buf.skip(2); @@ -91,11 +98,11 @@ protected void encodeUpdate(Update update, MessageWriter writer, StringBuilder b protected void endMessage(MessageWriter writer, Boolean streamOpen) { DS2MessageWriter out = (DS2MessageWriter) writer; if (streamOpen == null) { - out.addByteHeader((byte) HDR_STATUS, STS_INITIALIZING); + out.addByteHeader(HDR_STATUS, STS_INITIALIZING); } else if (streamOpen) { - out.addByteHeader((byte) HDR_STATUS, STS_OK); + out.addByteHeader(HDR_STATUS, STS_OK); } else { - out.addByteHeader((byte) HDR_STATUS, STS_CLOSED); + out.addByteHeader(HDR_STATUS, STS_CLOSED); } } @@ -105,13 +112,24 @@ protected void endUpdates(MessageWriter writer) { @Override public void write(MessageWriter writer) { - //if has remaining multipart, send that DS2MessageWriter out = (DS2MessageWriter) writer; - out.init(getRequestId(), getSession().getNextAck()); - out.setMethod((byte) MSG_LIST_RES); + if (multipart != null) { + if (multipart.update(out, getSession().getNextAck())) { + getResponder().sendResponse(this); + } + return; + } + int ack = getSession().getNextAck(); + out.init(getRequestId(), ack); + out.setMethod(MSG_LIST_RES); super.write(writer); - out.write((DSBinaryTransport) getResponder().getTransport()); - //if has multipart + if (out.requiresMultipart()) { + multipart = out.makeMultipart(); + multipart.update(out, ack); + getResponder().sendResponse(this); + } else { + out.write((DSBinaryTransport) getResponder().getTransport()); + } } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java index 13f2ae27..d8bee2ae 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java @@ -38,7 +38,7 @@ protected DS2InboundSubscription(DSInboundSubscriptions manager, protected void write(Update update, MessageWriter writer, StringBuilder buf) { DS2MessageWriter messageWriter = (DS2MessageWriter) writer; messageWriter.init(getSubscriptionId(), getSession().getNextAck()); - messageWriter.setMethod((byte) MSG_SUBSCRIBE_RES); + messageWriter.setMethod(MSG_SUBSCRIBE_RES); DSIWriter dsiWriter = messageWriter.getWriter(); DSByteBuffer byteBuffer = messageWriter.getBody(); byteBuffer.skip(2); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java index 223e80a0..441ab116 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java @@ -120,11 +120,11 @@ private void processInvoke(DS2MessageReader msg) { params = msg.getBodyReader().getMap(); } DSPermission perm = DSPermission.READ; - Object obj = msg.getHeader(MessageConstants.HDR_MAX_PERMISSION); + Object obj = msg.getHeader(HDR_MAX_PERMISSION); if (obj != null) { perm = DSPermission.valueOf(obj.hashCode()); } - boolean stream = msg.getHeader(MessageConstants.HDR_NO_STREAM) == null; + boolean stream = msg.getHeader(HDR_NO_STREAM) == null; DS2InboundInvoke invokeImpl = new DS2InboundInvoke(params, perm); invokeImpl.setStream(stream) .setPath((String) msg.getHeader(HDR_TARGET_PATH)) @@ -141,7 +141,7 @@ private void processInvoke(DS2MessageReader msg) { private void processList(DS2MessageReader msg) { int rid = msg.getRequestId(); String path = (String) msg.getHeader(HDR_TARGET_PATH); - boolean stream = msg.getHeader(MessageConstants.HDR_NO_STREAM) == null; + boolean stream = msg.getHeader(HDR_NO_STREAM) == null; DS2InboundList listImpl = new DS2InboundList(); listImpl.setStream(stream) .setPath(path) @@ -158,7 +158,7 @@ private void processList(DS2MessageReader msg) { private void processSet(DS2MessageReader msg) { int rid = msg.getRequestId(); DSPermission perm = DSPermission.READ; - Object obj = msg.getHeader(MessageConstants.HDR_MAX_PERMISSION); + Object obj = msg.getHeader(HDR_MAX_PERMISSION); if (obj != null) { perm = DSPermission.valueOf(obj.hashCode()); } @@ -169,8 +169,8 @@ private void processSet(DS2MessageReader msg) { } DSElement value = msg.getBodyReader().getElement(); DSInboundSet setImpl = new DSInboundSet(value, perm); - String path = (String) msg.getHeader(MessageConstants.HDR_TARGET_PATH); - String attr = (String) msg.getHeader(MessageConstants.HDR_ATTRIBUTE_FIELD); + String path = (String) msg.getHeader(HDR_TARGET_PATH); + String attr = (String) msg.getHeader(HDR_ATTRIBUTE_FIELD); if ((attr != null) && !attr.isEmpty()) { path = DSPath.concat(path, attr, null).toString(); } @@ -188,13 +188,13 @@ private void processSubscribe(DS2MessageReader msg) { Integer sid = msg.getRequestId(); //todo if no stream String path = (String) msg.getHeader(HDR_TARGET_PATH); - Number qos = (Number) msg.getHeader(MessageConstants.HDR_QOS); + Number qos = (Number) msg.getHeader(HDR_QOS); if (qos == null) { qos = Integer.valueOf(0); } //Integer queueSize = (Integer) msg.getHeader(MessageConstants.HDR_QUEUE_SIZE); DSInboundSubscription sub = subscriptions.subscribe(sid, path, qos.intValue()); - if (msg.getHeader(MessageConstants.HDR_NO_STREAM) != null) { + if (msg.getHeader(HDR_NO_STREAM) != null) { sub.setCloseAfterUpdate(true); } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/ErrorMessage.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/ErrorMessage.java index 10f61ce1..85a25d6c 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/ErrorMessage.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/ErrorMessage.java @@ -31,30 +31,26 @@ public void write(MessageWriter writer) { DS2MessageWriter out = (DS2MessageWriter) writer; out.init(req.getRequestId(), req.getSession().getNextAck()); if (req instanceof DS2InboundInvoke) { - out.setMethod((byte) MSG_INVOKE_RES); + out.setMethod(MSG_INVOKE_RES); } else if (req instanceof DS2InboundList) { - out.setMethod((byte) MSG_LIST_RES); + out.setMethod(MSG_LIST_RES); } else if (req instanceof DS2InboundSet) { - out.setMethod((byte) MSG_SET_RES); + out.setMethod(MSG_SET_RES); } else { - out.setMethod((byte) MSG_CLOSE); + out.setMethod(MSG_CLOSE); } if (reason instanceof DSRequestException) { if (reason instanceof DSInvalidPathException) { - out.addByteHeader((byte) MessageConstants.HDR_STATUS, - MessageConstants.STS_NOT_AVAILABLE); + out.addByteHeader(HDR_STATUS, STS_NOT_AVAILABLE); } else if (reason instanceof DSPermissionException) { - out.addByteHeader((byte) MessageConstants.HDR_STATUS, - MessageConstants.STS_PERMISSION_DENIED); + out.addByteHeader(HDR_STATUS, STS_PERMISSION_DENIED); } else { - out.addByteHeader((byte) MessageConstants.HDR_STATUS, - MessageConstants.STS_INVALID_MESSAGE); + out.addByteHeader(HDR_STATUS, STS_INVALID_MESSAGE); } } else { - out.addByteHeader((byte) MessageConstants.HDR_STATUS, - MessageConstants.STS_INTERNAL_ERR); + out.addByteHeader(HDR_STATUS, STS_INTERNAL_ERR); } - out.addStringHeader((byte) HDR_ERROR_DETAIL, DSException.makeMessage(reason)); + out.addStringHeader(HDR_ERROR_DETAIL, DSException.makeMessage(reason)); out.write((DSBinaryTransport) req.getResponder().getTransport()); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestTransport.java index 2b44ab4f..223e56fa 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestTransport.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestTransport.java @@ -1,28 +1,17 @@ package com.acuity.iot.dsa.dslink.test; -import com.acuity.iot.dsa.dslink.transport.PushBinaryTransport; +import com.acuity.iot.dsa.dslink.transport.BufferedBinaryTransport; /** * Routes requests and responses back to self. * * @author Aaron Hansen */ -public class TestTransport extends PushBinaryTransport { - - private int messageSize; - - @Override - public int messageSize() { - return messageSize; - } +public class TestTransport extends BufferedBinaryTransport { @Override - public void write(byte[] buf, int off, int len, boolean isLast) { - messageSize += len; - push(buf, off, len); - if (isLast) { - messageSize = 0; - } + protected void doWrite(byte[] buf, int off, int len, boolean isLast) { + receive(buf, off, len); } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java new file mode 100644 index 00000000..e5fef457 --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java @@ -0,0 +1,302 @@ +package com.acuity.iot.dsa.dslink.transport; + +import com.acuity.iot.dsa.dslink.io.DSByteBuffer; +import com.acuity.iot.dsa.dslink.io.DSIoException; +import java.io.InputStream; +import org.iot.dsa.node.DSBytes; +import org.iot.dsa.util.DSException; + +/** + * For binary transports that do not have an InputStream (such as websockets). Subclasses + * should call receive() for incoming bytes. + * + * @author Aaron Hansen + */ +public abstract class BufferedBinaryTransport extends DSBinaryTransport { + + /////////////////////////////////////////////////////////////////////////// + // Fields + /////////////////////////////////////////////////////////////////////////// + + private static final int DEBUG_COLS = 30; + + private RuntimeException closeException; + private InputStream input = new MyInputStream(); + private int messageSize; + private boolean open = false; + private DSByteBuffer readBuffer = new DSByteBuffer(); + private StringBuilder traceIn; + private int traceInSize = 0; + private StringBuilder traceOut; + private int traceOutSize = 0; + + ///////////////////////////////////////////////////////////////// + // Methods - In alphabetical order by method name. + ///////////////////////////////////////////////////////////////// + + @Override + public void beginRecvMessage() { + if (trace()) { + if (traceIn == null) { + traceIn = new StringBuilder(); + } + traceInSize = 0; + traceIn.append("Recv:\n"); + } + } + + @Override + public void beginSendMessage() { + if (trace()) { + if (traceOut == null) { + traceOut = new StringBuilder(); + } + traceOutSize = 0; + traceOut.append("Send:\n"); + } + } + + protected void close(Throwable reason) { + closeException = DSException.makeRuntime(reason); + close(); + } + + @Override + public DSTransport close() { + synchronized (this) { + open = false; + notifyAll(); + } + return this; + } + + /** + * Called by the write method. + */ + protected abstract void doWrite(byte[] buf, int off, int len, boolean isLast); + + @Override + public void endRecvMessage() { + if (trace()) { + if (traceIn != null) { + if (traceIn.length() > 6) { + trace(traceIn.toString()); + } + traceIn.setLength(0); + } + } else if (traceIn != null) { + traceIn = null; + } + } + + @Override + public void endSendMessage() { + if (trace()) { + if (traceOut != null) { + if (traceOut.length() > 6) { + trace(traceOut.toString()); + } + traceOut.setLength(0); + } + } else if (traceOut != null) { + traceOut = null; + } + } + + @Override + public InputStream getInput() { + return input; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public int messageSize() { + return messageSize; + } + + /** + * Overrides must call super to set the open state. + */ + public DSTransport open() { + open = true; + return this; + } + + /** + * Call this for all incoming bytes. + */ + protected void receive(byte[] bytes, int off, int len) { + if (!testOpen()) { + throw new IllegalStateException("Transport closed"); + } + synchronized (this) { + readBuffer.put(bytes, 0, len); + notifyAll(); + } + } + + /** + * Returns true if open, false if closed, throws and exception if closed with an exception. + */ + protected boolean testOpen() { + if (!open) { + if (closeException != null) { + throw closeException; + } + return false; + } + return true; + } + + /** + * Handles logging and calls doWrite. + */ + public final void write(byte[] buf, int off, int len, boolean isLast) { + if (trace()) { + for (int i = off, j = off + len; i < j; i++) { + if (traceOutSize > 0) { + traceOut.append(' '); + } + DSBytes.toHex(buf[i], traceOut); + if (++traceOutSize == DEBUG_COLS) { + traceOutSize = 0; + traceOut.append('\n'); + } + } + } + if (isLast) { + messageSize = 0; + } else { + messageSize += len; + } + doWrite(buf, off, len, isLast); + } + + ///////////////////////////////////////////////////////////////// + // Inner Classes + ///////////////////////////////////////////////////////////////// + + private class MyInputStream extends InputStream { + + @Override + public int available() { + return readBuffer.available(); + } + + @Override + public void close() { + BufferedBinaryTransport.this.close(); + } + + @Override + public int read() { + synchronized (BufferedBinaryTransport.this) { + if (testOpen() && readBuffer.available() == 0) { + try { + BufferedBinaryTransport.this.wait(getReadTimeout()); + } catch (Exception x) { + } + } + if (!testOpen()) { + return -1; + } + if (readBuffer.available() == 0) { + throw new DSIoException("Read timeout"); + } + int ch = readBuffer.read(); + if (trace() && (ch >= 0)) { + if (traceInSize > 0) { + traceIn.append(' '); + } + DSBytes.toHex((byte) ch, traceIn); + if (++traceInSize == DEBUG_COLS) { + traceInSize = 0; + traceIn.append('\n'); + } + } + return ch; + } + } + + @Override + public int read(byte[] buf) { + if (buf.length == 0) { + if (!testOpen()) { + return -1; + } + return 0; + } + synchronized (BufferedBinaryTransport.this) { + if (testOpen() && readBuffer.available() == 0) { + try { + BufferedBinaryTransport.this.wait(getReadTimeout()); + } catch (Exception x) { + } + } + if (!testOpen()) { + return -1; + } + if (readBuffer.available() == 0) { + throw new DSIoException("Read timeout"); + } + int ret = readBuffer.sendTo(buf, 0, buf.length); + if (trace() && (ret > 0)) { + for (int i = 0; i < ret; i++) { + if (traceInSize > 0) { + traceIn.append(' '); + } + DSBytes.toHex(buf[i], traceIn); + if (++traceInSize == DEBUG_COLS) { + traceInSize = 0; + traceIn.append('\n'); + } + } + } + return ret; + } + } + + @Override + public int read(byte[] buf, int off, int len) { + if (len == 0) { + if (!testOpen()) { + return -1; + } + return 0; + } + synchronized (BufferedBinaryTransport.this) { + if (testOpen() && readBuffer.available() == 0) { + try { + BufferedBinaryTransport.this.wait(getReadTimeout()); + } catch (Exception x) { + } + } + if (!testOpen()) { + return -1; + } + if (readBuffer.available() == 0) { + throw new DSIoException("Read timeout"); + } + int ret = readBuffer.sendTo(buf, off, len); + if (trace() && (ret > 0)) { + for (int i = 0; i < ret; i++) { + if (traceInSize > 0) { + traceIn.append(' '); + } + DSBytes.toHex(buf[i], traceIn); + if (++traceInSize == DEBUG_COLS) { + traceInSize = 0; + traceIn.append('\n'); + } + } + } + return ret; + } + } + } + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/DSTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/DSTransport.java index 5b4aa836..b46d228b 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/DSTransport.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/DSTransport.java @@ -25,8 +25,6 @@ public abstract class DSTransport extends DSNode { /////////////////////////////////////////////////////////////////////////// private DSLinkConnection connection; - private StringBuilder debugIn; - private StringBuilder debugOut; /////////////////////////////////////////////////////////////////////////// // Methods @@ -37,16 +35,14 @@ public abstract class DSTransport extends DSNode { * * @return This */ - public DSTransport beginRecvMessage() { - return this; - } + public abstract void beginRecvMessage(); /** * Called at the start of a new outbound message. * * @return This */ - public abstract DSTransport beginSendMessage(); + public abstract void beginSendMessage(); /** * Close the actual connection and clean up resources. Calling when already closed will have no @@ -63,11 +59,14 @@ protected void declareDefaults() { } /** - * Signifies the end of an outgoing message. Needed because websockets are frame based. - * - * @return This + * Signifies the end of an incoming message. */ - public abstract DSTransport endSendMessage(); + public abstract void endRecvMessage(); + + /** + * Signifies the end of an outgoing message. + */ + public abstract void endSendMessage(); public DSLinkConnection getConnection() { return connection; @@ -77,14 +76,6 @@ public String getConnectionUrl() { return String.valueOf(get(CONNECTION_URL)); } - public StringBuilder getDebugIn() { - return debugIn; - } - - public StringBuilder getDebugOut() { - return debugOut; - } - @Override protected String getLogName() { return getClass().getSimpleName(); @@ -127,16 +118,6 @@ public DSTransport setConnectionUrl(String url) { return this; } - public DSTransport setDebugIn(StringBuilder buf) { - this.debugIn = buf; - return this; - } - - public DSTransport setDebugOut(StringBuilder buf) { - this.debugOut = buf; - return this; - } - /** * The number of millis the input stream will block on a read before throwing a DSIoException. * diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/PushBinaryTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/PushBinaryTransport.java deleted file mode 100644 index 6ee794c8..00000000 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/PushBinaryTransport.java +++ /dev/null @@ -1,183 +0,0 @@ -package com.acuity.iot.dsa.dslink.transport; - -import com.acuity.iot.dsa.dslink.io.DSByteBuffer; -import com.acuity.iot.dsa.dslink.io.DSIoException; -import java.io.IOException; -import java.io.InputStream; -import org.iot.dsa.util.DSException; - -/** - * For transports that push values for reading (such as the TestTransport and websockets). - * - * @author Aaron Hansen - */ -public abstract class PushBinaryTransport extends DSBinaryTransport { - - /////////////////////////////////////////////////////////////////////////// - // Fields - /////////////////////////////////////////////////////////////////////////// - - private RuntimeException closeException; - private InputStream input = new MyInputStream(); - private boolean open = false; - private DSByteBuffer readBuffer = new DSByteBuffer(); - - ///////////////////////////////////////////////////////////////// - // Methods - In alphabetical order by method name. - ///////////////////////////////////////////////////////////////// - - @Override - public DSTransport beginSendMessage() { - return this; - } - - protected void close(Throwable reason) { - closeException = DSException.makeRuntime(reason); - close(); - } - - @Override - public DSTransport close() { - synchronized (this) { - open = false; - notifyAll(); - } - return this; - } - - @Override - public DSTransport endSendMessage() { - return this; - } - - @Override - public InputStream getInput() { - return input; - } - - @Override - public boolean isOpen() { - return open; - } - - /** - * Call super to set the open state. - */ - public DSTransport open() { - open = true; - return this; - } - - /** - * Call this to push bytes. - */ - protected void push(byte[] bytes, int off, int len) { - if (!testOpen()) { - throw new IllegalStateException("Transport closed"); - } - synchronized (this) { - readBuffer.put(bytes, 0, len); - notifyAll(); - } - } - - /** - * Returns true if open, false if closed, throws and exception if closed with an exception. - */ - protected boolean testOpen() { - if (!open) { - if (closeException != null) { - throw closeException; - } - return false; - } - return true; - } - - ///////////////////////////////////////////////////////////////// - // Inner Classes - ///////////////////////////////////////////////////////////////// - - private class MyInputStream extends InputStream { - - @Override - public int available() { - return readBuffer.available(); - } - - @Override - public void close() { - PushBinaryTransport.this.close(); - } - - @Override - public int read() throws IOException { - synchronized (PushBinaryTransport.this) { - if (testOpen() && readBuffer.available() == 0) { - try { - PushBinaryTransport.this.wait(getReadTimeout()); - } catch (Exception x) { - } - } - if (!testOpen()) { - return -1; - } - if (readBuffer.available() == 0) { - throw new DSIoException("Read timeout"); - } - return readBuffer.read(); - } - } - - @Override - public int read(byte[] buf) throws IOException { - if (buf.length == 0) { - if (!testOpen()) { - return -1; - } - return 0; - } - synchronized (PushBinaryTransport.this) { - if (testOpen() && readBuffer.available() == 0) { - try { - PushBinaryTransport.this.wait(getReadTimeout()); - } catch (Exception x) { - } - } - if (!testOpen()) { - return -1; - } - if (readBuffer.available() == 0) { - throw new DSIoException("Read timeout"); - } - return readBuffer.sendTo(buf, 0, buf.length); - } - } - - @Override - public int read(byte[] buf, int off, int len) throws IOException { - if (len == 0) { - if (!testOpen()) { - return -1; - } - return 0; - } - synchronized (PushBinaryTransport.this) { - if (testOpen() && readBuffer.available() == 0) { - try { - PushBinaryTransport.this.wait(getReadTimeout()); - } catch (Exception x) { - } - } - if (!testOpen()) { - return -1; - } - if (readBuffer.available() == 0) { - throw new DSIoException("Read timeout"); - } - return readBuffer.sendTo(buf, off, len); - } - } - } - -} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/SocketTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/SocketTransport.java index 08944a84..d68cb0c2 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/SocketTransport.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/SocketTransport.java @@ -1,12 +1,10 @@ package com.acuity.iot.dsa.dslink.transport; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.URI; import javax.net.ssl.SSLSocketFactory; -import org.iot.dsa.node.DSBytes; import org.iot.dsa.util.DSException; /** @@ -14,19 +12,10 @@ * * @author Aaron Hansen */ -public class SocketTransport extends DSBinaryTransport { +public class SocketTransport extends StreamBinaryTransport { - /////////////////////////////////////////////////////////////////////////// - // Fields - /////////////////////////////////////////////////////////////////////////// - - private static final int DEBUG_COLS = 30; - - private int debugInSize; - private int debugOutSize; - private int messageSize; + private InputStream in; private OutputStream out; - private boolean open = false; private Socket socket; ///////////////////////////////////////////////////////////////// @@ -34,76 +23,36 @@ public class SocketTransport extends DSBinaryTransport { ///////////////////////////////////////////////////////////////// @Override - public DSTransport beginRecvMessage() { - debugInSize = 0; - return this; - } - - @Override - public DSTransport beginSendMessage() { - debugOutSize = 0; - return this; - } - - @Override - public DSTransport close() { - if (!open) { - return this; + protected void doClose() { + try { + if (in != null) { + in.close(); + } + } catch (Exception x) { + trace(getConnection().getConnectionId(), x); } try { if (out != null) { out.close(); } } catch (Exception x) { - debug(getConnection().getConnectionId(), x); + trace(getConnection().getConnectionId(), x); } try { if (socket != null) { socket.close(); } } catch (Exception x) { - debug(getConnection().getConnectionId(), x); + trace(getConnection().getConnectionId(), x); } + in = null; out = null; socket = null; - open = false; - return this; - } - - @Override - public DSTransport endSendMessage() { - return this; - } - - @Override - public InputStream getInput() { - InputStream ret = null; - try { - ret = socket.getInputStream(); - if (getDebugIn() != null) { - ret = new DebugInputStream(ret); - } - } catch (IOException x) { - DSException.throwRuntime(x); - } - return ret; - } - - @Override - public boolean isOpen() { - if ((socket == null) || socket.isClosed()) { - return false; - } - return true; - } - - public int messageSize() { - return messageSize; } @Override public DSTransport open() { - if (open) { + if (isOpen()) { throw new IllegalStateException("Already open"); } try { @@ -126,7 +75,7 @@ public DSTransport open() { throw new IllegalArgumentException("Invalid broker URI: " + url); } socket.setSoTimeout((int) getReadTimeout()); - open = true; + init(socket.getInputStream(), socket.getOutputStream()); fine(fine() ? "SocketTransport open" : null); } catch (Exception x) { DSException.throwRuntime(x); @@ -134,100 +83,4 @@ public DSTransport open() { return this; } - /** - * Write the bytes to the socket, isLast is ignored. - */ - public void write(byte[] buf, int off, int len, boolean isLast) { - try { - StringBuilder debug = getDebugOut(); - if (debug != null) { - for (int i = off, j = off + len; i < j; i++) { - if (debugOutSize > 0) { - debug.append(' '); - } - DSBytes.toHex(buf[i], debug); - if (++debugOutSize == DEBUG_COLS) { - debugOutSize = 0; - debug.append('\n'); - } - } - } - messageSize += len; - if (out == null) { - out = socket.getOutputStream(); - } - out.write(buf, 0, len); - } catch (IOException x) { - DSException.throwRuntime(x); - } - } - - // Inner Classes - // ------------- - - private class DebugInputStream extends InputStream { - - private InputStream inner; - - public DebugInputStream(InputStream inner) { - this.inner = inner; - } - - @Override - public int read() throws IOException { - StringBuilder debug = getDebugIn(); - int ch = inner.read(); - if (debug != null) { - if (debugInSize > 0) { - debug.append(' '); - } - DSBytes.toHex((byte) ch, debug); - if (++debugInSize == DEBUG_COLS) { - debugInSize = 0; - debug.append('\n'); - } - } - return ch; - } - - @Override - public int read(byte[] buf) throws IOException { - int ret = inner.read(buf); - StringBuilder debug = getDebugOut(); - if ((debug != null) && (ret > 0)) { - for (int i = 0; i < ret; i++) { - if (debugInSize > 0) { - debug.append(' '); - } - DSBytes.toHex(buf[i], debug); - if (++debugInSize == DEBUG_COLS) { - debugInSize = 0; - debug.append('\n'); - } - } - } - return ret; - } - - @Override - public int read(byte[] buf, int off, int len) throws IOException { - int ret = inner.read(buf, off, len); - StringBuilder debug = getDebugOut(); - if ((debug != null) && (ret > 0)) { - for (int i = off, j = off + ret; i < j; i++) { - if (debugInSize > 0) { - debug.append(' '); - } - DSBytes.toHex(buf[i], debug); - if (++debugInSize == DEBUG_COLS) { - debugInSize = 0; - debug.append('\n'); - } - } - } - return ret; - } - - } //DebugInputStream - } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java new file mode 100644 index 00000000..9aeb1442 --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java @@ -0,0 +1,254 @@ +package com.acuity.iot.dsa.dslink.transport; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.iot.dsa.node.DSBytes; +import org.iot.dsa.util.DSException; + +/** + * For stream based binary transports. Subclasses must call init(InputStream,OutputStream). + * + * @author Aaron Hansen + */ +public abstract class StreamBinaryTransport extends DSBinaryTransport { + + /////////////////////////////////////////////////////////////////////////// + // Fields + /////////////////////////////////////////////////////////////////////////// + + private static final int DEBUG_COLS = 30; + + private RuntimeException closeException; + private InputStream innerIn; + private OutputStream innerOut; + private int messageSize; + private boolean open = false; + private InputStream outerIn = new MyInputStream(); + private StringBuilder traceIn; + private int traceInSize = 0; + private StringBuilder traceOut; + private int traceOutSize = 0; + + ///////////////////////////////////////////////////////////////// + // Methods - In alphabetical order by method name. + ///////////////////////////////////////////////////////////////// + + @Override + public void beginRecvMessage() { + if (trace()) { + if (traceIn == null) { + traceIn = new StringBuilder(); + } + traceInSize = 0; + traceIn.append("Recv:\n"); + } + } + + @Override + public void beginSendMessage() { + if (trace()) { + if (traceOut == null) { + traceOut = new StringBuilder(); + } + traceOutSize = 0; + traceOut.append("Send:\n"); + } + } + + protected final void close(Throwable reason) { + closeException = DSException.makeRuntime(reason); + close(); + } + + @Override + public final DSTransport close() { + synchronized (this) { + open = false; + notifyAll(); + } + doClose(); + return this; + } + + /** + * Subclass to perform any cleanup, this does nothing. + */ + protected void doClose() { + } + + @Override + public void endRecvMessage() { + if (trace()) { + if (traceIn != null) { + if (traceIn.length() > 6) { + trace(traceIn.toString()); + } + traceIn.setLength(0); + } + } else if (traceIn != null) { + traceIn = null; + } + } + + @Override + public void endSendMessage() { + if (trace()) { + if (traceOut != null) { + if (traceOut.length() > 6) { + trace(traceOut.toString()); + } + traceOut.setLength(0); + } + } else if (traceOut != null) { + traceOut = null; + } + } + + @Override + public InputStream getInput() { + if (trace()) { + return outerIn; + } + return innerIn; + } + + /** + * Must be called by the open() implementation. + */ + public void init(InputStream in, OutputStream out) { + this.innerIn = in; + this.innerOut = out; + open = true; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public int messageSize() { + return messageSize; + } + + /** + * Returns true if open, false if closed, throws and exception if closed with an exception. + */ + protected boolean testOpen() { + if (!open) { + if (closeException != null) { + throw closeException; + } + return false; + } + return true; + } + + /** + * Handles logging and calls doWrite. + */ + public void write(byte[] buf, int off, int len, boolean isLast) { + if (trace()) { + for (int i = off, j = off + len; i < j; i++) { + if (traceOutSize > 0) { + traceOut.append(' '); + } + DSBytes.toHex(buf[i], traceOut); + if (++traceOutSize == DEBUG_COLS) { + traceOutSize = 0; + traceOut.append('\n'); + } + } + } + if (isLast) { + messageSize = 0; + } else { + messageSize += len; + } + try { + innerOut.write(buf, off, len); + } catch (Exception x) { + DSException.throwRuntime(x); + } + } + + ///////////////////////////////////////////////////////////////// + // Inner Classes + ///////////////////////////////////////////////////////////////// + + private class MyInputStream extends InputStream { + + @Override + public int available() throws IOException { + return innerIn.available(); + } + + @Override + public void close() { + StreamBinaryTransport.this.close(); + } + + @Override + public int read() throws IOException { + if (!testOpen()) { + return -1; + } + int ch = innerIn.read(); + if (trace() && (ch >= 0)) { + if (traceInSize > 0) { + traceIn.append(' '); + } + DSBytes.toHex((byte) ch, traceIn); + if (++traceInSize == DEBUG_COLS) { + traceInSize = 0; + traceIn.append('\n'); + } + } + return ch; + } + + @Override + public int read(byte[] buf) throws IOException { + if (!testOpen()) { + return -1; + } + int ret = innerIn.read(buf); + if (trace() && (ret > 0)) { + for (int i = 0; i < ret; i++) { + if (traceInSize > 0) { + traceIn.append(' '); + } + DSBytes.toHex(buf[i], traceIn); + if (++traceInSize == DEBUG_COLS) { + traceInSize = 0; + traceIn.append('\n'); + } + } + } + return ret; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + if (!testOpen()) { + return -1; + } + int ret = innerIn.read(buf, off, len); + if (trace() && (ret > 0)) { + for (int i = 0; i < ret; i++) { + if (traceInSize > 0) { + traceIn.append(' '); + } + DSBytes.toHex(buf[i], traceIn); + if (++traceInSize == DEBUG_COLS) { + traceInSize = 0; + traceIn.append('\n'); + } + } + } + return ret; + } + } + +} diff --git a/dslink-core/src/main/java/org/iot/dsa/logging/AsyncLogHandler.java b/dslink-core/src/main/java/org/iot/dsa/logging/AsyncLogHandler.java index ad960169..3724bd07 100644 --- a/dslink-core/src/main/java/org/iot/dsa/logging/AsyncLogHandler.java +++ b/dslink-core/src/main/java/org/iot/dsa/logging/AsyncLogHandler.java @@ -232,18 +232,19 @@ protected void write(LogRecord record) { return; } // severity - builder.append(toString(record.getLevel())).append(' '); + builder.append('[').append(toString(record.getLevel())).append(' '); // timestamp calendar.setTimeInMillis(record.getMillis()); DSTime.encodeForLogs(calendar, builder); + builder.append(']'); // log name String name = record.getLoggerName(); if ((name != null) && !name.isEmpty()) { - builder.append(" ["); + builder.append("["); builder.append(record.getLoggerName()); builder.append(']'); } else { - builder.append(" [Default]"); + builder.append("[Default]"); } // class if (record.getSourceClassName() != null) { diff --git a/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsBinaryTransport.java b/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsBinaryTransport.java index ad7305f2..2e332f43 100644 --- a/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsBinaryTransport.java +++ b/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsBinaryTransport.java @@ -1,8 +1,7 @@ package org.iot.dsa.dslink.websocket; -import com.acuity.iot.dsa.dslink.io.DSIoException; +import com.acuity.iot.dsa.dslink.transport.BufferedBinaryTransport; import com.acuity.iot.dsa.dslink.transport.DSTransport; -import com.acuity.iot.dsa.dslink.transport.PushBinaryTransport; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -25,14 +24,13 @@ * @author Aaron Hansen */ @ClientEndpoint -public class WsBinaryTransport extends PushBinaryTransport { +public class WsBinaryTransport extends BufferedBinaryTransport { /////////////////////////////////////////////////////////////////////////// // Fields /////////////////////////////////////////////////////////////////////////// private ClientManager client; - private int messageSize; private Session session; private ByteBuffer writeBuffer; @@ -40,6 +38,20 @@ public class WsBinaryTransport extends PushBinaryTransport { // Methods - In alphabetical order by method name. ///////////////////////////////////////////////////////////////// + @Override + protected void doWrite(byte[] buf, int off, int len, boolean isLast) { + ByteBuffer byteBuffer = getByteBuffer(len); + try { + byteBuffer.put(buf, off, len); + byteBuffer.flip(); + RemoteEndpoint.Basic basic = session.getBasicRemote(); + basic.sendBinary(byteBuffer, isLast); + byteBuffer.clear(); + } catch (IOException x) { + DSException.throwRuntime(x); + } + } + /** * Called by write(), returns a bytebuffer for the given capacity ready for writing * (putting). Attempts to reuse the same buffer as much as possible. @@ -63,13 +75,9 @@ private ByteBuffer getByteBuffer(int len) { return writeBuffer; } - public int messageSize() { - return messageSize; - } - @OnClose public void onClose(Session session, CloseReason reason) { - info(getConnectionUrl() + " remotely closed, reason = " + reason.toString()); + info(getConnectionUrl() + " closed remotely, reason = " + reason.toString()); close(); } @@ -80,7 +88,7 @@ public void onError(Session session, Throwable err) { @OnMessage public void onMessage(Session session, byte[] msgPart, boolean isLast) { - push(msgPart, 0, msgPart.length); + receive(msgPart, 0, msgPart.length); } @OnOpen @@ -98,34 +106,10 @@ public DSTransport open() { client.connectToServer(this, new URI(getConnectionUrl())); fine(fine() ? "Transport open" : null); } catch (Exception x) { - close(x); DSException.throwRuntime(x); } return this; } - @Override - public void write(byte[] buf, int off, int len, boolean isLast) { - if (!testOpen()) { - throw new DSIoException("Closed"); - } - ByteBuffer byteBuffer = getByteBuffer(len); - messageSize += len; - try { - byteBuffer.put(buf, off, len); - byteBuffer.flip(); - RemoteEndpoint.Basic basic = session.getBasicRemote(); - basic.sendBinary(byteBuffer, isLast); - byteBuffer.clear(); - if (isLast) { - messageSize = 0; - } - } catch (IOException x) { - if (isOpen()) { - close(); - DSException.throwRuntime(x); - } - } - } } diff --git a/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsTextTransport.java b/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsTextTransport.java index e84f5f51..221dd8c6 100644 --- a/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsTextTransport.java +++ b/dslink-websocket-standalone/src/main/java/org/iot/dsa/dslink/websocket/WsTextTransport.java @@ -46,9 +46,12 @@ public class WsTextTransport extends DSTextTransport { ///////////////////////////////////////////////////////////////// @Override - public DSTransport beginSendMessage() { + public void beginRecvMessage() { + } + + @Override + public void beginSendMessage() { messageSize = 0; - return this; } @Override @@ -70,10 +73,13 @@ public DSTransport close() { } @Override - public DSTransport endSendMessage() { + public void endRecvMessage() { + } + + @Override + public void endSendMessage() { write("", true); messageSize = 0; - return this; } public Reader getReader() { From a531d71b79f46ff00730004fb8b72cdd60f71b93 Mon Sep 17 00:00:00 2001 From: Aaron Date: Wed, 14 Mar 2018 14:33:55 -0700 Subject: [PATCH 2/4] Multi-part messages. Improved debug. --- .../com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java | 8 +++++++- .../iot/dsa/dslink/protocol/v2/DS2MessageReader.java | 2 ++ .../iot/dsa/dslink/protocol/v2/MessageConstants.java | 1 + .../iot/dsa/dslink/transport/BufferedBinaryTransport.java | 5 ++++- .../iot/dsa/dslink/transport/StreamBinaryTransport.java | 5 ++++- 5 files changed, 18 insertions(+), 3 deletions(-) diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java index b336c3bf..d56759c9 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Message.java @@ -87,6 +87,12 @@ private StringBuilder debugHeader(int arg, StringBuilder buf) { case HDR_PAGE_ID: buf.append("Page ID"); break; + case HDR_AUDIT: + buf.append("Audit"); + break; + case HDR_ERROR_DETAIL: + buf.append("Error Detail"); + break; case HDR_ALIAS_COUNT: buf.append("Alias Ct"); break; @@ -127,7 +133,7 @@ private StringBuilder debugHeader(int arg, StringBuilder buf) { buf.append("Target Path"); break; case HDR_SOURCE_PATH: - buf.append("Sourth Path"); + buf.append("Source Path"); break; default: buf.append("0x"); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java index 3cfd3473..1abae9ea 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2MessageReader.java @@ -236,6 +236,8 @@ void parseDynamicHeaders(InputStream in, int len, Map headers) val = DSBytes.readInt(in, false); len -= 4; break; + case HDR_AUDIT: + case HDR_ERROR_DETAIL: case HDR_PUB_PATH: case HDR_ATTRIBUTE_FIELD: case HDR_PERMISSION_TOKEN: diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java index 677dfb9a..8eb556fe 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/MessageConstants.java @@ -16,6 +16,7 @@ public interface MessageConstants { int HDR_STATUS = 0x0; int HDR_SEQ_ID = 0x01; int HDR_PAGE_ID = 0x02; + int HDR_AUDIT = 0x04; int HDR_ERROR_DETAIL = 0x05; int HDR_ALIAS_COUNT = 0x08; int HDR_PRIORITY = 0x10; diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java index e5fef457..aebbefed 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java @@ -208,7 +208,10 @@ public int read() { throw new DSIoException("Read timeout"); } int ch = readBuffer.read(); - if (trace() && (ch >= 0)) { + if (ch == -1) { + return ch; + } + if (trace()) { if (traceInSize > 0) { traceIn.append(' '); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java index 9aeb1442..3a65462b 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java @@ -195,7 +195,10 @@ public int read() throws IOException { return -1; } int ch = innerIn.read(); - if (trace() && (ch >= 0)) { + if (ch == -1) { + return ch; + } + if (trace()) { if (traceInSize > 0) { traceIn.append(' '); } From 6d181ac09b6d4582bf63072ea317bebf53617bf4 Mon Sep 17 00:00:00 2001 From: Aaron Date: Thu, 15 Mar 2018 18:00:47 -0700 Subject: [PATCH 3/4] Use info flags for action permission. Validate permissions on set and invoke requests. --- .../protocol/responder/DSInboundInvoke.java | 15 ++++++++++- .../protocol/responder/DSInboundList.java | 13 +++++++-- .../protocol/responder/DSInboundSet.java | 9 ++++++- .../v2/responder/DS2InboundInvoke.java | 3 +++ .../protocol/v2/responder/DS2InboundList.java | 5 +++- .../v2/responder/DS2InboundSubscription.java | 26 +++++++----------- .../protocol/v2/responder/DS2Responder.java | 5 ++-- .../main/java/org/iot/dsa/io/NodeEncoder.java | 2 +- .../org/iot/dsa/node/action/DSAction.java | 27 +++++++++---------- .../org/iot/dsa/security/DSPermission.java | 7 +++++ .../dsa/dslink/RequesterSubscribeTest.java | 11 +------- 11 files changed, 73 insertions(+), 50 deletions(-) diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java index ec259a1c..26a014a9 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java @@ -7,6 +7,7 @@ import java.util.Iterator; import org.iot.dsa.DSRuntime; import org.iot.dsa.dslink.DSIResponder; +import org.iot.dsa.dslink.DSPermissionException; import org.iot.dsa.dslink.DSRequestException; import org.iot.dsa.dslink.responder.InboundInvokeRequest; import org.iot.dsa.io.DSIWriter; @@ -236,7 +237,19 @@ public void run() { if (!info.isAction()) { throw new DSRequestException("Not an action " + path.getPath()); } - //TODO verify incoming permission + if (info.isAdmin()) { + if (!permission.isConfig()) { + throw new DSPermissionException("Config permission required"); + } + } else if (!info.isReadOnly()) { + if (DSPermission.WRITE.isGreaterThan(permission)) { + throw new DSPermissionException("Write permission required"); + } + } else { + if (DSPermission.READ.isGreaterThan(permission)) { + throw new DSPermissionException("Read permission required"); + } + } DSAction action = info.getAction(); result = action.invoke(info, this); } catch (Exception x) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java index 55fccdfe..1778b567 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java @@ -27,6 +27,7 @@ import org.iot.dsa.node.event.DSISubscriber; import org.iot.dsa.node.event.DSInfoTopic; import org.iot.dsa.node.event.DSTopic; +import org.iot.dsa.security.DSPermission; /** * List implementation for a responder. @@ -219,7 +220,11 @@ protected void encodeChild(ApiObject child, MessageWriter writer) { if (e != null) { map.put("$invokable", e); } else { - map.put("$invokable", action.getPermission().toString()); + if (child.isAdmin()) { + map.put("$invokable", DSPermission.CONFIG.toString()); + } else if (!child.isReadOnly()) { + map.put("$invokable", DSPermission.WRITE.toString()); + } } } else if (child.isValue()) { e = cacheMap.remove("$type"); @@ -301,7 +306,11 @@ private void encodeTargetAction(ApiObject object, MessageWriter writer) { } DSElement e = cacheMap.remove("$invokable"); if (e == null) { - encode("$invokable", action.getPermission().toString(), writer); + if (object.isAdmin()) { + encode("$invokable", DSPermission.CONFIG.toString(), writer); + } else if (!object.isReadOnly()) { + encode("$invokable", DSPermission.WRITE.toString(), writer); + } } else { encode("$invokable", e, writer); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSet.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSet.java index a0c87616..7f31e913 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSet.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSet.java @@ -2,6 +2,7 @@ import com.acuity.iot.dsa.dslink.protocol.message.RequestPath; import org.iot.dsa.dslink.DSIResponder; +import org.iot.dsa.dslink.DSPermissionException; import org.iot.dsa.dslink.DSRequestException; import org.iot.dsa.dslink.responder.InboundSetRequest; import org.iot.dsa.node.DSElement; @@ -44,7 +45,13 @@ public void run() { if (info.isReadOnly()) { throw new DSRequestException("Not writable: " + getPath()); } - //TODO verify incoming permission + if (!permission.isConfig()) { + if (info.isAdmin()) { + throw new DSPermissionException("Config permission required"); + } else if (DSPermission.WRITE.isGreaterThan(permission)) { + throw new DSPermissionException("Write permission required"); + } + } if (info.isNode()) { info.getNode().onSet(value); } else { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java index 6ada81b5..f020c41d 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundInvoke.java @@ -16,6 +16,7 @@ */ class DS2InboundInvoke extends DSInboundInvoke implements MessageConstants { + private int seqId = 0; MultipartWriter multipart; DS2InboundInvoke(DSMap parameters, DSPermission permission) { @@ -34,6 +35,8 @@ public void write(MessageWriter writer) { int ack = getSession().getNextAck(); out.init(getRequestId(), ack); out.setMethod(MSG_INVOKE_RES); + out.addIntHeader(HDR_SEQ_ID, seqId); + seqId++; super.write(writer); if (out.requiresMultipart()) { multipart = out.makeMultipart(); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java index 9bc054e3..7323c497 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java @@ -18,7 +18,8 @@ */ class DS2InboundList extends DSInboundList implements MessageConstants { - MultipartWriter multipart; + private MultipartWriter multipart; + private int seqId = 0; @Override protected void beginMessage(MessageWriter writer) { @@ -122,6 +123,8 @@ public void write(MessageWriter writer) { int ack = getSession().getNextAck(); out.init(getRequestId(), ack); out.setMethod(MSG_LIST_RES); + out.addIntHeader(HDR_SEQ_ID,seqId); + seqId++; super.write(writer); if (out.requiresMultipart()) { multipart = out.makeMultipart(); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java index d8bee2ae..adb26490 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundSubscription.java @@ -17,30 +17,22 @@ */ public class DS2InboundSubscription extends DSInboundSubscription implements MessageConstants { - /////////////////////////////////////////////////////////////////////////// - // Fields - /////////////////////////////////////////////////////////////////////////// - - /////////////////////////////////////////////////////////////////////////// - // Constructors - /////////////////////////////////////////////////////////////////////////// + private int seqId = 0; protected DS2InboundSubscription(DSInboundSubscriptions manager, Integer sid, String path, int qos) { super(manager, sid, path, qos); } - /////////////////////////////////////////////////////////////////////////// - // Methods in alphabetical order - /////////////////////////////////////////////////////////////////////////// - @Override protected void write(Update update, MessageWriter writer, StringBuilder buf) { - DS2MessageWriter messageWriter = (DS2MessageWriter) writer; - messageWriter.init(getSubscriptionId(), getSession().getNextAck()); - messageWriter.setMethod(MSG_SUBSCRIBE_RES); - DSIWriter dsiWriter = messageWriter.getWriter(); - DSByteBuffer byteBuffer = messageWriter.getBody(); + DS2MessageWriter out = (DS2MessageWriter) writer; + out.init(getSubscriptionId(), getSession().getNextAck()); + out.setMethod(MSG_SUBSCRIBE_RES); + out.addIntHeader(HDR_SEQ_ID, seqId); + seqId++; + DSIWriter dsiWriter = out.getWriter(); + DSByteBuffer byteBuffer = out.getBody(); byteBuffer.skip(2); int start = byteBuffer.length(); dsiWriter.beginMap(); @@ -55,7 +47,7 @@ protected void write(Update update, MessageWriter writer, StringBuilder buf) { byteBuffer.replaceShort(start - 2, (short) (end - start), false); dsiWriter.reset(); dsiWriter.value(update.value.toElement()); - messageWriter.write((DSBinaryTransport) getResponder().getTransport()); + out.write((DSBinaryTransport) getResponder().getTransport()); } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java index 441ab116..075206bd 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java @@ -119,7 +119,7 @@ private void processInvoke(DS2MessageReader msg) { if (msg.getBodyLength() > 0) { params = msg.getBodyReader().getMap(); } - DSPermission perm = DSPermission.READ; + DSPermission perm = DSPermission.CONFIG; Object obj = msg.getHeader(HDR_MAX_PERMISSION); if (obj != null) { perm = DSPermission.valueOf(obj.hashCode()); @@ -157,7 +157,7 @@ private void processList(DS2MessageReader msg) { */ private void processSet(DS2MessageReader msg) { int rid = msg.getRequestId(); - DSPermission perm = DSPermission.READ; + DSPermission perm = DSPermission.CONFIG; Object obj = msg.getHeader(HDR_MAX_PERMISSION); if (obj != null) { perm = DSPermission.valueOf(obj.hashCode()); @@ -186,7 +186,6 @@ private void processSet(DS2MessageReader msg) { */ private void processSubscribe(DS2MessageReader msg) { Integer sid = msg.getRequestId(); - //todo if no stream String path = (String) msg.getHeader(HDR_TARGET_PATH); Number qos = (Number) msg.getHeader(HDR_QOS); if (qos == null) { diff --git a/dslink-core/src/main/java/org/iot/dsa/io/NodeEncoder.java b/dslink-core/src/main/java/org/iot/dsa/io/NodeEncoder.java index 3c756ba2..71b47e76 100644 --- a/dslink-core/src/main/java/org/iot/dsa/io/NodeEncoder.java +++ b/dslink-core/src/main/java/org/iot/dsa/io/NodeEncoder.java @@ -122,7 +122,7 @@ void writeChildren(DSNode arg) { writeObject(info); } } catch (IndexOutOfBoundsException x) { - //TODO log a fine - modified during save which is okay. + arg.trace(arg.getPath(), x); } info = info.next(); } diff --git a/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java b/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java index 0f2c5112..6f0e15e4 100644 --- a/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java +++ b/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java @@ -15,6 +15,11 @@ /** * Fully describes an action and routes invocations to DSNode.onInvoke. * + * Permissions + * Permissions are determined using info flags. If the admin flag is set, + * the action requires admin level permissions. If the action is readonly, then only read + * permissions are required. Otherwise the action will require write permissions. + * * @author Aaron Hansen * @see org.iot.dsa.node.DSNode#onInvoke(DSInfo, ActionInvocation) */ @@ -53,7 +58,7 @@ public DSAction copy() { /** * A convenience which calls addParameter with the same arguments, and also sets the metadata - * for default value. + * for the default value. * * @param name Must not be null. * @param value Must not be null. @@ -69,7 +74,7 @@ public DSMetadata addDefaultParameter(String name, DSIValue value, String descri /** * Fully describes a parameter for method invocation. At the very least, the map should have a - * unique name and a value type, use the metadata utility class to build the map. + * unique name and a value type. You should use the metadata utility class to build the map. * * @return This. * @see DSMetadata @@ -202,9 +207,14 @@ public Iterator getParameters() { return null; } + /** + * Not used. Permissions are determined using info flags. If the admin flag is set, + * the action requires admin level permissions. If the action is readonly, then only read + * permissions are required. Otherwise the action will require write permissions. + */ @Override public DSPermission getPermission() { - return permission; + return DSPermission.WRITE; } @Override @@ -261,17 +271,6 @@ public boolean isNull() { public void prepareParameter(DSInfo info, DSMap parameter) { } - /** - * Returns this, it is not necessary to set the permission to read. - */ - public DSAction setPermission(DSPermission permission) { - if (this == DEFAULT) { - throw new IllegalStateException("Cannot modify the default action."); - } - this.permission = permission; - return this; - } - /** * Returns this, it is not necessary to set the result to void. */ diff --git a/dslink-core/src/main/java/org/iot/dsa/security/DSPermission.java b/dslink-core/src/main/java/org/iot/dsa/security/DSPermission.java index ab19a118..b3f449c7 100644 --- a/dslink-core/src/main/java/org/iot/dsa/security/DSPermission.java +++ b/dslink-core/src/main/java/org/iot/dsa/security/DSPermission.java @@ -29,6 +29,13 @@ public boolean isConfig() { return this == CONFIG; } + /** + * True if this level is higher than the given. + */ + public boolean isGreaterThan(DSPermission arg) { + return level >= arg.getLevel(); + } + public boolean isRead() { return this == READ; } diff --git a/dslink-core/src/test/java/org/iot/dsa/dslink/RequesterSubscribeTest.java b/dslink-core/src/test/java/org/iot/dsa/dslink/RequesterSubscribeTest.java index 913762ad..96f50c81 100644 --- a/dslink-core/src/test/java/org/iot/dsa/dslink/RequesterSubscribeTest.java +++ b/dslink-core/src/test/java/org/iot/dsa/dslink/RequesterSubscribeTest.java @@ -3,6 +3,7 @@ import com.acuity.iot.dsa.dslink.test.TestLink; import org.iot.dsa.dslink.requester.AbstractSubscribeHandler; import org.iot.dsa.dslink.requester.ErrorType; +import org.iot.dsa.dslink.requester.SimpleRequestHandler; import org.iot.dsa.node.DSElement; import org.iot.dsa.node.DSInfo; import org.iot.dsa.node.DSInt; @@ -31,9 +32,6 @@ public class RequesterSubscribeTest implements DSLinkConnection.Listener { public void onConnect(DSLinkConnection connection) { DSIRequester requester = link.getConnection().getRequester(); success = !root.isSubscribed(); - synchronized (RequesterSubscribeTest.this) { - RequesterSubscribeTest.this.notify(); - } //todo handler = (AbstractSubscribeHandler) requester.subscribe( "/main/int", 0, new AbstractSubscribeHandler() { boolean first = true; @@ -75,11 +73,6 @@ public void theTest() throws Exception { link.getConnection().addListener(this); Thread t = new Thread(link, "DSLink Runner"); t.start(); - synchronized (this) { //todo - this.wait(5000); - } - Assert.assertTrue(success); //todo - /* Assert.assertFalse(root.isSubscribed()); Assert.assertFalse(success); //Wait for onConnected to subscribe and receive the first update value of 0 @@ -107,8 +100,6 @@ public void theTest() throws Exception { //Validate that the root was unsubscribed Assert.assertFalse(root.isSubscribed()); //Subscribe a lower value, validate onSubscribe. - */ - DSIRequester requester = link.getConnection().getRequester();//todo ANode node = (ANode) root.getNode("aNode"); testChild(requester); //Test the same path, but different instance. From 3b7f4241f5f14541356bf0125b48db4dbf6f2919 Mon Sep 17 00:00:00 2001 From: Aaron Date: Mon, 19 Mar 2018 16:21:55 -0700 Subject: [PATCH 4/4] Clean up on disconnection. --- build.gradle | 2 +- .../protocol/requester/DSOutboundStub.java | 46 ------------------- .../requester/DSOutboundSubscribeStubs.java | 19 ++++---- .../requester/DSOutboundSubscriptions.java | 11 ++++- .../protocol/requester/DSRequester.java | 17 +++++-- .../responder/DSInboundSubscriptions.java | 24 ++++++---- .../protocol/responder/DSResponder.java | 16 +++++++ .../protocol/v1/requester/DS1Requester.java | 6 +++ .../protocol/v1/responder/DS1Responder.java | 25 ++-------- .../dslink/protocol/v2/DS2LinkConnection.java | 37 ++++++++++++++- .../dsa/dslink/protocol/v2/DS2Session.java | 6 +++ .../protocol/v2/responder/DS2InboundList.java | 2 +- .../protocol/v2/responder/DS2Responder.java | 19 +++----- .../transport/BufferedBinaryTransport.java | 3 ++ .../transport/StreamBinaryTransport.java | 3 ++ .../java/org/iot/dsa/node/DSMetadata.java | 26 +++++------ .../org/iot/dsa/node/action/DSAction.java | 4 +- 17 files changed, 144 insertions(+), 122 deletions(-) diff --git a/build.gradle b/build.gradle index e78bbb31..1d3d3c99 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,7 @@ apply plugin: 'java' apply plugin: 'maven' group 'org.iot.dsa' -version '0.19.0' +version '0.20.0' sourceCompatibility = 1.6 targetCompatibility = 1.6 diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java index d5a97983..03ffe1d0 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundStub.java @@ -5,7 +5,6 @@ import org.iot.dsa.dslink.requester.ErrorType; import org.iot.dsa.dslink.requester.OutboundRequestHandler; import org.iot.dsa.dslink.requester.OutboundStream; -import org.iot.dsa.node.DSElement; import org.iot.dsa.node.DSMap; /** @@ -79,51 +78,6 @@ public void handleClose() { getRequester().removeRequest(getRequestId()); } - /* - public void handleError(DSElement details) { - if (!open) { - return; - } - try { - ErrorType type = ErrorType.internalError; - String msg; - if (details.isMap()) { - String detail = null; - DSMap map = details.toMap(); - String tmp = map.getString("type"); - if (tmp.equals("permissionDenied")) { - type = ErrorType.permissionDenied; - } else if (tmp.equals("invalidRequest")) { - type = ErrorType.badRequest; - } else if (tmp.equals("invalidPath")) { - type = ErrorType.badRequest; - } else if (tmp.equals("notSupported")) { - type = ErrorType.notSupported; - } else { - type = ErrorType.internalError; - } - msg = map.getString("msg"); - detail = map.getString("detail"); - if (msg == null) { - msg = detail; - } - if (msg == null) { - msg = details.toString(); - } - } else { - type = ErrorType.internalError; - msg = details.toString(); - } - if (msg == null) { - msg = ""; - } - getHandler().onError(type, msg); - } catch (Exception x) { - getRequester().error(getRequester().getPath(), x); - } - } - */ - public void handleError(ErrorType type, String message) { if (!open) { return; diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscribeStubs.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscribeStubs.java index a5573efe..356d231b 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscribeStubs.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscribeStubs.java @@ -72,17 +72,6 @@ void add(DSOutboundSubscribeStub stub) { } } - public void close() { - //TODO who calls this and for what purpose - /* - DSOutboundSubscribeStub cur = first; - while (cur != null) { - cur.close(); - cur = cur.getNext(); - } - */ - } - private boolean contains(DSOutboundSubscribeStub stub) { if (stub == first) { return true; @@ -114,6 +103,14 @@ public boolean hasSid() { return sid != null; } + public void onDisconnect() { + DSOutboundSubscribeStub cur = first; + while (cur != null) { + cur.closeStream(); + cur = cur.getNext(); + } + } + /** * Null if the arg is the first in the list, last if stub is not contained. */ diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java index db5b715c..ee472b9e 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java @@ -4,7 +4,6 @@ import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import com.acuity.iot.dsa.dslink.protocol.requester.DSOutboundSubscribeStubs.State; -import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageWriter; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -133,6 +132,16 @@ public void onConnectFail() { } public void onDisconnect() { + for (DSOutboundSubscribeStubs stubs : pendingSubscribe) { + stubs.onDisconnect(); + } + pendingSubscribe.clear(); + pendingUnsubscribe.clear(); + for (DSOutboundSubscribeStubs stubs : pathMap.values()) { + stubs.onDisconnect(); + } + sidMap.clear(); + pathMap.clear(); } public void handleUpdate(int sid, String ts, String sts, DSElement value) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSRequester.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSRequester.java index c06b7eab..b6e9b057 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSRequester.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSRequester.java @@ -3,7 +3,9 @@ import com.acuity.iot.dsa.dslink.protocol.DSSession; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import com.acuity.iot.dsa.dslink.transport.DSTransport; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.iot.dsa.dslink.DSIRequester; @@ -28,8 +30,7 @@ public abstract class DSRequester extends DSNode implements DSIRequester { private AtomicInteger nextRid = new AtomicInteger(); private DSSession session; - private Map requests = - new ConcurrentHashMap(); + private Map requests = new ConcurrentHashMap(); private DSOutboundSubscriptions subscriptions = makeSubscriptions(); /////////////////////////////////////////////////////////////////////////// @@ -120,7 +121,6 @@ protected DSOutboundSetStub makeSet(String path, DSIValue value, OutboundRequest public void onConnect() { subscriptions.onConnect(); - session.setRequesterAllowed(); } public void onConnectFail() { @@ -129,6 +129,17 @@ public void onConnectFail() { public void onDisconnect() { subscriptions.onDisconnect(); + Iterator> it = requests.entrySet().iterator(); + Map.Entry me; + while (it.hasNext()) { + me = it.next(); + try { + me.getValue().getHandler().onClose(); + } catch (Exception x) { + error(getPath(), x); + } + it.remove(); + } } @Override diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java index 55ecf37a..a3b0967f 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundSubscriptions.java @@ -47,15 +47,6 @@ public DSInboundSubscriptions(DSResponder responder) { // Methods in alphabetical order /////////////////////////////////////////////////////////////////////////// - /** - * Unsubscribes all. - */ - public void close() { - for (Integer i : sidMap.keySet()) { - unsubscribe(i); - } - } - /** * Add to the outbound queue if not already enqueued. */ @@ -89,6 +80,21 @@ protected DSInboundSubscription makeSubscription(Integer sid, String path, int q return new DSInboundSubscription(this, sid, path, qos); } + public void onConnect() { + } + + public void onConnectFail() { + } + + /** + * Unsubscribes all. + */ + public void onDisconnect() { + for (Integer i : sidMap.keySet()) { + unsubscribe(i); + } + } + /** * Create or update a subscription. */ diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java index d615720c..d2912a0c 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java @@ -4,7 +4,9 @@ import com.acuity.iot.dsa.dslink.protocol.DSStream; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import com.acuity.iot.dsa.dslink.transport.DSTransport; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import org.iot.dsa.dslink.DSLink; import org.iot.dsa.dslink.DSLinkConnection; @@ -65,6 +67,8 @@ public DSSession getSession() { return session; } + protected abstract DSInboundSubscriptions getSubscriptions(); + public DSTransport getTransport() { return getConnection().getTransport(); } @@ -83,6 +87,18 @@ public void onConnectFail() { } public void onDisconnect() { + Iterator> it = inboundRequests.entrySet().iterator(); + Map.Entry me; + while (it.hasNext()) { + me = it.next(); + try { + me.getValue().onClose(me.getKey()); + } catch (Exception x) { + error(getPath(), x); + } + it.remove(); + } + getSubscriptions().onDisconnect(); } protected DSStream putRequest(Integer rid, DSStream request) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/requester/DS1Requester.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/requester/DS1Requester.java index 355dbcb6..5272def5 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/requester/DS1Requester.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/requester/DS1Requester.java @@ -139,6 +139,12 @@ private void processUpdate(DSElement updateElement) { getSubscriptions().handleUpdate(sid, ts, sts, value); } + @Override + public void onConnect() { + super.onConnect(); + getSession().setRequesterAllowed(); + } + private void processUpdates(DSMap map) { DSList updates = map.getList("updates"); for (int i = 0; i < updates.size(); i++) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/responder/DS1Responder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/responder/DS1Responder.java index 290225ae..4e916b24 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/responder/DS1Responder.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v1/responder/DS1Responder.java @@ -10,7 +10,6 @@ import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundSubscriptions; import com.acuity.iot.dsa.dslink.protocol.responder.DSResponder; import com.acuity.iot.dsa.dslink.protocol.v1.CloseMessage; -import java.util.Map; import org.iot.dsa.DSRuntime; import org.iot.dsa.node.DSElement; import org.iot.dsa.node.DSList; @@ -53,6 +52,11 @@ private String getPath(DSMap req) { return path; } + @Override + protected DSInboundSubscriptions getSubscriptions() { + return subscriptions; + } + /** * Process an individual request. */ @@ -139,25 +143,6 @@ public void run() { } } - public void onConnect() { - } - - public void onConnectFail() { - } - - public void onDisconnect() { - debug(debug() ? "Close" : null); - subscriptions.close(); - for (Map.Entry entry : getRequests().entrySet()) { - try { - entry.getValue().onClose(entry.getKey()); - } catch (Exception x) { - debug(debug() ? "Close" : null, x); - } - } - getRequests().clear(); - } - /** * Handles an invoke request. */ diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java index 39f4ed46..5f6a41a0 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2LinkConnection.java @@ -1,5 +1,10 @@ package com.acuity.iot.dsa.dslink.protocol.v2; +import static com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants.HDR_STATUS; +import static com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants.STS_INITIALIZING; +import static com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants.STS_INVALID_AUTH; +import static com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants.STS_OK; + import com.acuity.iot.dsa.dslink.io.DSByteBuffer; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import com.acuity.iot.dsa.dslink.transport.DSTransport; @@ -11,6 +16,7 @@ import org.iot.dsa.dslink.DSLink; import org.iot.dsa.dslink.DSLinkConfig; import org.iot.dsa.dslink.DSLinkConnection; +import org.iot.dsa.dslink.DSPermissionException; import org.iot.dsa.node.DSBool; import org.iot.dsa.node.DSBytes; import org.iot.dsa.node.DSInfo; @@ -161,6 +167,9 @@ protected void onConnect() { protected void onDisconnect() { session.onDisconnect(); put(STATUS, DSStatus.down); + transport.close(); + transport = null; + remove(TRANSPORT); } @Override @@ -207,7 +216,19 @@ private void recvF1() throws IOException { throw new IllegalStateException("Expecting handshake method 0xF1 not 0x" + Integer.toHexString(reader.getMethod())); } - //TODO check for header status + Byte status = (Byte) reader.getHeader(HDR_STATUS); + if (status != null) { + switch (status.intValue()) { + case STS_OK: + case STS_INITIALIZING: + break; + case STS_INVALID_AUTH: + throw new DSPermissionException("Invalid Auth"); + default: + throw new IllegalStateException("Unexpected status: 0x" + + DSBytes.toHex(status, null)); + } + } put(brokerDsId, DSString.valueOf(reader.readString(in))); byte[] tmp = new byte[65]; int len = in.read(tmp); @@ -231,7 +252,19 @@ private void recvF3() throws IOException { throw new IllegalStateException("Expecting handshake method 0xF3 not 0x" + Integer.toHexString(reader.getMethod())); } - //TODO check for header status + Byte status = (Byte) reader.getHeader(HDR_STATUS); + if (status != null) { + switch (status.intValue()) { + case STS_OK: + case STS_INITIALIZING: + break; + case STS_INVALID_AUTH: + throw new DSPermissionException("Invalid Auth"); + default: + throw new IllegalStateException("Unexpected status: 0x" + + DSBytes.toHex(status, null)); + } + } boolean allowed = in.read() == 1; put(requesterAllowed, DSBool.valueOf(allowed)); if (allowed) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java index 3b426003..88d813af 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/DS2Session.java @@ -171,6 +171,9 @@ public void onConnect() { @Override public void onConnectFail() { super.onConnectFail(); + messageReader = null; + messageWriter = null; + multiparts.clear(); requester.onConnectFail(); responder.onConnectFail(); } @@ -178,6 +181,9 @@ public void onConnectFail() { @Override public void onDisconnect() { super.onDisconnect(); + messageReader = null; + messageWriter = null; + multiparts.clear(); requester.onDisconnect(); responder.onDisconnect(); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java index 7323c497..ae9c5e66 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2InboundList.java @@ -123,7 +123,7 @@ public void write(MessageWriter writer) { int ack = getSession().getNextAck(); out.init(getRequestId(), ack); out.setMethod(MSG_LIST_RES); - out.addIntHeader(HDR_SEQ_ID,seqId); + out.addIntHeader(HDR_SEQ_ID, seqId); seqId++; super.write(writer); if (out.requiresMultipart()) { diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java index 075206bd..d7d6dbd0 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/responder/DS2Responder.java @@ -4,13 +4,13 @@ import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundRequest; import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundSet; import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundSubscription; +import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundSubscriptions; import com.acuity.iot.dsa.dslink.protocol.responder.DSResponder; import com.acuity.iot.dsa.dslink.protocol.v2.CloseMessage; import com.acuity.iot.dsa.dslink.protocol.v2.DS2MessageReader; import com.acuity.iot.dsa.dslink.protocol.v2.DS2Session; import com.acuity.iot.dsa.dslink.protocol.v2.MessageConstants; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; -import java.util.Map; import org.iot.dsa.DSRuntime; import org.iot.dsa.node.DSBytes; import org.iot.dsa.node.DSElement; @@ -43,6 +43,11 @@ public DS2Responder(DS2Session session) { // Methods - In alphabetical order by method name. ///////////////////////////////////////////////////////////////// + @Override + protected DSInboundSubscriptions getSubscriptions() { + return subscriptions; + } + public DSBinaryTransport getTransport() { return (DSBinaryTransport) getConnection().getTransport(); } @@ -85,18 +90,6 @@ public void onConnect() { public void onConnectFail() { } - public void onDisconnect() { - subscriptions.close(); - for (Map.Entry entry : getRequests().entrySet()) { - try { - entry.getValue().onClose(entry.getKey()); - } catch (Exception x) { - error(getPath(), x); - } - } - getRequests().clear(); - } - /** * Handles an invoke request. */ diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java index aebbefed..bf39e6b0 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/BufferedBinaryTransport.java @@ -64,6 +64,9 @@ protected void close(Throwable reason) { @Override public DSTransport close() { synchronized (this) { + if (!open) { + return this; + } open = false; notifyAll(); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java index 3a65462b..c36efec8 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/transport/StreamBinaryTransport.java @@ -64,6 +64,9 @@ protected final void close(Throwable reason) { @Override public final DSTransport close() { synchronized (this) { + if (!open) { + return this; + } open = false; notifyAll(); } diff --git a/dslink-core/src/main/java/org/iot/dsa/node/DSMetadata.java b/dslink-core/src/main/java/org/iot/dsa/node/DSMetadata.java index f0eb492c..7270fb64 100644 --- a/dslink-core/src/main/java/org/iot/dsa/node/DSMetadata.java +++ b/dslink-core/src/main/java/org/iot/dsa/node/DSMetadata.java @@ -11,19 +11,19 @@ public class DSMetadata { // Constants /////////////////////////////////////////////////////////////////////////// - public static final String BOOLEAN_RANGE = "booleanRange"; - public static final String DESCRIPTION = "description"; - public static final String DECIMAL_PLACES = "decimalPlaces"; - public static final String DEFAULT = "default"; - public static final String DISPLAY_NAME = "displayName"; - public static final String EDITOR = "editor"; - public static final String ENUM_RANGE = "enumRange"; - public static final String NAME = "name"; - public static final String MAX_VALUE = "maxValue"; - public static final String MIN_VALUE = "minValue"; - public static final String PLACEHOLDER = "placeholder"; - public static final String TYPE = "type"; - public static final String UNIT = "unit"; + public static final String BOOLEAN_RANGE = "$booleanRange"; + public static final String DESCRIPTION = "$description"; + public static final String DECIMAL_PLACES = "$decimalPlaces"; + public static final String DEFAULT = "$default"; + public static final String DISPLAY_NAME = "$displayName"; + public static final String EDITOR = "$editor"; + public static final String ENUM_RANGE = "$enumRange"; + public static final String NAME = "$name"; + public static final String MAX_VALUE = "$maxValue"; + public static final String MIN_VALUE = "$minValue"; + public static final String PLACEHOLDER = "$placeholder"; + public static final String TYPE = "$type"; + public static final String UNIT = "$unit"; //public static final String EDITOR_DATE = "date"; //public static final String EDITOR_DATE_RANGE = "daterange"; diff --git a/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java b/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java index 6f0e15e4..c312b777 100644 --- a/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java +++ b/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java @@ -289,7 +289,7 @@ private void validate(DSMap params, List existing) { if (params.isEmpty()) { throw new IllegalArgumentException("Empty metadata"); } - String name = params.getString("name"); + String name = params.getString(DSMetadata.NAME); if ((name == null) || name.isEmpty()) { throw new IllegalArgumentException("Missing name"); } @@ -300,7 +300,7 @@ private void validate(DSMap params, List existing) { return; } for (DSMap param : existing) { - if (name.equals(param.getString("name"))) { + if (name.equals(param.getString(DSMetadata.NAME))) { throw new IllegalArgumentException("Duplicate name: " + name); } }