Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apply plugin: 'java'
apply plugin: 'maven'

group 'org.iot.dsa'
version '0.19.0'
version '0.20.0'

sourceCompatibility = 1.6
targetCompatibility = 1.6
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.acuity.iot.dsa.dslink.io;

import com.acuity.iot.dsa.dslink.transport.DSBinaryTransport;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
Expand Down Expand Up @@ -269,21 +268,14 @@ public DSByteBuffer put(int dest, byte[] msg, int off, int len) {
}

public int put(InputStream in, int len) {
int count = 0;
growBuffer(offset + len);
try {
int ch;
while (count < len) {
ch = in.read();
if (ch < 0) {
return count;
}
put((byte) ch);
count++;
}
} catch (IOException x) {
len = in.read(buffer, offset, len);
} catch (Exception x) {
DSException.throwRuntime(x);
}
return count;
length += len;
return len;
}

/**
Expand Down Expand Up @@ -554,6 +546,19 @@ public void sendTo(DSBinaryTransport transport, boolean isLast) {
length = 0;
}

/**
* Push bytes from the internal buffer to the given.
*/
public void sendTo(DSByteBuffer buf, int len) {
buf.put(buffer, offset, len);
length -= len;
if (length == 0) {
offset = 0;
} else {
offset += len;
}
}

/**
* Push bytes from the internal buffer to the stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.iot.dsa.dslink.requester.ErrorType;
import org.iot.dsa.dslink.requester.OutboundRequestHandler;
import org.iot.dsa.dslink.requester.OutboundStream;
import org.iot.dsa.node.DSElement;
import org.iot.dsa.node.DSMap;

/**
Expand Down Expand Up @@ -79,49 +78,6 @@ public void handleClose() {
getRequester().removeRequest(getRequestId());
}

public void handleError(DSElement details) {
if (!open) {
return;
}
try {
ErrorType type = ErrorType.internalError;
String msg;
if (details.isMap()) {
String detail = null;
DSMap map = details.toMap();
String tmp = map.getString("type");
if (tmp.equals("permissionDenied")) {
type = ErrorType.permissionDenied;
} else if (tmp.equals("invalidRequest")) {
type = ErrorType.badRequest;
} else if (tmp.equals("invalidPath")) {
type = ErrorType.badRequest;
} else if (tmp.equals("notSupported")) {
type = ErrorType.notSupported;
} else {
type = ErrorType.internalError;
}
msg = map.getString("msg");
detail = map.getString("detail");
if (msg == null) {
msg = detail;
}
if (msg == null) {
msg = details.toString();
}
} else {
type = ErrorType.internalError;
msg = details.toString();
}
if (msg == null) {
msg = "";
}
getHandler().onError(type, msg);
} catch (Exception x) {
getRequester().error(getRequester().getPath(), x);
}
}

public void handleError(ErrorType type, String message) {
if (!open) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,6 @@ void add(DSOutboundSubscribeStub stub) {
}
}

public void close() {
//TODO who calls this and for what purpose
/*
DSOutboundSubscribeStub cur = first;
while (cur != null) {
cur.close();
cur = cur.getNext();
}
*/
}

