diff --git a/build.gradle b/build.gradle index b89b43f9..b68f028f 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,7 @@ apply plugin: 'java' apply plugin: 'maven' group 'org.iot.dsa' -version '0.15.0' +version '0.16.0' sourceCompatibility = 1.6 targetCompatibility = 1.6 diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSProtocolException.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSProtocolException.java index 70b4d5e5..e9f5d859 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSProtocolException.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSProtocolException.java @@ -43,6 +43,14 @@ public String getType() { return type; } + /** + * Optional. + */ + public DSProtocolException setDetail(String arg) { + detail = arg; + return this; + } + /** * Optional. */ diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSSession.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSSession.java index df6d7990..9e28fb00 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSSession.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSSession.java @@ -29,6 +29,17 @@ public abstract class DSSession extends DSNode { private List outgoingResponses = new LinkedList(); protected boolean requesterAllowed = false; + /////////////////////////////////////////////////////////////////////////// + // Constructors + /////////////////////////////////////////////////////////////////////////// + + public DSSession() { + } + + public DSSession(DSLinkConnection connection) { + this.connection = connection; + } + /////////////////////////////////////////////////////////////////////////// // Methods /////////////////////////////////////////////////////////////////////////// @@ -202,6 +213,8 @@ public void setRequesterAllowed() { requesterAllowed = true; } + public abstract boolean shouldEndMessage(); + /** * Called by the connection, this manages the running state and calls doRun for the specific * implementation. A separate thread is spun off to manage writing. @@ -221,14 +234,6 @@ public void run() { } } - /** - * For use by the connection object. - */ - public DSSession setConnection(DSLinkConnection connection) { - this.connection = connection; - return this; - } - /////////////////////////////////////////////////////////////////////////// // Inner Classes /////////////////////////////////////////////////////////////////////////// diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/BaseMessage.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/BaseMessage.java index 5e612b82..8fa9ec59 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/BaseMessage.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/BaseMessage.java @@ -97,7 +97,8 @@ public BaseMessage setStream(String arg) { * write the key value pairs that have been configured on this object, but does not close the * response map. */ - public void write(DSIWriter writer) { + public void write(MessageWriter out) { + DSIWriter writer = out.getWriter(); writer.beginMap(); if (rid >= 0) { writer.key("rid").value(rid); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/CloseMessage.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/CloseMessage.java index 536474b4..48d679dc 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/CloseMessage.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/CloseMessage.java @@ -36,9 +36,9 @@ public CloseMessage(Integer rid) { * Calls the super implementation then writes the error object and closes the entire response * object. */ - public void write(DSIWriter writer) { + public void write(MessageWriter writer) { super.write(writer); - writer.endMap(); + writer.getWriter().endMap(); } ///////////////////////////////////////////////////////////////// diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/ErrorResponse.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/ErrorResponse.java index adfeaa38..33689f00 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/ErrorResponse.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/ErrorResponse.java @@ -52,10 +52,10 @@ public ErrorResponse parse(Throwable reason) { reason = tmp; } } + msg = reason.getMessage(); if (reason instanceof DSProtocolException) { DSProtocolException x = (DSProtocolException) reason; detail = x.getDetail(); - msg = x.getMessage(); phase = x.getPhase(); type = x.getType(); } else if (reason instanceof DSRequestException) { @@ -117,36 +117,25 @@ public ErrorResponse setType(String arg) { * Calls the super implementation then writes the error object and closes the entire response * object. */ - public void write(DSIWriter writer) { + public void write(MessageWriter writer) { super.write(writer); - writer.key("error").beginMap(); + DSIWriter out = writer.getWriter(); + out.key("error").beginMap(); if (type != null) { - writer.key("type").value(type); + out.key("type").value(type); } if (phase == Phase.RESPONSE) { - writer.key("phase").value("response"); + out.key("phase").value("response"); } if (msg != null) { - writer.key("msg").value(msg); + out.key("msg").value(msg); } if (detail != null) { - writer.key("detail").value(detail); + out.key("detail").value(detail); } - writer.endMap().endMap(); + out.endMap().endMap(); } - ///////////////////////////////////////////////////////////////// - // Methods - Protected and in alphabetical order by method name. - ///////////////////////////////////////////////////////////////// - - ///////////////////////////////////////////////////////////////// - // Methods - Package and in alphabetical order by method name. - ///////////////////////////////////////////////////////////////// - - ///////////////////////////////////////////////////////////////// - // Methods - Private and in alphabetical order by method name. - ///////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////// // Inner Classes - in alphabetical order by class name. ///////////////////////////////////////////////////////////////// @@ -156,12 +145,4 @@ public enum Phase { RESPONSE } - ///////////////////////////////////////////////////////////////// - // Facets - in alphabetical order by field name. - ///////////////////////////////////////////////////////////////// - - ///////////////////////////////////////////////////////////////// - // Initialization - ///////////////////////////////////////////////////////////////// - } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/MessageWriter.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/MessageWriter.java new file mode 100644 index 00000000..63f413a9 --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/MessageWriter.java @@ -0,0 +1,16 @@ +package com.acuity.iot.dsa.dslink.protocol.message; + +import org.iot.dsa.io.DSIWriter; + +/** + * Used to read a DSA 2.n message (header and body). Call init(InputStream) to start a new message, + * can be reused for multiple messages. Not thread safe, the intent is messages will be constructed + * and read serially. + * + * @author Aaron Hansen + */ +public interface MessageWriter { + + public DSIWriter getWriter(); + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/OutboundMessage.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/OutboundMessage.java index 42f1802b..4b59fbcc 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/OutboundMessage.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/OutboundMessage.java @@ -1,7 +1,5 @@ package com.acuity.iot.dsa.dslink.protocol.message; -import org.iot.dsa.io.DSIWriter; - /** * @author Aaron Hansen */ @@ -10,6 +8,6 @@ public interface OutboundMessage { /** * Write the full request or response message object. */ - public void write(DSIWriter writer); + public void write(MessageWriter writer); } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/RequestPath.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/RequestPath.java similarity index 86% rename from dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/RequestPath.java rename to dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/RequestPath.java index 6bc28e19..62f302d7 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/RequestPath.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/message/RequestPath.java @@ -1,4 +1,4 @@ -package com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder; +package com.acuity.iot.dsa.dslink.protocol.message; import org.iot.dsa.dslink.DSIResponder; import org.iot.dsa.dslink.DSInvalidPathException; @@ -12,7 +12,7 @@ * * @author Aaron Hansen */ -class RequestPath { +public class RequestPath { /////////////////////////////////////////////////////////////////////////// // Fields @@ -29,7 +29,7 @@ class RequestPath { // Constructors /////////////////////////////////////////////////////////////////////////// - RequestPath(String path, DSNode root) { + public RequestPath(String path, DSNode root) { this.path = path; this.root = root; names = DSPath.decodePath(path); @@ -40,9 +40,9 @@ class RequestPath { /////////////////////////////////////////////////////////////////////////// /** - * The info of the target in the parent. + * The info of the target in it's parent. */ - DSInfo getInfo() { + public DSInfo getInfo() { if (target == null) { getTarget(); } @@ -52,7 +52,7 @@ DSInfo getInfo() { /** * The parent of the target unless the request was for / */ - DSNode getParent() { + public DSNode getParent() { if (parent == null) { getTarget(); } @@ -60,13 +60,14 @@ DSNode getParent() { } /** - * If the target is a responder, this is path to send to it. + * If the target is a responder, this is path it should use, not the original path of the + * request. */ - String getPath() { + public String getPath() { return path; } - DSIObject getTarget() { + public DSIObject getTarget() { if (parent == null) { parent = root.getParent(); target = root; @@ -105,7 +106,7 @@ DSIObject getTarget() { return target; } - boolean isResponder() { + public boolean isResponder() { return target instanceof DSIResponder; } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1LinkConnection.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1LinkConnection.java index b557c1e7..15a73dba 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1LinkConnection.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1LinkConnection.java @@ -160,9 +160,8 @@ protected void onInitialize() { makeTransport(init); put(TRANSPORT, getTransport()).setTransient(true); if (session == null) { - session = new DS1Session(); + session = new DS1Session(this); put(SESSION, session).setTransient(true); - session.setConnection(this); } connectionInit = init; } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1Session.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1Session.java index 422be3ed..aaff50c1 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1Session.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/DS1Session.java @@ -8,6 +8,7 @@ import com.acuity.iot.dsa.dslink.DSProtocolException; import com.acuity.iot.dsa.dslink.DSSession; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester.DS1Requester; import com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder.DS1Responder; @@ -46,12 +47,24 @@ public class DS1Session extends DSSession { private DSInfo lastAckRecv = getInfo(LAST_ACK_RECV); private DSInfo lastAckSent = getInfo(LAST_ACK_SENT); private long lastMessageSent; + private MessageWriter messageWriter; private int nextAck = -1; private int nextMsg = 1; private boolean requestsNext = false; private DS1Requester requester = new DS1Requester(this); private DS1Responder responder = new DS1Responder(this); + ///////////////////////////////////////////////////////////////// + // Constructors + ///////////////////////////////////////////////////////////////// + + public DS1Session() { + } + + public DS1Session(DS1LinkConnection connection) { + super(connection); + } + ///////////////////////////////////////////////////////////////// // Methods ///////////////////////////////////////////////////////////////// @@ -178,6 +191,13 @@ public DS1LinkConnection getConnection() { return (DS1LinkConnection) super.getConnection(); } + private MessageWriter getMessageWriter() { + if (messageWriter == null) { + messageWriter = new MyMessageWriter(getConnection().getWriter()); + } + return messageWriter; + } + public DSIReader getReader() { return getConnection().getReader(); } @@ -187,8 +207,8 @@ public DSIRequester getRequester() { return requester; } - public DSIWriter getWriter() { - return getConnection().getWriter(); + private DSIWriter getWriter() { + return getMessageWriter().getWriter(); } private boolean hasPingToSend() { @@ -390,7 +410,7 @@ public boolean shouldEndMessage() { * @see #endRequests() */ public void writeRequest(OutboundMessage message) { - message.write(getWriter()); + message.write(getMessageWriter()); } /** @@ -401,7 +421,26 @@ public void writeRequest(OutboundMessage message) { * @see #endResponses() */ public void writeResponse(OutboundMessage message) { - message.write(getWriter()); + message.write(getMessageWriter()); + } + + + ///////////////////////////////////////////////////////////////// + // Inner Classes + ///////////////////////////////////////////////////////////////// + + private class MyMessageWriter implements MessageWriter { + + DSIWriter writer; + + MyMessageWriter(DSIWriter writer) { + this.writer = writer; + } + + @Override + public DSIWriter getWriter() { + return writer; + } } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundInvokeStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundInvokeStub.java index 82a7c8e3..2c82bd61 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundInvokeStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundInvokeStub.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import org.iot.dsa.dslink.requester.OutboundInvokeHandler; import org.iot.dsa.dslink.requester.OutboundInvokeHandler.Mode; import org.iot.dsa.io.DSIWriter; @@ -98,7 +99,8 @@ protected void handleResponse(DSMap response) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); out.beginMap(); out.key("rid").value(getRequestId()); out.key("method").value("invoke"); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundListStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundListStub.java index 7e084656..ef211fff 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundListStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundListStub.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import org.iot.dsa.dslink.DSRequestException; import org.iot.dsa.dslink.requester.OutboundListHandler; import org.iot.dsa.io.DSIWriter; @@ -77,7 +78,8 @@ protected void handleResponse(DSMap response) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); out.beginMap(); out.key("rid").value(getRequestId()); out.key("method").value("list"); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundRemoveStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundRemoveStub.java index 533e225c..fa889c79 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundRemoveStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundRemoveStub.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import org.iot.dsa.dslink.requester.OutboundRequestHandler; import org.iot.dsa.io.DSIWriter; import org.iot.dsa.node.DSMap; @@ -43,7 +44,8 @@ protected void handleResponse(DSMap map) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); out.beginMap(); out.key("rid").value(getRequestId()); out.key("method").value("remove"); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSetStub.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSetStub.java index 6b5ca980..c291c905 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSetStub.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSetStub.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import org.iot.dsa.dslink.requester.OutboundRequestHandler; import org.iot.dsa.io.DSIWriter; import org.iot.dsa.node.DSIValue; @@ -46,7 +47,8 @@ protected void handleResponse(DSMap map) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); out.beginMap(); out.key("rid").value(getRequestId()); out.key("method").value("set"); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSubscriptions.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSubscriptions.java index 89e2273d..b43625ce 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSubscriptions.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/requester/DS1OutboundSubscriptions.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import com.acuity.iot.dsa.dslink.protocol.protocol_v1.DS1Session; import com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester.DS1OutboundSubscribeStubs.State; @@ -202,7 +203,8 @@ void unsubscribe(DS1OutboundSubscribeStubs stubs) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); DS1Session session = requester.getSession(); if (!pendingSubscribe.isEmpty()) { out.beginMap(); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundInvoke.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundInvoke.java index 6e969d21..7f75b4a6 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundInvoke.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundInvoke.java @@ -2,7 +2,9 @@ import com.acuity.iot.dsa.dslink.protocol.DSStream; import com.acuity.iot.dsa.dslink.protocol.message.ErrorResponse; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; +import com.acuity.iot.dsa.dslink.protocol.message.RequestPath; import java.util.Iterator; import org.iot.dsa.DSRuntime; import org.iot.dsa.dslink.DSIResponder; @@ -113,7 +115,7 @@ private synchronized Update dequeueUpdate() { */ private void doClose() { state = STATE_CLOSED; - getResponder().removeInboundRequest(getRequestId()); + getResponder().removeRequest(getRequestId()); if (result == null) { return; } @@ -250,7 +252,8 @@ public void send(DSList row) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); enqueued = false; if (isClosed()) { return; @@ -258,7 +261,7 @@ public void write(DSIWriter out) { if (isClosePending() && (updateHead == null) && (closeReason != null)) { ErrorResponse res = new ErrorResponse(closeReason); res.parseRequest(getRequest()); - res.write(out); + res.write(writer); doClose(); return; } @@ -466,8 +469,4 @@ String typeValue() { } } - /////////////////////////////////////////////////////////////////////////// - // Initialization - /////////////////////////////////////////////////////////////////////////// - } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundList.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundList.java index e39e96d7..99bbb045 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundList.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundList.java @@ -2,7 +2,9 @@ import com.acuity.iot.dsa.dslink.protocol.DSStream; import com.acuity.iot.dsa.dslink.protocol.message.ErrorResponse; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; +import com.acuity.iot.dsa.dslink.protocol.message.RequestPath; import java.util.Iterator; import org.iot.dsa.DSRuntime; import org.iot.dsa.dslink.DSIResponder; @@ -129,7 +131,7 @@ private synchronized Update dequeue() { private void doClose() { state = STATE_CLOSED; - getResponder().removeInboundRequest(getRequestId()); + getResponder().removeRequest(getRequestId()); if (response == null) { return; } @@ -564,7 +566,8 @@ public void onClose(Integer requestId) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); enqueued = false; if (isClosed()) { return; @@ -572,7 +575,7 @@ public void write(DSIWriter out) { if (isClosePending() && (updateHead == null) && (closeReason != null)) { ErrorResponse res = new ErrorResponse(closeReason); res.parseRequest(getRequest()); - res.write(out); + res.write(writer); doClose(); return; } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundRequest.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundRequest.java index dd371510..40bb5960 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundRequest.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundRequest.java @@ -1,6 +1,7 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder; import com.acuity.iot.dsa.dslink.protocol.protocol_v1.DS1Session; +import com.acuity.iot.dsa.dslink.protocol.responder.DSInboundRequest; import org.iot.dsa.dslink.DSLink; import org.iot.dsa.dslink.responder.InboundRequest; import org.iot.dsa.logging.DSLogger; @@ -11,82 +12,21 @@ * * @author Aaron Hansen */ -class DS1InboundRequest extends DSLogger implements InboundRequest { +class DS1InboundRequest extends DSInboundRequest implements InboundRequest { - /////////////////////////////////////////////////////////////////////////// - // Fields - /////////////////////////////////////////////////////////////////////////// - - private DSLink link; - private String path; private DSMap request; - private Integer requestId; - private DS1Responder responder; - private DS1Session session; - - /////////////////////////////////////////////////////////////////////////// - // Constructors - /////////////////////////////////////////////////////////////////////////// - - DS1InboundRequest() { - } - - /////////////////////////////////////////////////////////////////////////// - // Methods in alphabetical order - /////////////////////////////////////////////////////////////////////////// - - public DSLink getLink() { - return link; - } - - public String getPath() { - return path; - } public DSMap getRequest() { return request; } - public DS1Responder getResponder() { - return responder; - } - - public Integer getRequestId() { - return requestId; - } - - public DS1Session getSession() { - return session; - } - - public DS1InboundRequest setLink(DSLink link) { - this.link = link; - return this; - } - - public DS1InboundRequest setPath(String path) { - this.path = path; - return this; - } - - public DS1InboundRequest setSession(DS1Session session) { - this.session = session; - return this; - } - - public DS1InboundRequest setRequest(DSMap request) { + public DS1InboundRequest setRequest(DSMap request){ this.request = request; return this; } - public DS1InboundRequest setRequestId(Integer requestId) { - this.requestId = requestId; - return this; - } - - public DS1InboundRequest setResponder(DS1Responder responder) { - this.responder = responder; - return this; + public DS1Responder getResponder() { + return (DS1Responder) super.getResponder(); } } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSet.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSet.java index e3bd1a6b..164120fa 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSet.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSet.java @@ -2,6 +2,7 @@ import com.acuity.iot.dsa.dslink.protocol.message.CloseMessage; import com.acuity.iot.dsa.dslink.protocol.message.ErrorResponse; +import com.acuity.iot.dsa.dslink.protocol.message.RequestPath; import org.iot.dsa.dslink.DSIResponder; import org.iot.dsa.dslink.DSRequestException; import org.iot.dsa.dslink.responder.InboundSetRequest; diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscription.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscription.java index 52e12b91..6547566f 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscription.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscription.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder; +import com.acuity.iot.dsa.dslink.protocol.message.RequestPath; import java.util.logging.Level; import java.util.logging.Logger; import org.iot.dsa.dslink.DSIResponder; diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscriptions.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscriptions.java index b7c1c388..f537372a 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscriptions.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1InboundSubscriptions.java @@ -1,5 +1,6 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -121,7 +122,8 @@ void unsubscribe(Integer sid) { } @Override - public void write(DSIWriter out) { + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); out.beginMap(); out.key("rid").value(ZERO); out.key("updates").beginList(); @@ -145,12 +147,4 @@ public void write(DSIWriter out) { responder.sendResponse(this); } - /////////////////////////////////////////////////////////////////////////// - // Inner Classes - /////////////////////////////////////////////////////////////////////////// - - /////////////////////////////////////////////////////////////////////////// - // Initialization - /////////////////////////////////////////////////////////////////////////// - } diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1Responder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1Responder.java index adc4fe0a..18dde7b6 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1Responder.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v1/responder/DS1Responder.java @@ -1,38 +1,28 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder; import com.acuity.iot.dsa.dslink.DSProtocolException; +import com.acuity.iot.dsa.dslink.DSSession; import com.acuity.iot.dsa.dslink.protocol.DSStream; import com.acuity.iot.dsa.dslink.protocol.message.CloseMessage; import com.acuity.iot.dsa.dslink.protocol.message.ErrorResponse; -import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; -import com.acuity.iot.dsa.dslink.protocol.protocol_v1.DS1Session; +import com.acuity.iot.dsa.dslink.protocol.responder.DSResponder; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; -import java.util.logging.Logger; import org.iot.dsa.DSRuntime; -import org.iot.dsa.dslink.DSLink; -import org.iot.dsa.dslink.DSLinkConnection; import org.iot.dsa.node.DSList; import org.iot.dsa.node.DSMap; -import org.iot.dsa.node.DSNode; /** * Implements DSA 1.1.2 * * @author Aaron Hansen */ -public class DS1Responder extends DSNode { +public class DS1Responder extends DSResponder { /////////////////////////////////////////////////////////////////////////// // Fields /////////////////////////////////////////////////////////////////////////// - private ConcurrentHashMap inboundRequests = - new ConcurrentHashMap(); - private DSLink link; - private Logger logger; - private DS1Session session; private DS1InboundSubscriptions subscriptions = new DS1InboundSubscriptions(this); @@ -40,27 +30,14 @@ public class DS1Responder extends DSNode { // Methods - Constructors ///////////////////////////////////////////////////////////////// - public DS1Responder(DS1Session session) { - this.session = session; + public DS1Responder(DSSession session) { + super(session); } ///////////////////////////////////////////////////////////////// // Methods - In alphabetical order by method name. ///////////////////////////////////////////////////////////////// - public DSLinkConnection getConnection() { - return session.getConnection(); - } - - @Override - public Logger getLogger() { - if (logger == null) { - logger = Logger.getLogger( - getConnection().getLink().getLinkName() + ".responderSession"); - } - return logger; - } - /** * Will throw an exception if the request doesn't have the path. */ @@ -85,14 +62,14 @@ public void onConnectFail() { public void onDisconnect() { finer(finer() ? "Close" : null); subscriptions.close(); - for (Map.Entry entry : inboundRequests.entrySet()) { + for (Map.Entry entry : getRequests().entrySet()) { try { entry.getValue().onClose(entry.getKey()); } catch (Exception x) { finer(finer() ? "Close" : null, x); } } - inboundRequests.clear(); + getRequests().clear(); } /** @@ -100,12 +77,13 @@ public void onDisconnect() { */ private void processInvoke(Integer rid, DSMap req) { DS1InboundInvoke invokeImpl = new DS1InboundInvoke(req); - invokeImpl.setPath(getPath(req)) - .setSession(session) + invokeImpl.setRequest(req) + .setPath(getPath(req)) + .setSession(getSession()) .setRequestId(rid) - .setLink(link) + .setLink(getLink()) .setResponder(this); - inboundRequests.put(rid, invokeImpl); + putRequest(rid, invokeImpl); DSRuntime.run(invokeImpl); } @@ -114,13 +92,13 @@ private void processInvoke(Integer rid, DSMap req) { */ private void processList(Integer rid, DSMap req) { DS1InboundList listImpl = new DS1InboundList(); - listImpl.setPath(getPath(req)) - .setSession(session) - .setRequest(req) + listImpl.setRequest(req) + .setPath(getPath(req)) + .setSession(getSession()) .setRequestId(rid) - .setLink(link) + .setLink(getLink()) .setResponder(this); - inboundRequests.put(listImpl.getRequestId(), listImpl); + putRequest(listImpl.getRequestId(), listImpl); DSRuntime.run(listImpl); } @@ -128,12 +106,6 @@ private void processList(Integer rid, DSMap req) { * Process an individual request. */ public void processRequest(final Integer rid, final DSMap map) { - if (link == null) { - link = getConnection().getLink(); - } - if (link == null) { - throw new DSProtocolException("Not a responder"); - } String method = map.get("method", null); try { if ((method == null) || method.isEmpty()) { @@ -144,7 +116,7 @@ public void processRequest(final Integer rid, final DSMap map) { if (!method.equals("close")) { throwInvalidMethod(method, map); } - final DSStream req = inboundRequests.remove(rid); + final DSStream req = removeRequest(rid); if (req != null) { DSRuntime.run(new Runnable() { public void run() { @@ -225,10 +197,11 @@ public void run() { */ private void processSet(Integer rid, DSMap req) { DS1InboundSet setImpl = new DS1InboundSet(req); - setImpl.setPath(getPath(req)) - .setSession(session) + setImpl.setRequest(req) + .setPath(getPath(req)) + .setSession(getSession()) .setRequestId(rid) - .setLink(link) + .setLink(getLink()) .setResponder(this); DSRuntime.run(setImpl); } @@ -277,18 +250,6 @@ private void processUnsubscribe(int rid, DSMap req) { } } - void removeInboundRequest(Integer requestId) { - inboundRequests.remove(requestId); - } - - public boolean shouldEndMessage() { - return session.shouldEndMessage(); - } - - public void sendResponse(OutboundMessage res) { - session.enqueueOutgoingResponse(res); - } - /** * Used throughout processRequest. */ diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2LinkConnection.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2LinkConnection.java index 1a00002c..17fa95cd 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2LinkConnection.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2LinkConnection.java @@ -1,6 +1,5 @@ package com.acuity.iot.dsa.dslink.protocol.protocol_v2; -import com.acuity.iot.dsa.dslink.DSSession; import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; import com.acuity.iot.dsa.dslink.transport.DSTransport; import com.acuity.iot.dsa.dslink.transport.SocketTransport; @@ -13,10 +12,6 @@ import org.iot.dsa.dslink.DSLink; import org.iot.dsa.dslink.DSLinkConfig; import org.iot.dsa.dslink.DSLinkConnection; -import org.iot.dsa.io.DSIReader; -import org.iot.dsa.io.DSIWriter; -import org.iot.dsa.io.msgpack.MsgpackReader; -import org.iot.dsa.io.msgpack.MsgpackWriter; import org.iot.dsa.node.DSBool; import org.iot.dsa.node.DSBytes; import org.iot.dsa.node.DSInfo; @@ -44,11 +39,12 @@ public class DS2LinkConnection extends DSLinkConnection { private static final String BROKER_PUB_KEY = "Broker Public Key"; private static final String BROKER_SALT = "Broker Salt"; private static final String BROKER_URI = "Broker URI"; - private static final String LINK_SALT = "Link Salt"; - private static final String REQUESTER_ALLOWED = "Requester Allowed"; private static final String LAST_CONNECT_OK = "Last Connect Ok"; private static final String LAST_CONNECT_FAIL = "Last Connect Fail"; + private static final String LINK_SALT = "Link Salt"; private static final String FAIL_CAUSE = "Fail Cause"; + private static final String REQUESTER_ALLOWED = "Requester Allowed"; + private static final String SESSION = "Status"; private static final String STATUS = "Status"; /////////////////////////////////////////////////////////////////////////// @@ -63,7 +59,7 @@ public class DS2LinkConnection extends DSLinkConnection { private DSInfo brokerUri = getInfo(BROKER_URI); private DSInfo linkSalt = getInfo(LINK_SALT); private DSInfo requesterAllowed = getInfo(REQUESTER_ALLOWED); - private DSSession session; + private DS2Session session; private DSBinaryTransport transport; /////////////////////////////////////////////////////////////////////////// @@ -159,7 +155,8 @@ protected void onDisconnect() { @Override protected void onInitialize() { if (session == null) { - session = null;//TODO + session = new DS2Session(this); + put(SESSION, session); } makeTransport(); } @@ -191,7 +188,7 @@ private void performHandshake() { private void recvF1() throws IOException { InputStream in = transport.getInput(); - MessageReader reader = new MessageReader(); + DS2MessageReader reader = new DS2MessageReader(); reader.init(in); if (reader.getMethod() != 0xf1) { throw new IllegalStateException("Expecting handshake method 0xF1 not 0x" + @@ -209,7 +206,7 @@ private void recvF1() throws IOException { private void recvF3() throws IOException { InputStream in = transport.getInput(); - MessageReader reader = new MessageReader(); + DS2MessageReader reader = new DS2MessageReader(); reader.init(in); if (reader.getMethod() != 0xf3) { throw new IllegalStateException("Expecting handshake method 0xF3 not 0x" + @@ -230,7 +227,7 @@ private void sendF0() { DSLink link = getLink(); String dsId = link.getDsId(); DSKeys dsKeys = link.getKeys(); - MessageWriter writer = new MessageWriter(); + DS2MessageWriter writer = new DS2MessageWriter(); writer.setMethod((byte) 0xf0); ByteBuffer buffer = writer.getBody(); buffer.put((byte) 2).put((byte) 0); //dsa version @@ -241,7 +238,7 @@ private void sendF0() { } private void sendF2() throws Exception { - MessageWriter writer = new MessageWriter(); + DS2MessageWriter writer = new DS2MessageWriter(); writer.setMethod((byte) 0xf2); ByteBuffer buffer = writer.getBody(); String token = getLink().getConfig().getToken(); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/MessageReader.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2MessageReader.java similarity index 93% rename from dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/MessageReader.java rename to dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2MessageReader.java index ad960b9a..2b213106 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/MessageReader.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2MessageReader.java @@ -16,10 +16,9 @@ * can be reused for multiple messages. Not thread safe, the intent is messages will be constructed * and read serially. * - * * @author Aaron Hansen - * 1 + * @author Aaron Hansen */ -public class MessageReader implements MessageConstants { +public class DS2MessageReader implements MessageConstants { // Fields // ------ @@ -37,7 +36,7 @@ public class MessageReader implements MessageConstants { // Constructors // ------------ - public MessageReader() { + public DS2MessageReader() { } // Methods @@ -72,6 +71,18 @@ private CharBuffer getCharBuffer(int size) { return charBuffer; } + public Object getHeader(Byte key) { + return headers.get(key); + } + + public Object getHeader(Byte key, Object def) { + Object ret = headers.get(key); + if (ret == null) { + ret = def; + } + return ret; + } + public Map getHeaders() { return headers; } @@ -107,7 +118,7 @@ private ByteBuffer getStringBuffer(int len) { return strBuffer; } - public MessageReader init(InputStream in) { + public DS2MessageReader init(InputStream in) { try { input = in; headers.clear(); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/MessageWriter.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2MessageWriter.java similarity index 93% rename from dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/MessageWriter.java rename to dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2MessageWriter.java index 9ada9728..85f94436 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/MessageWriter.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2MessageWriter.java @@ -15,7 +15,7 @@ * * @author Aaron Hansen */ -public class MessageWriter implements MessageConstants { +public class DS2MessageWriter implements MessageConstants { // Fields // ------ @@ -30,7 +30,7 @@ public class MessageWriter implements MessageConstants { // Constructors // ------------ - public MessageWriter() { + public DS2MessageWriter() { body = ByteBuffer.allocateDirect(MAX_HEADER); //do not change endian-ness of the body since most bodies will be big endian msgpack. header = ByteBuffer.allocateDirect(MAX_BODY); @@ -45,7 +45,7 @@ public MessageWriter() { /** * Encodes the key into the header buffer. */ - public MessageWriter addHeader(byte key) { + public DS2MessageWriter addHeader(byte key) { header.put(key); return this; } @@ -53,7 +53,7 @@ public MessageWriter addHeader(byte key) { /** * Encodes the key value pair into the header buffer. */ - public MessageWriter addHeader(byte key, byte value) { + public DS2MessageWriter addHeader(byte key, byte value) { header.put(key); header.put(value); return this; @@ -62,7 +62,7 @@ public MessageWriter addHeader(byte key, byte value) { /** * Encodes the key value pair into the header buffer. */ - public MessageWriter addHeader(byte key, int value) { + public DS2MessageWriter addHeader(byte key, int value) { header.put(key); header.putInt(value); return this; @@ -71,7 +71,7 @@ public MessageWriter addHeader(byte key, int value) { /** * Encodes the key value pair into the header buffer. */ - public MessageWriter addHeader(byte key, String value) { + public DS2MessageWriter addHeader(byte key, String value) { header.put(key); writeString(value, header); return this; @@ -152,7 +152,7 @@ public ByteBuffer getBody() { * @param requestId The request ID or -1 to omit. * @param ackId -1 to omit, but can only be -1 when the requestId is also -1. */ - public MessageWriter init(int requestId, int ackId) { + public DS2MessageWriter init(int requestId, int ackId) { body.clear(); header.clear(); header.position(7); @@ -165,7 +165,7 @@ public MessageWriter init(int requestId, int ackId) { return this; } - public MessageWriter setMethod(byte method) { + public DS2MessageWriter setMethod(byte method) { this.method = method; return this; } @@ -191,7 +191,7 @@ public byte[] toByteArray() { /** * Writes the message to the transport. */ - public MessageWriter write(DSBinaryTransport out) { + public DS2MessageWriter write(DSBinaryTransport out) { encodeHeaderLengths(); out.write(header, false); out.write(body, true); diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2Session.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2Session.java index af094f9e..af9df4ba 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2Session.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/DS2Session.java @@ -34,13 +34,24 @@ public class DS2Session extends DSSession implements MessageConstants { private DSInfo lastAckRecv = getInfo(LAST_ACK_RECV); private DSInfo lastAckSent = getInfo(LAST_ACK_SENT); private long lastMessageSent; - private MessageReader messageReader; + private DS2MessageReader messageReader; private int nextAck = -1; private int nextMsg = 1; private boolean requestsNext = false; private DS1Requester requester;// = new DS1Requester(this); private DS2Responder responder = new DS2Responder(this); + ///////////////////////////////////////////////////////////////// + // Constructors + ///////////////////////////////////////////////////////////////// + + public DS2Session() { + } + + public DS2Session(DS2LinkConnection connection) { + super(connection); + } + ///////////////////////////////////////////////////////////////// // Methods ///////////////////////////////////////////////////////////////// @@ -54,18 +65,18 @@ protected void declareDefaults() { @Override protected void doRecvMessage() throws IOException { if (messageReader == null) { - messageReader = new MessageReader(); + messageReader = new DS2MessageReader(); } messageReader.init(getTransport().getInput()); - if (messageReader.isRequest()) { - responder.processRequest(messageReader); - } else if (messageReader.isResponse()) { - ;//requester.processResponse(messageReader); - } int ack = messageReader.getAckId(); if (ack > 0) { put(lastAckRecv, DSInt.valueOf(ack)); } + if (messageReader.isRequest()) { + responder.handleRequest(messageReader); + } else if (messageReader.isResponse()) { + ;//requester.processResponse(messageReader); + } } @Override diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2InboundList.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2InboundList.java new file mode 100644 index 00000000..47464244 --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2InboundList.java @@ -0,0 +1,678 @@ +package com.acuity.iot.dsa.dslink.protocol.protocol_v2.responder; + +import com.acuity.iot.dsa.dslink.protocol.DSStream; +import com.acuity.iot.dsa.dslink.protocol.message.ErrorResponse; +import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter; +import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; +import com.acuity.iot.dsa.dslink.protocol.message.RequestPath; +import java.util.Iterator; +import org.iot.dsa.DSRuntime; +import org.iot.dsa.dslink.DSIResponder; +import org.iot.dsa.dslink.responder.ApiObject; +import org.iot.dsa.dslink.responder.InboundListRequest; +import org.iot.dsa.dslink.responder.OutboundListResponse; +import org.iot.dsa.io.DSIWriter; +import org.iot.dsa.node.DSElement; +import org.iot.dsa.node.DSIValue; +import org.iot.dsa.node.DSInfo; +import org.iot.dsa.node.DSList; +import org.iot.dsa.node.DSMap; +import org.iot.dsa.node.DSMap.Entry; +import org.iot.dsa.node.DSMetadata; +import org.iot.dsa.node.DSNode; +import org.iot.dsa.node.DSPath; +import org.iot.dsa.node.action.ActionSpec; +import org.iot.dsa.node.action.DSAction; +import org.iot.dsa.node.event.DSIEvent; +import org.iot.dsa.node.event.DSISubscriber; +import org.iot.dsa.node.event.DSInfoTopic; +import org.iot.dsa.node.event.DSTopic; + +/** + * List implementation for a responder. + * + * @author Aaron Hansen + */ +class DS2InboundList extends DS2InboundRequest + implements DSISubscriber, DSStream, InboundListRequest, OutboundMessage, + OutboundListResponse, Runnable { + + /////////////////////////////////////////////////////////////////////////// + // Constants + /////////////////////////////////////////////////////////////////////////// + + private static final int STATE_INIT = 0; + private static final int STATE_CHILDREN = 1; + private static final int STATE_UPDATES = 2; + private static final int STATE_CLOSE_PENDING = 3; + private static final int STATE_CLOSED = 4; + + /////////////////////////////////////////////////////////////////////////// + // Fields + /////////////////////////////////////////////////////////////////////////// + + private StringBuilder cacheBuf = new StringBuilder(); + private DSMap cacheMap = new DSMap(); + private DSMetadata cacheMeta = new DSMetadata(cacheMap); + private Iterator children; + private Exception closeReason; + private DSInfo info; + private DSNode node; + private OutboundListResponse response; + private boolean enqueued = false; + private int state = STATE_INIT; + private Update updateHead; + private Update updateTail; + + /////////////////////////////////////////////////////////////////////////// + // Constructors + /////////////////////////////////////////////////////////////////////////// + + DS2InboundList() { + } + + /////////////////////////////////////////////////////////////////////////// + // Methods in alphabetical order + /////////////////////////////////////////////////////////////////////////// + + @Override + public void childAdded(ApiObject child) { + if (!isClosed()) { + enqueue(new Update(child, true)); + } + } + + @Override + public void childRemoved(ApiObject child) { + if (!isClosed()) { + enqueue(new Update(child, false)); + } + } + + @Override + public void close() { + if (!isOpen()) { + return; + } + state = STATE_CLOSE_PENDING; + enqueueResponse(); + fine(fine() ? getPath() + " list closed locally" : null); + } + + @Override + public void close(Exception reason) { + if (!isOpen()) { + return; + } + state = STATE_CLOSE_PENDING; + closeReason = reason; + enqueueResponse(); + fine(fine() ? getPath() + " list closed locally" : null); + } + + /** + * Remove an update from the queue. + */ + private synchronized Update dequeue() { + if (updateHead == null) { + return null; + } + Update ret = null; + ret = updateHead; + if (updateHead == updateTail) { + updateHead = null; + updateTail = null; + } else { + updateHead = updateHead.next; + } + ret.next = null; + return ret; + } + + private void doClose() { + state = STATE_CLOSED; + getResponder().removeInboundRequest(getRequestId()); + if (response == null) { + return; + } + DSRuntime.run(new Runnable() { + @Override + public void run() { + try { + response.onClose(); + } catch (Exception x) { + severe(getPath(), x); + } + } + }); + } + + private void encodeChild(ApiObject child, DSIWriter out) { + out.beginList(); + String name = child.getName(); + String displayName = null; + if (DSPath.encodeName(name, cacheBuf)) { + displayName = name; + out.value(cacheBuf.toString()); + } else { + out.value(name); + } + cacheBuf.setLength(0); + out.beginMap(); + child.getMetadata(cacheMap.clear()); + DSElement e = cacheMap.remove(DSMetadata.DISPLAY_NAME); + if (e != null) { + out.key("$name").value(e); + } else if (displayName != null) { + out.key("$name").value(displayName); + } + e = cacheMap.remove("$is"); + if (e != null) { + out.key("$is").value(e); + } else { + out.key("$is").value("node"); + } + if (child.isAction()) { + ActionSpec action = child.getAction(); + e = cacheMap.remove("$invokable"); + if (e != null) { + out.key("$invokable").value(e); + } else { + out.key("$invokable").value(action.getPermission().toString()); + } + } else if (child.isValue()) { + out.key("$type"); + e = cacheMap.remove("$type"); + if (e != null) { + out.value(e); + } else { + encodeType(child.getValue(), cacheMeta, out); + } + if (!child.isReadOnly()) { + e = cacheMap.remove("$writable"); + if (e != null) { + out.key("$writable").value(e); + } else { + out.key("$writable").value(child.isAdmin() ? "config" : "write"); + } + } + } else if (child.isAdmin()) { + e = cacheMap.remove("$permission"); + if (e != null) { + out.key("$permission").value(e); + } else { + out.key("$permission").value("config"); + } + } + out.endMap().endList(); + cacheMap.clear(); + } + + /** + * Encode all the meta data about the root target of a list request. + */ + private void encodeTarget(ApiObject object, DSIWriter out) { + if (object instanceof DSInfo) { + DSMetadata.getMetadata((DSInfo) object, cacheMap.clear()); + } else { + object.getMetadata(cacheMap.clear()); + } + DSElement e = cacheMap.remove("$is"); + if (e == null) { + out.beginList().value("$is").value("node").endList(); + } else { + out.beginList().value("$is").value(e).endList(); + + } + e = cacheMap.get("$name"); + if (e == null) { + String safeName = object.getName(); + if (DSPath.encodeName(safeName, cacheBuf)) { + safeName = cacheBuf.toString(); + } + cacheBuf.setLength(0); + out.beginList().value("$name").value(safeName).endList(); + } else { + out.beginList().value("$name").value(e).endList(); + } + if (object.isAction()) { + encodeTargetAction(object, out); + } else if (object.isValue()) { + encodeTargetValue(object, out); + } else if (object.isAdmin()) { + e = cacheMap.remove("$permission"); + if (e == null) { + out.beginList().value("$permission").value("config").endList(); + } else { + out.beginList().value("$permission").value(e).endList(); + } + } + encodeTargetMetadata(cacheMap, out); + cacheMap.clear(); + } + + /** + * Called by encodeTarget for actions. + */ + private void encodeTargetAction(ApiObject object, DSIWriter out) { + DSInfo info = null; + if (object instanceof DSInfo) { + info = (DSInfo) object; + } + ActionSpec action = object.getAction(); + DSAction dsAction = null; + if (action instanceof DSAction) { + dsAction = (DSAction) action; + } + DSElement e = cacheMap.remove("$invokable"); + out.beginList().value("$invokable"); + if (e == null) { + out.value(action.getPermission().toString()).endList(); + } else { + out.value(e).endList(); + } + e = cacheMap.remove("params"); + out.beginList().value("$params"); + if (e == null) { + out.beginList(); + Iterator params = action.getParameters(); + if (params != null) { + DSMap param; + while (params.hasNext()) { + param = params.next(); + if (dsAction != null) { + dsAction.prepareParameter(info, param); + } + out.value(fixType(param)); + } + } + out.endList(); + } else { + out.value(e); + } + out.endList(); + if (action.getResultType().isValues()) { + e = cacheMap.remove("$columns"); + out.beginList().value("$columns"); + if (e == null) { + out.beginList(); + Iterator params = action.getParameters(); + if (params != null) { + DSMap param; + while (params.hasNext()) { + param = params.next(); + if (dsAction != null) { + dsAction.prepareParameter(info, param); + } + out.value(fixType(param)); + } + } + out.endList(); + } else { + out.value(e); + } + out.endList(); + } + e = cacheMap.remove("$result"); + if (e != null) { + out.beginList().value("$result").value(e).endList(); + } else if (!action.getResultType().isVoid()) { + out.beginList().value("$result").value(action.getResultType().toString()).endList(); + } + } + + /** + * Called by encodeTarget, encodes meta-data as configs. + */ + private void encodeTargetMetadata(DSMap metadata, DSIWriter out) { + if (cacheMap.isEmpty()) { + return; + } + Entry entry; + String name; + for (int i = 0, len = cacheMap.size(); i < len; i++) { + entry = cacheMap.getEntry(i); + out.beginList(); + name = entry.getKey(); + switch (name.charAt(0)) { + case '$': + case '@': + out.value(name); + default: + cacheBuf.append("@"); //TODO ? + DSPath.encodeName(name, cacheBuf); + out.value(cacheBuf.toString()); + cacheBuf.setLength(0); + + } + out.value(entry.getValue()); + out.endList(); + } + } + + /** + * Called by encodeTarget for values. + */ + private void encodeTargetValue(ApiObject object, DSIWriter out) { + DSElement e = cacheMap.remove("$type"); + out.beginList(); + out.value("$type"); + if (e != null) { + out.value(e); + } else { + encodeType(object.getValue(), cacheMeta, out); + } + out.endList(); + e = cacheMap.remove("$writable"); + if (e != null) { + out.beginList() + .value("$writable") + .value(e) + .endList(); + } else if (!object.isReadOnly()) { + out.beginList() + .value("$writable") + .value(object.isAdmin() ? "config" : "write") + .endList(); + } + } + + private void encodeType(DSIValue value, DSMetadata meta, DSIWriter out) { + String type = meta.getType(); + if ((type == null) && (value != null)) { + meta.setType(value); + } + fixType(meta.getMap()); + DSElement e = cacheMap.remove(DSMetadata.TYPE); + if (e == null) { + throw new IllegalArgumentException("Missing type"); + } + out.value(e); + } + + private void encodeUpdate(Update update, DSIWriter out) { + if (!isOpen()) { + return; + } + if (update.added) { + encodeChild(update.child, out); + } else { + out.beginMap(); + out.key("name").value(DSPath.encodeName(update.child.getName())); + out.key("change").value("remove"); + out.endMap(); + } + } + + private void enqueue(Update update) { + if (!isOpen()) { + return; + } + synchronized (this) { + if (updateHead == null) { + updateHead = update; + updateTail = update; + } else { + updateTail.next = update; + updateTail = update; + } + if (enqueued) { + return; + } + } + getResponder().sendResponse(this); + } + + /** + * Enqueues in the session. + */ + private void enqueueResponse() { + synchronized (this) { + if (enqueued) { + return; + } + enqueued = true; + } + getResponder().sendResponse(this); + } + + /** + * Combines boolean and enum ranges into the type name. + */ + private DSMap fixType(DSMap arg) { + String type = arg.getString(DSMetadata.TYPE); + if ("bool".equals(type)) { + DSList range = (DSList) arg.remove(DSMetadata.BOOLEAN_RANGE); + if ((range == null) || (range.size() != 2)) { + return arg; + } else { + cacheBuf.setLength(0); + cacheBuf.append(type); + cacheBuf.append('['); + cacheBuf.append(range.get(0).toString()); + cacheBuf.append(','); + cacheBuf.append(range.get(1).toString()); + cacheBuf.append(']'); + arg.put(DSMetadata.TYPE, cacheBuf.toString()); + } + } else if ("enum".equals(type)) { + DSList range = (DSList) arg.remove(DSMetadata.ENUM_RANGE); + if (range == null) { + return arg; + } + cacheBuf.setLength(0); + cacheBuf.append(type); + cacheBuf.append('['); + for (int i = 0, len = range.size(); i < len; i++) { + if (i > 0) { + cacheBuf.append(','); + } + cacheBuf.append(range.get(i).toString()); + } + cacheBuf.append(']'); + arg.put(DSMetadata.TYPE, cacheBuf.toString()); + } + return arg; + } + + @Override + public ApiObject getTarget() { + return info; + } + + private boolean isClosed() { + return state == STATE_CLOSED; + } + + private boolean isClosePending() { + return state == STATE_CLOSE_PENDING; + } + + /** + * Not closed or closed pending. + */ + @Override + public boolean isOpen() { + return (state != STATE_CLOSE_PENDING) && (state != STATE_CLOSED); + } + + @Override + public void onClose() { + if (node != null) { + node.unsubscribe(DSNode.INFO_TOPIC, null, this); + } + } + + @Override + public void onEvent(DSTopic topic, DSIEvent event, DSNode node, DSInfo child, + Object... params) { + switch ((DSInfoTopic.Event) event) { + case CHILD_ADDED: + childAdded(child); + break; + case CHILD_REMOVED: + childRemoved(child); + break; + case METADATA_CHANGED: //TODO + break; + default: + } + } + + @Override + public void onUnsubscribed(DSTopic topic, DSNode node, DSInfo child) { + close(); + } + + @Override + public void run() { + try { + RequestPath path = new RequestPath(getPath(), getLink()); + if (path.isResponder()) { + DSIResponder responder = (DSIResponder) path.getTarget(); + setPath(path.getPath()); + response = responder.onList(this); + } else { + info = path.getInfo(); + if (info.isNode()) { + node = info.getNode(); + node.subscribe(DSNode.INFO_TOPIC, null, this); + } + response = this; + } + } catch (Exception x) { + severe(getPath(), x); + close(x); + return; + } + if (response == null) { + close(); + } else { + enqueueResponse(); + } + } + + @Override + public void onClose(Integer requestId) { + if (isClosed()) { + return; + } + state = STATE_CLOSED; + fine(finer() ? getPath() + " list closed" : null); + synchronized (this) { + updateHead = updateTail = null; + } + doClose(); + } + + @Override + public void write(MessageWriter writer) { + DSIWriter out = writer.getWriter(); + enqueued = false; + if (isClosed()) { + return; + } + if (isClosePending() && (updateHead == null) && (closeReason != null)) { + ErrorResponse res = new ErrorResponse(closeReason); + res.parseRequest(getRequest()); + res.write(writer); + doClose(); + return; + } + int last = state; + out.beginMap(); + out.key("rid").value(getRequestId()); + switch (state) { + case STATE_INIT: + out.key("updates").beginList(); + writeInit(out); + break; + case STATE_CHILDREN: + out.key("updates").beginList(); + writeChildren(out); + break; + case STATE_CLOSE_PENDING: + case STATE_UPDATES: + out.key("updates").beginList(); + writeUpdates(out); + break; + default: + ; + } + out.endList(); + if ((state != last) && (state == STATE_UPDATES)) { + out.key("stream").value("open"); + } else if (isClosePending() && (updateHead == null)) { + if (closeReason != null) { + ErrorResponse res = new ErrorResponse(closeReason); + res.parseRequest(getRequest()); + getResponder().sendResponse(res); + } else { + out.key("stream").value("closed"); + } + doClose(); + } + out.endMap(); + } + + private void writeChildren(DSIWriter out) { + if (children != null) { + ApiObject child; + while (children.hasNext()) { + child = children.next(); + if (!child.isHidden()) { + encodeChild(child, out); + } + if (getResponder().shouldEndMessage()) { + enqueueResponse(); + return; + } + } + } + children = null; + state = STATE_UPDATES; + } + + private void writeInit(DSIWriter out) { + ApiObject target = response.getTarget(); + encodeTarget(target, out); + if (target.hasChildren()) { + state = STATE_CHILDREN; + children = target.getChildren(); + writeChildren(out); + } else { + state = STATE_UPDATES; + writeUpdates(out); + } + } + + private void writeUpdates(DSIWriter out) { + DS2Responder session = getResponder(); + Update update = dequeue(); + while (update != null) { + encodeUpdate(update, out); + if (session.shouldEndMessage()) { + enqueueResponse(); + break; + } + update = dequeue(); + } + } + + /////////////////////////////////////////////////////////////////////////// + // Inner Classes + /////////////////////////////////////////////////////////////////////////// + + private static class Update { + + boolean added; + ApiObject child; + Update next; + + Update(ApiObject child, boolean added) { + this.child = child; + this.added = added; + } + } + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2InboundRequest.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2InboundRequest.java new file mode 100644 index 00000000..b305257f --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2InboundRequest.java @@ -0,0 +1,92 @@ +package com.acuity.iot.dsa.dslink.protocol.protocol_v2.responder; + +import com.acuity.iot.dsa.dslink.protocol.protocol_v2.DS2Session; +import org.iot.dsa.dslink.DSLink; +import org.iot.dsa.dslink.responder.InboundRequest; +import org.iot.dsa.logging.DSLogger; +import org.iot.dsa.node.DSMap; + +/** + * Getters and setters common to most requests. + * + * @author Aaron Hansen + */ +class DS2InboundRequest extends DSLogger implements InboundRequest { + + /////////////////////////////////////////////////////////////////////////// + // Fields + /////////////////////////////////////////////////////////////////////////// + + private DSLink link; + private String path; + private DSMap request; + private Integer requestId; + private DS2Responder responder; + private DS2Session session; + + /////////////////////////////////////////////////////////////////////////// + // Constructors + /////////////////////////////////////////////////////////////////////////// + + DS2InboundRequest() { + } + + /////////////////////////////////////////////////////////////////////////// + // Methods in alphabetical order + /////////////////////////////////////////////////////////////////////////// + + public DSLink getLink() { + return link; + } + + public String getPath() { + return path; + } + + public DSMap getRequest() { + return request; + } + + public DS2Responder getResponder() { + return responder; + } + + public Integer getRequestId() { + return requestId; + } + + public DS2Session getSession() { + return session; + } + + public DS2InboundRequest setLink(DSLink link) { + this.link = link; + return this; + } + + public DS2InboundRequest setPath(String path) { + this.path = path; + return this; + } + + public DS2InboundRequest setSession(DS2Session session) { + this.session = session; + return this; + } + + public DS2InboundRequest setRequest(DSMap request) { + this.request = request; + return this; + } + + public DS2InboundRequest setRequestId(Integer requestId) { + this.requestId = requestId; + return this; + } + + public DS2InboundRequest setResponder(DS2Responder responder) { + this.responder = responder; + return this; + } + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2Responder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2Responder.java index dcd2296a..a075a49c 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2Responder.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/protocol_v2/responder/DS2Responder.java @@ -4,9 +4,10 @@ import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; import com.acuity.iot.dsa.dslink.protocol.protocol_v2.DS2Session; import com.acuity.iot.dsa.dslink.protocol.protocol_v2.MessageConstants; -import com.acuity.iot.dsa.dslink.protocol.protocol_v2.MessageReader; +import com.acuity.iot.dsa.dslink.protocol.protocol_v2.DS2MessageReader; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; +import org.iot.dsa.DSRuntime; import org.iot.dsa.dslink.DSLinkConnection; import org.iot.dsa.node.DSNode; @@ -64,6 +65,27 @@ public DS2InboundSubscriptions getSubscriptions() { } */ + /** + * Process an individual request. + */ + public void handleRequest(DS2MessageReader reader) { + switch (reader.getMethod()) { + case MSG_INVOKE_REQ: + break; + case MSG_LIST_REQ: + processList(reader); + break; + case MSG_OBSERVE_REQ: + break; + case MSG_SET_REQ: + break; + case MSG_SUBSCRIBE_REQ: + break; + default: + throw new IllegalArgumentException("Unexpected method: " + reader.getMethod()); + } + } + public void onConnect() { } @@ -101,38 +123,17 @@ private void processInvoke(Integer rid, DSMap req) { /** * Handles a list request. - private void processList(Integer rid, DSMap req) { - DS2InboundList listImpl = new DS2InboundList(); - listImpl.setPath(getPath(req)) - .setSession(session) - .setRequest(req) - .setRequestId(rid) - .setResponderImpl(responder) - .setResponder(this); - inboundRequests.put(listImpl.getRequestId(), listImpl); - DSRuntime.run(listImpl); - } - */ - - /** - * Process an individual request. */ - public void processRequest(MessageReader reader) { - switch (reader.getMethod()) { - case MSG_INVOKE_REQ : - break; - case MSG_LIST_REQ : - //processList() - break; - case MSG_OBSERVE_REQ : - break; - case MSG_SET_REQ : - break; - case MSG_SUBSCRIBE_REQ : - break; - default : - throw new IllegalArgumentException("Unexpected method: " + reader.getMethod()); - } + private void processList(DS2MessageReader msg) { + int rid = msg.getRequestId(); + String path = (String) msg.getHeader(HDR_TARGET_PATH); + DS2InboundList listImpl = new DS2InboundList(); + listImpl.setPath(path) + .setSession(session) + .setRequestId(rid) + .setResponder(this); + inboundRequests.put(listImpl.getRequestId(), listImpl); + DSRuntime.run(listImpl); } /** diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundRequest.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundRequest.java new file mode 100644 index 00000000..c76265b3 --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundRequest.java @@ -0,0 +1,74 @@ +package com.acuity.iot.dsa.dslink.protocol.responder; + +import com.acuity.iot.dsa.dslink.DSSession; +import org.iot.dsa.dslink.DSLink; +import org.iot.dsa.dslink.responder.InboundRequest; +import org.iot.dsa.logging.DSLogger; + +/** + * Getters and setters common to most requests. + * + * @author Aaron Hansen + */ +public class DSInboundRequest extends DSLogger implements InboundRequest { + + /////////////////////////////////////////////////////////////////////////// + // Fields + /////////////////////////////////////////////////////////////////////////// + + private DSLink link; + private String path; + private Integer requestId; + private DSResponder responder; + private DSSession session; + + /////////////////////////////////////////////////////////////////////////// + // Methods in alphabetical order + /////////////////////////////////////////////////////////////////////////// + + public DSLink getLink() { + return link; + } + + public String getPath() { + return path; + } + + public Integer getRequestId() { + return requestId; + } + + public DSResponder getResponder() { + return responder; + } + + public DSSession getSession() { + return session; + } + + public DSInboundRequest setLink(DSLink link) { + this.link = link; + return this; + } + + public DSInboundRequest setPath(String path) { + this.path = path; + return this; + } + + public DSInboundRequest setResponder(DSResponder responder) { + this.responder = responder; + return this; + } + + public DSInboundRequest setSession(DSSession session) { + this.session = session; + return this; + } + + public DSInboundRequest setRequestId(Integer requestId) { + this.requestId = requestId; + return this; + } + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java new file mode 100644 index 00000000..48ef298e --- /dev/null +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSResponder.java @@ -0,0 +1,103 @@ +package com.acuity.iot.dsa.dslink.protocol.responder; + +import com.acuity.iot.dsa.dslink.DSSession; +import com.acuity.iot.dsa.dslink.protocol.DSStream; +import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; +import org.iot.dsa.dslink.DSLink; +import org.iot.dsa.dslink.DSLinkConnection; +import org.iot.dsa.node.DSNode; + +/** + * Abstract responder implementation. + * + * @author Aaron Hansen + */ +public abstract class DSResponder extends DSNode { + + /////////////////////////////////////////////////////////////////////////// + // Constants + /////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////// + // Fields + /////////////////////////////////////////////////////////////////////////// + + private DSLinkConnection connection; + private ConcurrentHashMap inboundRequests = new ConcurrentHashMap(); + private DSLink link; + private Logger logger; + private DSSession session; + private DSResponder responder; + //private DS2InboundSubscriptions subscriptions = new DSInboundSubscriptions(this); + + ///////////////////////////////////////////////////////////////// + // Methods - Constructors + ///////////////////////////////////////////////////////////////// + + public DSResponder(DSSession session) { + this.session = session; + } + + ///////////////////////////////////////////////////////////////// + // Methods - In alphabetical order by method name. + ///////////////////////////////////////////////////////////////// + + public DSLinkConnection getConnection() { + if (connection == null) { + connection = session.getConnection(); + } + return connection; + } + + public DSLink getLink() { + if (link == null) { + link = getConnection().getLink(); + } + return link; + } + + @Override + public Logger getLogger() { + if (logger == null) { + logger = Logger.getLogger( + getConnection().getLink().getLinkName() + ".responder"); + } + return logger; + } + + public Map getRequests() { + return inboundRequests; + } + + public DSSession getSession() { + return session; + } + + public void onConnect() { + } + + public void onConnectFail() { + } + + public void onDisconnect() { + } + + public DSStream putRequest(Integer rid, DSStream request) { + return inboundRequests.put(rid, request); + } + public DSStream removeRequest(Integer rid) { + return inboundRequests.remove(rid); + } + + public boolean shouldEndMessage() { + return session.shouldEndMessage(); + } + + public void sendResponse(OutboundMessage res) { + session.enqueueOutgoingResponse(res); + } + +} diff --git a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestLink.java b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestLink.java index b98506fa..33d6d8d5 100644 --- a/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestLink.java +++ b/dslink-core/src/main/java/com/acuity/iot/dsa/dslink/test/TestLink.java @@ -44,7 +44,7 @@ protected DS1ConnectionInit initializeConnection() { * implementation. */ protected DS1Session makeSession(DS1ConnectionInit init) { - DS1Session ret = new DS1Session(); + DS1Session ret = new DS1Session(TestConnection.this); ret.setRequesterAllowed(); return ret; } diff --git a/dslink-core/src/main/java/org/iot/dsa/dslink/requester/SimpleInvokeHandler.java b/dslink-core/src/main/java/org/iot/dsa/dslink/requester/SimpleInvokeHandler.java new file mode 100644 index 00000000..585fa9a7 --- /dev/null +++ b/dslink-core/src/main/java/org/iot/dsa/dslink/requester/SimpleInvokeHandler.java @@ -0,0 +1,141 @@ +package org.iot.dsa.dslink.requester; + +import com.acuity.iot.dsa.dslink.DSProtocolException; +import org.iot.dsa.node.DSList; +import org.iot.dsa.node.DSMap; + +/** + * Action handler for non-tables/streams. + *

+ * Call getResult(long timeout) to block until the invocation is complete. It will either return + * the result (possibly null), or throw an exception. + * + * @author Aaron Hansen + */ +public class SimpleInvokeHandler extends AbstractInvokeHandler { + + /////////////////////////////////////////////////////////////////////////// + // Fields + /////////////////////////////////////////////////////////////////////////// + + private boolean autoClose = true; + private boolean closed = false; + private RuntimeException error; + private DSList result; + + /////////////////////////////////////////////////////////////////////////// + // Methods + /////////////////////////////////////////////////////////////////////////// + + /** + * Waits for the stream to close before returning, or the timeout to occur. + * + * @param timeout Passed to Object.wait + * @return Null, or the first update. + * @throws RuntimeException if there is a timeout, or if there are any errors. + */ + public DSList getResult(long timeout) { + synchronized (this) { + if (!closed) { + try { + wait(timeout); + } catch (Exception x) { + } + } + } + if (error != null) { + throw error; + } + if (!closed) { + throw new IllegalStateException("Action timed out"); + } + return result; + } + + /** + * True by default, whether or not to close the stream upon receiving the first result. + */ + public boolean isAutoClose() { + return autoClose; + } + + /** + * Causes getResult to return. + */ + public void onClose() { + synchronized (this) { + closed = true; + notifyAll(); + } + } + + /** + * Will create an exception to be thrown by getResult. + */ + public void onError(String type, String msg, String detail) { + synchronized (this) { + error = new DSProtocolException(msg).setType(type).setDetail(detail); + getStream().closeStream(); + notifyAll(); + } + } + + /** + * Does nothing. + */ + public void onColumns(DSList list) { + } + + /** + * Will result in an error since tables and streams are not supported. + */ + public void onInsert(int index, DSList rows) { + synchronized (this) { + error = new IllegalArgumentException("Tables and streams not supported"); + getStream().closeStream(); + notifyAll(); + } + } + + /** + * Does nothing. + */ + public void onMode(Mode mode) { + } + + /** + * Will result in an error since tables and streams are not supported. + */ + public void onReplace(int start, int end, DSList rows) { + synchronized (this) { + error = new IllegalArgumentException("Tables and streams not supported"); + getStream().closeStream(); + notifyAll(); + } + } + + public void onTableMeta(DSMap map) { + } + + /** + * Captures the result and if auto-close is true, closes the stream. + */ + public void onUpdate(DSList row) { + synchronized (this) { + result = row; + if (autoClose) { + getStream().closeStream(); + } + } + } + + /** + * Whether or not to auto close the stream on the first update. True by default, this + * only needs to be called to disable. + */ + public SimpleInvokeHandler setAutoClose(boolean arg) { + autoClose = arg; + return this; + } + +} diff --git a/dslink-core/src/main/java/org/iot/dsa/node/DSNode.java b/dslink-core/src/main/java/org/iot/dsa/node/DSNode.java index 15a2f6c0..1f3eb9f0 100644 --- a/dslink-core/src/main/java/org/iot/dsa/node/DSNode.java +++ b/dslink-core/src/main/java/org/iot/dsa/node/DSNode.java @@ -189,6 +189,7 @@ public class DSNode extends DSLogger implements DSIObject, Iterable { private DSInfo firstChild; private DSInfo infoInParent; private DSInfo lastChild; + private Object mutex = new Object(); private String path; private Subscription subscription; private int size = 0; @@ -203,7 +204,7 @@ public class DSNode extends DSLogger implements DSIObject, Iterable { */ void add(final DSInfo info) { dsInit(); - synchronized (this) { + synchronized (mutex) { if (lastChild != null) { lastChild.next = info; info.prev = lastChild; @@ -460,7 +461,7 @@ public DSInfo getInfo(String name) { if (firstChild == null) { return null; } - synchronized (this) { + synchronized (mutex) { if (size >= MAP_THRESHOLD) { if (childMap == null) { childMap = new TreeMap(); @@ -1030,7 +1031,7 @@ public DSNode remove(DSInfo info) { if (info.getParent() != this) { throw new IllegalStateException("Not a child of this container"); } - synchronized (this) { + synchronized (mutex) { if (childMap != null) { childMap.remove(info.getName()); if (childMap.size() < MAP_THRESHOLD) { @@ -1218,7 +1219,7 @@ public void subscribe(DSTopic topic, DSInfo child, DSISubscriber subscriber) { } boolean firstSubscription; int count = 0; - synchronized (this) { + synchronized (mutex) { Subscription sub = subscription; firstSubscription = (sub == null); Subscription prev = null; @@ -1278,7 +1279,7 @@ protected static DSNode toNode(Object obj) { public void unsubscribe(DSTopic topic, DSInfo child, DSISubscriber subscriber) { int count = 0; Subscription removed = null; - synchronized (this) { + synchronized (mutex) { Subscription sub = subscription; Subscription prev = null; while (sub != null) { diff --git a/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java b/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java index 3c7bae4d..0f2c5112 100644 --- a/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java +++ b/dslink-core/src/main/java/org/iot/dsa/node/action/DSAction.java @@ -307,12 +307,4 @@ private void validate(DSMap params, List existing) { } } - /////////////////////////////////////////////////////////////////////////// - // Inner Classes - /////////////////////////////////////////////////////////////////////////// - - /////////////////////////////////////////////////////////////////////////// - // Initialization - /////////////////////////////////////////////////////////////////////////// - } diff --git a/dslink-core/src/test/java/org/iot/dsa/dslink/PerfTest.java b/dslink-core/src/test/java/org/iot/dsa/dslink/PerfTest.java index 27757b1b..f3ed7971 100644 --- a/dslink-core/src/test/java/org/iot/dsa/dslink/PerfTest.java +++ b/dslink-core/src/test/java/org/iot/dsa/dslink/PerfTest.java @@ -69,9 +69,13 @@ private long memoryUsed() { //@Test public void execute() { System.out.println("\nStarting " + new java.util.Date()); + System.out.println("Test (1/3)..."); test(false); + System.out.println("Test (2/3)..."); test(false); + System.out.println("Test (3/3)..."); test(true); + System.out.println("\nFinished " + new java.util.Date()); } public void test(boolean print) { diff --git a/dslink-core/src/test/java/org/iot/dsa/dslink/RequesterInvokeTest.java b/dslink-core/src/test/java/org/iot/dsa/dslink/RequesterInvokeTest.java new file mode 100644 index 00000000..bc0df6e5 --- /dev/null +++ b/dslink-core/src/test/java/org/iot/dsa/dslink/RequesterInvokeTest.java @@ -0,0 +1,128 @@ +package org.iot.dsa.dslink; + +import com.acuity.iot.dsa.dslink.protocol.protocol_v1.DS1LinkConnection; +import com.acuity.iot.dsa.dslink.test.TestLink; +import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport; +import org.iot.dsa.dslink.requester.AbstractInvokeHandler; +import org.iot.dsa.dslink.requester.OutboundInvokeHandler; +import org.iot.dsa.dslink.requester.SimpleInvokeHandler; +import org.iot.dsa.node.DSInfo; +import org.iot.dsa.node.DSInt; +import org.iot.dsa.node.DSList; +import org.iot.dsa.node.DSMap; +import org.iot.dsa.node.DSMetadata; +import org.iot.dsa.node.DSNode; +import org.iot.dsa.node.DSValueType; +import org.iot.dsa.node.action.ActionInvocation; +import org.iot.dsa.node.action.ActionResult; +import org.iot.dsa.node.action.DSAction; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author Aaron Hansen + */ +public class RequesterInvokeTest implements DSLinkConnection.Listener { + + // Fields + // ------ + + private static boolean success = false; + private DSLink link; + private MyMain root; + + // Methods + // ------- + + public void onConnect(DSLinkConnection connection) { + DSIRequester requester = link.getConnection().getRequester(); + success = true; + synchronized (this) { + notifyAll(); + } + } + + public void onDisconnect(DSLinkConnection connection) { + } + + @Test + public void theTest() throws Exception { + link = new TestLink(root = new MyMain()); + link.getConnection().addListener(this); + Thread t = new Thread(link, "DSLink Runner"); + t.start(); + success = false; + synchronized (this) { + this.wait(5000); + } + Assert.assertTrue(success); + success = false; + DSIRequester requester = link.getConnection().getRequester(); + SimpleInvokeHandler res = (SimpleInvokeHandler) requester.invoke( + "/main/simpleAction", null, new SimpleInvokeHandler()); + res.getResult(1000); + res = (SimpleInvokeHandler) requester.invoke( + "/main/simpleParam", + new DSMap().put("param", true), + new SimpleInvokeHandler()); + res.getResult(1000); + Assert.assertTrue(success); + res = (SimpleInvokeHandler) requester.invoke( + "/main/exception", + new DSMap().put("param", true), + new SimpleInvokeHandler()); + success = false; + try { + res.getResult(1000); + } catch (Exception x) { + success = x.getMessage().equals("my message"); + } + Assert.assertTrue(success); + } + + // Inner Classes + // ------------- + + public static class MyMain extends DSMainNode { + + @Override + public void declareDefaults() { + declareDefault("anode", new ANode()); + declareDefault("simpleAction", DSAction.DEFAULT); + DSAction action = new DSAction(); + action.addParameter("param", DSValueType.BOOL, "a desc"); + declareDefault("simpleParam", action); + declareDefault("exception", DSAction.DEFAULT); + } + + @Override + public ActionResult onInvoke(DSInfo action, ActionInvocation invocation) { + String name = action.getName(); + if (name.equals("simpleAction")) { + success = true; + } else if (name.equals("simpleParam")) { + DSMap params = invocation.getParameters(); + success = params.get("param", false); + } else if (name.equals("exception")) { + throw new IllegalStateException("my message"); + } + return null; + } + + } + + public static class ANode extends DSNode { + + @Override + public void declareDefaults() { + declareDefault("int", DSInt.valueOf(0)); + } + + @Override + public ActionResult onInvoke(DSInfo action, ActionInvocation invocation) { + return null; + } + + } + +} diff --git a/dslink-core/src/test/java/org/iot/dsa/dslink/V2HandshakeTest.java b/dslink-core/src/test/java/org/iot/dsa/dslink/V2HandshakeTest.java index 7293bd11..2f6c26b6 100644 --- a/dslink-core/src/test/java/org/iot/dsa/dslink/V2HandshakeTest.java +++ b/dslink-core/src/test/java/org/iot/dsa/dslink/V2HandshakeTest.java @@ -1,7 +1,7 @@ package org.iot.dsa.dslink; -import com.acuity.iot.dsa.dslink.protocol.protocol_v2.MessageReader; -import com.acuity.iot.dsa.dslink.protocol.protocol_v2.MessageWriter; +import com.acuity.iot.dsa.dslink.protocol.protocol_v2.DS2MessageReader; +import com.acuity.iot.dsa.dslink.protocol.protocol_v2.DS2MessageWriter; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.ByteBuffer; @@ -84,7 +84,7 @@ public void testF0() throws Exception { byte[] saltBytes = toBytesFromHex(saltHex); Assert.assertTrue(saltBytes.length == 32); //construct the message - MessageWriter writer = new MessageWriter(); + DS2MessageWriter writer = new DS2MessageWriter(); writer.setMethod((byte) 0xf0); ByteBuffer buffer = writer.getBody(); buffer.put((byte) 2).put((byte) 0); //dsa version @@ -111,7 +111,7 @@ public void testF1() throws Exception { byte[] saltBytes = toBytesFromHex(saltHex); Assert.assertTrue(saltBytes.length == 32); //construct the message - MessageWriter writer = new MessageWriter(); + DS2MessageWriter writer = new DS2MessageWriter(); writer.setMethod((byte) 0xf1); ByteBuffer buffer = writer.getBody(); //dsa version @@ -127,7 +127,7 @@ public void testF1() throws Exception { byte[] correctBytes = toBytesFromHex(correctResult); Assert.assertArrayEquals(bytes, correctBytes); //validate reading - MessageReader reader = new MessageReader(); + DS2MessageReader reader = new DS2MessageReader(); reader.init(new ByteArrayInputStream(bytes)); InputStream in = reader.getBody(); Assert.assertEquals(0xf1, reader.getMethod()); @@ -158,7 +158,7 @@ public void testF2() throws Exception { byte[] validAuthBytes = toBytesFromHex(validAuthString); Assert.assertTrue(Arrays.equals(auth, validAuthBytes)); //construct the message - MessageWriter writer = new MessageWriter(); + DS2MessageWriter writer = new DS2MessageWriter(); writer.setMethod((byte) 0xf2); ByteBuffer buffer = writer.getBody(); writer.writeString("sample_token_string", buffer); @@ -191,7 +191,7 @@ public void testF3() throws Exception { byte[] validAuthBytes = toBytesFromHex(validAuthString); Assert.assertTrue(Arrays.equals(auth, validAuthBytes)); //construct the message - MessageWriter writer = new MessageWriter(); + DS2MessageWriter writer = new DS2MessageWriter(); writer.setMethod((byte) 0xf3); ByteBuffer buffer = writer.getBody(); buffer.put((byte) 1); // allow requester @@ -207,7 +207,7 @@ public void testF3() throws Exception { byte[] correctBytes = toBytesFromHex(correctResult); Assert.assertTrue(Arrays.equals(tmp, correctBytes)); //validate reading - MessageReader reader = new MessageReader(); + DS2MessageReader reader = new DS2MessageReader(); reader.init(new ByteArrayInputStream(tmp)); InputStream in = reader.getBody(); Assert.assertEquals(0xf3, reader.getMethod());