Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[![](https://jitpack.io/v/iot-dsa-v2/sdk-dslink-java-v2.svg)](https://jitpack.io/#iot-dsa-v2/sdk-dslink-java-v2)

* [Developer Guide](https://iot-dsa-v2.github.io/sdk-dslink-java-v2/)
* [Javadoc](https://iot-dsa-v2.github.io/sdk-dslink-java-v2/javadoc/)
* [Javadoc](https://jitpack.io/com/github/iot-dsa-v2/sdk-dslink-java-v2/dslink-v2/master-SNAPSHOT/javadoc/)
* JDK 1.6+


Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ subprojects {
apply plugin: 'maven'

group 'org.iot-dsa'
version '0.32.0'
version '0.34.0'

sourceCompatibility = 1.6
targetCompatibility = 1.6
Expand All @@ -26,5 +26,5 @@ subprojects {
}

task wrapper(type: Wrapper) {
gradleVersion = '4.8.1'
gradleVersion = '4.9'
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public DSTransport close() {
return this;
}
open = false;
debug(debug() ? "WsTextTransport.close()" : null, new Exception());
debug(debug() ? "WsTextTransport.close()" : null);
try {
if (session != null) {
session.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public synchronized void put(char[] msg, int off, int len) {
offset = 0;
}
}
//System.arraycopy(msg, off, buffer, length + offset, len);
System.arraycopy(msg, off, buffer, offset, len);
length += len;
notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import com.acuity.iot.dsa.dslink.protocol.responder.DSResponder;
import com.acuity.iot.dsa.dslink.transport.DSTransport;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.iot.dsa.conn.DSConnection;
import org.iot.dsa.conn.DSIConnected;
import org.iot.dsa.dslink.DSIRequester;
Expand Down Expand Up @@ -49,8 +48,8 @@ public abstract class DSSession extends DSNode implements DSIConnected {
private int messageId = 0;
private int nextMessage = 1;
private final Object outgoingMutex = new Object();
private List<OutboundMessage> outgoingRequests = new LinkedList<OutboundMessage>();
private List<OutboundMessage> outgoingResponses = new LinkedList<OutboundMessage>();
private ConcurrentLinkedQueue<OutboundMessage> outgoingRequests = new ConcurrentLinkedQueue<OutboundMessage>();
private ConcurrentLinkedQueue<OutboundMessage> outgoingResponses = new ConcurrentLinkedQueue<OutboundMessage>();
private DSInfo requesterAllowed = getInfo(REQUESTER_ALLOWED);
private ReadThread readThread;
private WriteThread writeThread;
Expand Down Expand Up @@ -78,10 +77,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
if (!isRequesterAllowed()) {
throw new IllegalStateException("Requester not allowed");
}
synchronized (outgoingMutex) {
outgoingRequests.add(arg);
outgoingMutex.notify();
}
outgoingRequests.add(arg);
notifyOutgoing();
}
}

Expand All @@ -90,10 +87,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
*/
public void enqueueOutgoingResponse(OutboundMessage arg) {
if (connected) {
synchronized (outgoingMutex) {
outgoingResponses.add(arg);
outgoingMutex.notify();
}
outgoingResponses.add(arg);
notifyOutgoing();
}
}

Expand Down Expand Up @@ -175,24 +170,14 @@ protected void declareDefaults() {
* Can return null.
*/
protected OutboundMessage dequeueOutgoingRequest() {
synchronized (outgoingMutex) {
if (!outgoingRequests.isEmpty()) {
return outgoingRequests.remove(0);
}
}
return null;
return outgoingRequests.poll();
}

/**
* Can return null.
*/
protected OutboundMessage dequeueOutgoingResponse() {
synchronized (outgoingMutex) {
if (!outgoingResponses.isEmpty()) {
return outgoingResponses.remove(0);
}
}
return null;
return outgoingResponses.poll();
}

/**
Expand Down Expand Up @@ -240,20 +225,25 @@ protected boolean hasAckToSend() {
* Override point, this returns the result of hasMessagesToSend.
*/
protected boolean hasSomethingToSend() {
if (ackToSend >= 0) {
return true;
}
if (hasPingToSend()) {
if (hasAckToSend() || hasPingToSend()) {
return true;
}
if (waitingForAcks()) {
return false;
}
if (!outgoingResponses.isEmpty()) {
return true;
for (OutboundMessage msg : outgoingResponses) {
if (msg.canWrite(this)) {
return true;
}
}
}
if (!outgoingRequests.isEmpty()) {
return true;
for (OutboundMessage msg : outgoingRequests) {
if (msg.canWrite(this)) {
return true;
}
}
}
return false;
}
Expand Down Expand Up @@ -282,25 +272,19 @@ protected void onConnected() {
connected = true;
lastTimeRecv = lastTimeSend = System.currentTimeMillis();
readThread = new ReadThread(getConnection().getLink().getLinkName() + " Reader");
writeThread = new WriteThread(getConnection().getLink().getLinkName() + " Writer");
readThread.start();
Thread.yield();
writeThread = new WriteThread(getConnection().getLink().getLinkName() + " Writer");
writeThread.start();
}

/**
* Clear the outgoing queues and waits for the the read and write threads to exit.
*/
protected void onDisconnected() {
synchronized (outgoingMutex) {
outgoingRequests.clear();
outgoingResponses.clear();
outgoingMutex.notifyAll();
}
try {
writeThread.join();
} catch (Exception x) {
debug(getPath(), x);
}
outgoingRequests.clear();
outgoingResponses.clear();
notifyOutgoing();
try {
readThread.join();
} catch (Exception x) {
Expand All @@ -319,18 +303,21 @@ protected void onDisconnecting() {
}
connected = false;
notifyOutgoing();
try {
writeThread.join();
} catch (Exception x) {
debug(getPath(), x);
}
//Attempt to exit cleanly, try to get acks for sent messages.
waitForAcks(1000);
}

protected void requeueOutgoingRequest(OutboundMessage arg) {
synchronized (outgoingMutex) {
outgoingRequests.add(arg);
}
outgoingRequests.add(arg);
}

protected void requeueOutgoingResponse(OutboundMessage arg) {
synchronized (outgoingMutex) {
outgoingResponses.add(arg);
}
outgoingResponses.add(arg);
}

/**
Expand Down Expand Up @@ -385,6 +372,26 @@ private void verifyLastSend() throws IOException {
}
}

/* Try to exit cleanly, wait for all acks for sent messages. */
private void waitForAcks(long timeout) {
long start = System.currentTimeMillis();
synchronized (outgoingMutex) {
while (getMissingAcks() > 0) {
try {
outgoingMutex.wait(500);
} catch (InterruptedException x) {
warn(getPath(), x);
}
if ((System.currentTimeMillis() - start) > timeout) {
debug(debug() ? String
.format("waitForAcks timeout (%s / %s)", ackRcvd, messageId)
: null);
break;
}
}
}
}

///////////////////////////////////////////////////////////////////////////
// Inner Classes
///////////////////////////////////////////////////////////////////////////
Expand All @@ -400,6 +407,7 @@ private class ReadThread extends Thread {
}

public void run() {
debug("Enter DSSession.ReadThread");
DSLinkConnection conn = getConnection();
try {
while (connected) {
Expand All @@ -415,6 +423,7 @@ public void run() {
conn.connDown(DSException.makeMessage(x));
}
}
debug("Exit DSSession.ReadThread");
}
}

Expand All @@ -430,6 +439,7 @@ private class WriteThread extends Thread {

public void run() {
DSLinkConnection conn = getConnection();
debug("Enter DSSession.WriteThread");
try {
while (connected) {
verifyLastRead();
Expand All @@ -454,6 +464,7 @@ public void run() {
conn.connDown(DSException.makeMessage(x));
}
}
debug("Exit DSSession.WriteThread");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public void handleUpdate(int sid, String ts, String sts, DSElement value) {
@Override
public void write(DSSession session, MessageWriter writer) {
if (!pendingSubscribe.isEmpty()) {
debug(debug() ? "Sending subscribe requests" : null);
doBeginSubscribe(writer);
Iterator<DSOutboundSubscribeStubs> it = pendingSubscribe.iterator();
while (it.hasNext() && !session.shouldEndMessage()) {
Expand All @@ -114,6 +115,7 @@ public void write(DSSession session, MessageWriter writer) {
doEndMessage(writer);
}
if (!pendingUnsubscribe.isEmpty() && !session.shouldEndMessage()) {
debug(debug() ? "Sending unsubscribe requests" : null);
doBeginUnsubscribe(writer);
Iterator<DSOutboundSubscribeStubs> it = pendingUnsubscribe.iterator();
while (it.hasNext() && !session.shouldEndMessage()) {
Expand All @@ -129,9 +131,7 @@ public void write(DSSession session, MessageWriter writer) {
}
doEndMessage(writer);
}
synchronized (this) {
enqueued = false;
}
enqueued = false;
if (!pendingSubscribe.isEmpty() || !pendingUnsubscribe.isEmpty()) {
sendMessage();
}
Expand Down Expand Up @@ -208,6 +208,7 @@ protected void onDisconnected() {
}
sidMap.clear();
pathMap.clear();
enqueued = false;
}

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -239,6 +240,7 @@ private void sendMessage() {
* Create or update a subscription.
*/
OutboundSubscribeHandler subscribe(String path, int qos, OutboundSubscribeHandler req) {
trace(trace() ? String.format("Subscribe (qos=%s) %s", qos, path) : null);
DSOutboundSubscribeStub stub = new DSOutboundSubscribeStub(path, qos, req);
DSOutboundSubscribeStubs stubs = null;
synchronized (pathMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,19 @@ protected void doRecvMessage() throws IOException {
@Override
protected void doSendMessage() {
try {
beginMessage();
if (!waitingForAcks()) {
requestsNext = !requestsNext;
beginMessage();
send(requestsNext);
if (!shouldEndMessage()) {
send(!requestsNext);
}
endMessage();
lastMessageSent = System.currentTimeMillis();
if (requestsBegun || responsesBegun) {
setAckRequired();
}
}
endMessage();
} finally {
requestsBegun = false;
responsesBegun = false;
Expand Down Expand Up @@ -250,11 +250,9 @@ private DSIWriter getWriter() {
* Decomposes and processes a complete envelope which can contain multiple requests and
* responses.
*
* @param reader lastRun() will return BEGIN_MAP
* @param reader last() must return BEGIN_MAP
*/
private void processEnvelope(DSIReader reader) {
int msg = -1;
Token next;
switch (reader.next()) {
case END_MAP:
return;
Expand All @@ -263,6 +261,8 @@ private void processEnvelope(DSIReader reader) {
default:
throw new IllegalStateException("Poorly formatted request");
}
int msg = -1;
Token next;
boolean sendAck = false;
do {
String key = reader.getString();
Expand Down
Loading