private boolean contains(DSOutboundSubscribeStub stub) {
if (stub == first) {
return true;
Expand Down Expand Up @@ -114,6 +103,14 @@ public boolean hasSid() {
return sid != null;
}

public void onDisconnect() {
DSOutboundSubscribeStub cur = first;
while (cur != null) {
cur.closeStream();
cur = cur.getNext();
}
}

/**
* Null if the arg is the first in the list, last if stub is not contained.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ public void onConnectFail() {
}

public void onDisconnect() {
for (DSOutboundSubscribeStubs stubs : pendingSubscribe) {
stubs.onDisconnect();
}
pendingSubscribe.clear();
pendingUnsubscribe.clear();
for (DSOutboundSubscribeStubs stubs : pathMap.values()) {
stubs.onDisconnect();
}
sidMap.clear();
pathMap.clear();
}

public void handleUpdate(int sid, String ts, String sts, DSElement value) {
Expand Down Expand Up @@ -217,8 +227,7 @@ public void write(MessageWriter writer) {
if (!pendingSubscribe.isEmpty()) {
doBeginSubscribe(writer);
Iterator<DSOutboundSubscribeStubs> it = pendingSubscribe.iterator();
//while (it.hasNext() && !session.shouldEndMessage()) {
while (it.hasNext()) { //todo
while (it.hasNext() && !session.shouldEndMessage()) {
DSOutboundSubscribeStubs stubs = it.next();
if (!stubs.hasSid()) {
synchronized (pathMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.acuity.iot.dsa.dslink.protocol.DSSession;
import com.acuity.iot.dsa.dslink.protocol.message.OutboundMessage;
import com.acuity.iot.dsa.dslink.transport.DSTransport;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.iot.dsa.dslink.DSIRequester;
Expand All @@ -28,8 +30,7 @@ public abstract class DSRequester extends DSNode implements DSIRequester {

private AtomicInteger nextRid = new AtomicInteger();
private DSSession session;
private Map<Integer, DSOutboundStub> requests =
new ConcurrentHashMap<Integer, DSOutboundStub>();
private Map<Integer, DSOutboundStub> requests = new ConcurrentHashMap<Integer, DSOutboundStub>();
private DSOutboundSubscriptions subscriptions = makeSubscriptions();

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -120,7 +121,6 @@ protected DSOutboundSetStub makeSet(String path, DSIValue value, OutboundRequest

public void onConnect() {
subscriptions.onConnect();
session.setRequesterAllowed();
}

public void onConnectFail() {
Expand All @@ -129,6 +129,17 @@ public void onConnectFail() {

public void onDisconnect() {
subscriptions.onDisconnect();
Iterator<Entry<Integer, DSOutboundStub>> it = requests.entrySet().iterator();
Map.Entry<Integer, DSOutboundStub> me;
while (it.hasNext()) {
me = it.next();
try {
me.getValue().getHandler().onClose();
} catch (Exception x) {
error(getPath(), x);
}
it.remove();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Iterator;
import org.iot.dsa.DSRuntime;
import org.iot.dsa.dslink.DSIResponder;
import org.iot.dsa.dslink.DSPermissionException;
import org.iot.dsa.dslink.DSRequestException;
import org.iot.dsa.dslink.responder.InboundInvokeRequest;
import org.iot.dsa.io.DSIWriter;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void close() {
}
state = STATE_CLOSE_PENDING;
enqueueResponse();
fine(fine() ? getPath() + " list closed locally" : null);
fine(fine() ? getPath() + " invoke closed locally" : null);
}

@Override
Expand All @@ -90,7 +91,7 @@ public void close(Exception reason) {
closeReason = reason;
state = STATE_CLOSE_PENDING;
enqueueResponse();
fine(fine() ? getPath() + " list closed locally" : null);
fine(fine() ? getPath() + " invoke closed locally" : null);
}

private synchronized Update dequeueUpdate() {
Expand Down Expand Up @@ -236,7 +237,19 @@ public void run() {
if (!info.isAction()) {
throw new DSRequestException("Not an action " + path.getPath());
}
//TODO verify incoming permission
if (info.isAdmin()) {
if (!permission.isConfig()) {
throw new DSPermissionException("Config permission required");
}
} else if (!info.isReadOnly()) {
if (DSPermission.WRITE.isGreaterThan(permission)) {
throw new DSPermissionException("Write permission required");
}
} else {
if (DSPermission.READ.isGreaterThan(permission)) {
throw new DSPermissionException("Read permission required");
}
}
DSAction action = info.getAction();
result = action.invoke(info, this);
} catch (Exception x) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.iot.dsa.node.event.DSISubscriber;
import org.iot.dsa.node.event.DSInfoTopic;
import org.iot.dsa.node.event.DSTopic;
import org.iot.dsa.security.DSPermission;

/**
* List implementation for a responder.
Expand Down Expand Up @@ -219,7 +220,11 @@ protected void encodeChild(ApiObject child, MessageWriter writer) {
if (e != null) {
map.put("$invokable", e);
} else {
map.put("$invokable", action.getPermission().toString());
if (child.isAdmin()) {
map.put("$invokable", DSPermission.CONFIG.toString());
} else if (!child.isReadOnly()) {
map.put("$invokable", DSPermission.WRITE.toString());
}
}
} else if (child.isValue()) {
e = cacheMap.remove("$type");
Expand Down Expand Up @@ -301,7 +306,11 @@ private void encodeTargetAction(ApiObject object, MessageWriter writer) {
}
DSElement e = cacheMap.remove("$invokable");
if (e == null) {
encode("$invokable", action.getPermission().toString(), writer);
if (object.isAdmin()) {
encode("$invokable", DSPermission.CONFIG.toString(), writer);
} else if (!object.isReadOnly()) {
encode("$invokable", DSPermission.WRITE.toString(), writer);
}
} else {
encode("$invokable", e, writer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.acuity.iot.dsa.dslink.protocol.message.RequestPath;
import org.iot.dsa.dslink.DSIResponder;
import org.iot.dsa.dslink.DSPermissionException;
import org.iot.dsa.dslink.DSRequestException;
import org.iot.dsa.dslink.responder.InboundSetRequest;
import org.iot.dsa.node.DSElement;
Expand Down Expand Up @@ -44,7 +45,13 @@ public void run() {
if (info.isReadOnly()) {
throw new DSRequestException("Not writable: " + getPath());
}
//TODO verify incoming permission
if (!permission.isConfig()) {
if (info.isAdmin()) {
throw new DSPermissionException("Config permission required");
} else if (DSPermission.WRITE.isGreaterThan(permission)) {
throw new DSPermissionException("Write permission required");
}
}
if (info.isNode()) {
info.getNode().onSet(value);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ public DSInboundSubscriptions(DSResponder responder) {
// Methods in alphabetical order
///////////////////////////////////////////////////////////////////////////

/**
* Unsubscribes all.
*/
public void close() {
for (Integer i : sidMap.keySet()) {
unsubscribe(i);
}
}

/**
* Add to the outbound queue if not already enqueued.
*/
Expand Down Expand Up @@ -89,6 +80,21 @@ protected DSInboundSubscription makeSubscription(Integer sid, String path, int q
return new DSInboundSubscription(this, sid, path, qos);
}

public void onConnect() {
}

public void onConnectFail() {
}

/**
* Unsubscribes all.
*/
public void onDisconnect() {
for (Integer i : sidMap.keySet()) {
unsubscribe(i);
}
}

/**
* Create or update a subscription.
*/
Expand Down
Loading