Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apply plugin: 'java'
apply plugin: 'maven'

group 'org.iot.dsa'
version '0.15.0'
version '0.16.0'

sourceCompatibility = 1.6
targetCompatibility = 1.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public String getType() {
return type;
}

/**
* Optional.
*/
public DSProtocolException setDetail(String arg) {
detail = arg;
return this;
}

/**
* Optional.
*/
Expand Down
21 changes: 13 additions & 8 deletions dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ public abstract class DSSession extends DSNode {
private List<OutboundMessage> outgoingResponses = new LinkedList<OutboundMessage>();
protected boolean requesterAllowed = false;

///////////////////////////////////////////////////////////////////////////
// Constructors
///////////////////////////////////////////////////////////////////////////

public DSSession() {
}

public DSSession(DSLinkConnection connection) {
this.connection = connection;
}

///////////////////////////////////////////////////////////////////////////
// Methods
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -202,6 +213,8 @@ public void setRequesterAllowed() {
requesterAllowed = true;
}

public abstract boolean shouldEndMessage();

/**
* Called by the connection, this manages the running state and calls doRun for the specific
* implementation. A separate thread is spun off to manage writing.
Expand All @@ -221,14 +234,6 @@ public void run() {
}
}

/**
* For use by the connection object.
*/
public DSSession setConnection(DSLinkConnection connection) {
this.connection = connection;
return this;
}

///////////////////////////////////////////////////////////////////////////
// Inner Classes
///////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public BaseMessage setStream(String arg) {
* write the key value pairs that have been configured on this object, but does not close the
* response map.
*/
public void write(DSIWriter writer) {
public void write(MessageWriter out) {
DSIWriter writer = out.getWriter();
writer.beginMap();
if (rid >= 0) {
writer.key("rid").value(rid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public CloseMessage(Integer rid) {
* Calls the super implementation then writes the error object and closes the entire response
* object.
*/
public void write(DSIWriter writer) {
public void write(MessageWriter writer) {
super.write(writer);
writer.endMap();
writer.getWriter().endMap();
}

/////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public ErrorResponse parse(Throwable reason) {
reason = tmp;
}
}
msg = reason.getMessage();
if (reason instanceof DSProtocolException) {
DSProtocolException x = (DSProtocolException) reason;
detail = x.getDetail();
msg = x.getMessage();
phase = x.getPhase();
type = x.getType();
} else if (reason instanceof DSRequestException) {
Expand Down Expand Up @@ -117,36 +117,25 @@ public ErrorResponse setType(String arg) {
* Calls the super implementation then writes the error object and closes the entire response
* object.
*/
public void write(DSIWriter writer) {
public void write(MessageWriter writer) {
super.write(writer);
writer.key("error").beginMap();
DSIWriter out = writer.getWriter();
out.key("error").beginMap();
if (type != null) {
writer.key("type").value(type);
out.key("type").value(type);
}
if (phase == Phase.RESPONSE) {
writer.key("phase").value("response");
out.key("phase").value("response");
}
if (msg != null) {
writer.key("msg").value(msg);
out.key("msg").value(msg);
}
if (detail != null) {
writer.key("detail").value(detail);
out.key("detail").value(detail);
}
writer.endMap().endMap();
out.endMap().endMap();
}

/////////////////////////////////////////////////////////////////
// Methods - Protected and in alphabetical order by method name.
/////////////////////////////////////////////////////////////////

/////////////////////////////////////////////////////////////////
// Methods - Package and in alphabetical order by method name.
/////////////////////////////////////////////////////////////////

/////////////////////////////////////////////////////////////////
// Methods - Private and in alphabetical order by method name.
/////////////////////////////////////////////////////////////////

/////////////////////////////////////////////////////////////////
// Inner Classes - in alphabetical order by class name.
/////////////////////////////////////////////////////////////////
Expand All @@ -156,12 +145,4 @@ public enum Phase {
RESPONSE
}

/////////////////////////////////////////////////////////////////
// Facets - in alphabetical order by field name.
/////////////////////////////////////////////////////////////////

/////////////////////////////////////////////////////////////////
// Initialization
/////////////////////////////////////////////////////////////////

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.acuity.iot.dsa.dslink.protocol.message;

import org.iot.dsa.io.DSIWriter;

/**
* Used to read a DSA 2.n message (header and body). Call init(InputStream) to start a new message,
* can be reused for multiple messages. Not thread safe, the intent is messages will be constructed
* and read serially.
*
* @author Aaron Hansen
*/
public interface MessageWriter {

public DSIWriter getWriter();

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.acuity.iot.dsa.dslink.protocol.message;

import org.iot.dsa.io.DSIWriter;

/**
* @author Aaron Hansen
*/
Expand All @@ -10,6 +8,6 @@ public interface OutboundMessage {
/**
* Write the full request or response message object.
*/
public void write(DSIWriter writer);
public void write(MessageWriter writer);

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder;
package com.acuity.iot.dsa.dslink.protocol.message;

import org.iot.dsa.dslink.DSIResponder;
import org.iot.dsa.dslink.DSInvalidPathException;
Expand All @@ -12,7 +12,7 @@
*
* @author Aaron Hansen
*/
class RequestPath {
public class RequestPath {

///////////////////////////////////////////////////////////////////////////
// Fields
Expand All @@ -29,7 +29,7 @@ class RequestPath {
// Constructors
///////////////////////////////////////////////////////////////////////////

RequestPath(String path, DSNode root) {
public RequestPath(String path, DSNode root) {
this.path = path;
this.root = root;
names = DSPath.decodePath(path);
Expand All @@ -40,9 +40,9 @@ class RequestPath {
///////////////////////////////////////////////////////////////////////////

/**
* The info of the target in the parent.
* The info of the target in it's parent.
*/
DSInfo getInfo() {
public DSInfo getInfo() {
if (target == null) {
getTarget();
}
Expand All @@ -52,21 +52,22 @@ DSInfo getInfo() {
/**
* The parent of the target unless the request was for /
*/
DSNode getParent() {
public DSNode getParent() {
if (parent == null) {
getTarget();
}
return parent;
}

/**
* If the target is a responder, this is path to send to it.
* If the target is a responder, this is path it should use, not the original path of the
* request.
*/
String getPath() {
public String getPath() {
return path;
}

DSIObject getTarget() {
public DSIObject getTarget() {
if (parent == null) {
parent = root.getParent();
target = root;
Expand Down Expand Up @@ -105,7 +106,7 @@ DSIObject getTarget() {
return target;
}

boolean isResponder() {
public boolean isResponder() {
return target instanceof DSIResponder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@ protected void onInitialize() {
makeTransport(init);
put(TRANSPORT, getTransport()).setTransient(true);
if (session == null) {
session = new DS1Session();
session = new DS1Session(this);
put(SESSION, session).setTransient(true);
session.setConnection(this);
}
connectionInit = init;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.acuity.iot.dsa.dslink.DSProtocolException;
import com.acuity.iot.dsa.dslink.DSSession;
import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter;
import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage;
import com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester.DS1Requester;
import com.acuity.iot.dsa.dslink.protocol.protocol_v1.responder.DS1Responder;
Expand Down Expand Up @@ -46,12 +47,24 @@ public class DS1Session extends DSSession {
private DSInfo lastAckRecv = getInfo(LAST_ACK_RECV);
private DSInfo lastAckSent = getInfo(LAST_ACK_SENT);
private long lastMessageSent;
private MessageWriter messageWriter;
private int nextAck = -1;
private int nextMsg = 1;
private boolean requestsNext = false;
private DS1Requester requester = new DS1Requester(this);
private DS1Responder responder = new DS1Responder(this);

/////////////////////////////////////////////////////////////////
// Constructors
/////////////////////////////////////////////////////////////////

public DS1Session() {
}

public DS1Session(DS1LinkConnection connection) {
super(connection);
}

/////////////////////////////////////////////////////////////////
// Methods
/////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -178,6 +191,13 @@ public DS1LinkConnection getConnection() {
return (DS1LinkConnection) super.getConnection();
}

private MessageWriter getMessageWriter() {
if (messageWriter == null) {
messageWriter = new MyMessageWriter(getConnection().getWriter());
}
return messageWriter;
}

public DSIReader getReader() {
return getConnection().getReader();
}
Expand All @@ -187,8 +207,8 @@ public DSIRequester getRequester() {
return requester;
}

public DSIWriter getWriter() {
return getConnection().getWriter();
private DSIWriter getWriter() {
return getMessageWriter().getWriter();
}

private boolean hasPingToSend() {
Expand Down Expand Up @@ -390,7 +410,7 @@ public boolean shouldEndMessage() {
* @see #endRequests()
*/
public void writeRequest(OutboundMessage message) {
message.write(getWriter());
message.write(getMessageWriter());
}

/**
Expand All @@ -401,7 +421,26 @@ public void writeRequest(OutboundMessage message) {
* @see #endResponses()
*/
public void writeResponse(OutboundMessage message) {
message.write(getWriter());
message.write(getMessageWriter());
}


/////////////////////////////////////////////////////////////////
// Inner Classes
/////////////////////////////////////////////////////////////////

private class MyMessageWriter implements MessageWriter {

DSIWriter writer;

MyMessageWriter(DSIWriter writer) {
this.writer = writer;
}

@Override
public DSIWriter getWriter() {
return writer;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.acuity.iot.dsa.dslink.protocol.protocol_v1.requester;

import com.acuity.iot.dsa.dslink.protocol.message.MessageWriter;
import org.iot.dsa.dslink.requester.OutboundInvokeHandler;
import org.iot.dsa.dslink.requester.OutboundInvokeHandler.Mode;
import org.iot.dsa.io.DSIWriter;
Expand Down Expand Up @@ -98,7 +99,8 @@ protected void handleResponse(DSMap response) {
}

@Override
public void write(DSIWriter out) {
public void write(MessageWriter writer) {
DSIWriter out = writer.getWriter();
out.beginMap();
out.key("rid").value(getRequestId());
out.key("method").value("invoke");
Expand Down
Loading