Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apply plugin: 'java'
apply plugin: 'maven'

group 'org.iot.dsa'
version '0.17.0'
version '0.18.0'

sourceCompatibility = 1.6
targetCompatibility = 1.6
Expand Down
96 changes: 75 additions & 21 deletions dslink-core/src/main/java/com/acuity/iot/dsa/dslink/DSSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage;
import com.acuity.iot.dsa.dslink.transport.DSTransport;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Logger;
import org.iot.dsa.dslink.DSIRequester;
import org.iot.dsa.dslink.DSLinkConnection;
import org.iot.dsa.node.DSNode;
Expand All @@ -17,13 +17,23 @@
*/
public abstract class DSSession extends DSNode {

///////////////////////////////////////////////////////////////////////////
// Constants
///////////////////////////////////////////////////////////////////////////

private static final int MAX_MSG_ID = 2147483647;
private static final long MSG_TIMEOUT = 60000;

///////////////////////////////////////////////////////////////////////////
// Fields
///////////////////////////////////////////////////////////////////////////

private long lastRecv;
private long lastSend;
private int nextAck = -1;
private int nextMessage = 1;
private boolean connected = false;
private DSLinkConnection connection;
private Logger logger;
private Object outgoingMutex = new Object();
private List<OutboundMessage> outgoingRequests = new LinkedList<OutboundMessage>();
private List<OutboundMessage> outgoingResponses = new LinkedList<OutboundMessage>();
Expand Down Expand Up @@ -127,11 +137,28 @@ public DSLinkConnection getConnection() {
}

@Override
public Logger getLogger() {
if (logger == null) {
logger = Logger.getLogger(getConnection().getLink().getLinkName() + ".session");
protected String getLogName() {
return "Session";
}

/**
* The next ack id, or -1.
*/
public synchronized int getNextAck() {
int ret = nextAck;
nextAck = -1;
return ret;
}

/**
* Returns the next new message id.
*/
public synchronized int getNextMessageId() {
int ret = nextMessage;
if (++nextMessage > MAX_MSG_ID) {
nextMessage = 1;
}
return logger;
return ret;
}

public abstract DSIRequester getRequester();
Expand All @@ -140,17 +167,8 @@ public DSTransport getTransport() {
return getConnection().getTransport();
}

/**
* True if there are any outbound requests or responses queued up.
*/
protected final boolean hasMessagesToSend() {
if (!outgoingResponses.isEmpty()) {
return true;
}
if (!outgoingRequests.isEmpty()) {
return true;
}
return false;
protected boolean hasAckToSend() {
return nextAck > 0;
}

protected boolean hasOutgoingRequests() {
Expand All @@ -165,7 +183,16 @@ protected boolean hasOutgoingResponses() {
* Override point, this returns the result of hasMessagesToSend.
*/
protected boolean hasSomethingToSend() {
return hasMessagesToSend();
if (nextAck > 0) {
return true;
}
if (!outgoingResponses.isEmpty()) {
return true;
}
if (!outgoingRequests.isEmpty()) {
return true;
}
return false;
}

protected boolean isConnected() {
Expand Down Expand Up @@ -206,6 +233,16 @@ public void onDisconnect() {
}
}

/**
* Call for each incoming message id that needs to be acked.
*/
public synchronized void setNextAck(int nextAck) {
if (nextAck > 0) {
this.nextAck = nextAck;
notifyOutgoing();
}
}

/**
* Called when the broker signifies that requests are allowed.
*/
Expand All @@ -220,20 +257,35 @@ public void setRequesterAllowed() {
* implementation. A separate thread is spun off to manage writing.
*/
public void run() {
lastRecv = lastSend = System.currentTimeMillis();
new WriteThread(getConnection().getLink().getLinkName() + " Writer").start();
while (connected) {
try {
verifyLastSend();
doRecvMessage();
lastRecv = System.currentTimeMillis();
} catch (Exception x) {
getTransport().close();
if (connected) {
connected = false;
severe(getPath(), x);
error(getPath(), x);
}
}
}
}

private void verifyLastRead() throws IOException {
if ((System.currentTimeMillis() - lastRecv) > MSG_TIMEOUT) {
throw new IOException("No message received in " + MSG_TIMEOUT + "ms");
}
}

private void verifyLastSend() throws IOException {
if ((System.currentTimeMillis() - lastSend) > MSG_TIMEOUT) {
throw new IOException("No message sent in " + MSG_TIMEOUT + "ms");
}
}

///////////////////////////////////////////////////////////////////////////
// Inner Classes
///////////////////////////////////////////////////////////////////////////
Expand All @@ -251,23 +303,25 @@ private class WriteThread extends Thread {
public void run() {
try {
while (connected) {
verifyLastRead();
synchronized (outgoingMutex) {
if (!hasSomethingToSend()) {
try {
outgoingMutex.wait(5000);
} catch (InterruptedException x) {
fine(getPath(), x);
warn(getPath(), x);
}
continue;
}
}
doSendMessage();
lastSend = System.currentTimeMillis();
}
} catch (Exception x) {
if (connected) {
connected = false;
getTransport().close();
severe(getPath(), x);
error(getPath(), x);
}
}
}
Expand Down
Loading