Permalink
Browse files

Event related updates

  • Loading branch information...
richturner committed Feb 5, 2019
1 parent 83b5cb8 commit ecbc2a174014aa8da74fd790dc815cbcc86752eb
Showing with 378 additions and 155 deletions.
  1. +20 −5 client/src/main/java/org/openremote/app/client/event/EventServiceImpl.java
  2. +5 −8 client/src/main/java/org/openremote/app/client/widget/MapWidget.java
  3. +4 −4 client/src/main/webapp/src/or-app/or-app-events.html
  4. +1 −1 manager/src/main/java/org/openremote/manager/asset/AssetProcessingService.java
  5. +13 −7 manager/src/main/java/org/openremote/manager/event/ClientEventService.java
  6. +66 −24 manager/src/main/java/org/openremote/manager/event/EventSubscriptions.java
  7. +27 −4 manager/src/main/java/org/openremote/manager/event/EventTypeConverters.java
  8. +9 −9 model/src/main/java/org/openremote/model/attribute/AttributeEvent.java
  9. +52 −0 model/src/main/java/org/openremote/model/event/TriggeredEventSubscription.java
  10. +10 −13 model/src/main/java/org/openremote/model/event/shared/CancelEventSubscription.java
  11. +21 −3 model/src/main/java/org/openremote/model/event/shared/EventSubscription.java
  12. +39 −0 model/src/main/java/org/openremote/model/event/shared/RenewEventSubscriptions.java
  13. +8 −18 model/src/main/java/org/openremote/model/event/shared/UnauthorizedEventSubscription.java
  14. +20 −4 test/src/main/groovy/org/openremote/test/ClientEventService.groovy
  15. +11 −3 test/src/test/groovy/org/openremote/test/console/ConsoleTest.groovy
  16. +3 −3 test/src/test/groovy/org/openremote/test/event/ClientEventTest.groovy
  17. +30 −24 test/src/test/groovy/org/openremote/test/notification/NotificationTest.groovy
  18. +7 −1 test/src/test/groovy/org/openremote/test/rules/residence/JsonRulesTest.groovy
  19. +32 −24 test/src/test/groovy/org/openremote/test/rules/residence/ResidenceNotifyAlarmTriggerTest.groovy
@@ -103,7 +103,7 @@ public void dispatch(SharedEvent sharedEvent) {
public void stop() {
for (Map.Entry<String, Double> entry : activeSubscriptions.entrySet()) {
DomGlobal.clearInterval(entry.getValue());
CancelEventSubscription cancellation = new CancelEventSubscription(entry.getKey());
CancelEventSubscription cancellation = new CancelEventSubscription(entry.getKey(), null);
sendServiceMessage(CancelEventSubscription.MESSAGE_PREFIX + cancelEventSubscriptionMapper.write(cancellation));
}
activeSubscriptions.clear();
@@ -119,18 +119,33 @@ protected void onServiceMessageReceived(String data) {
if (data.startsWith(UnauthorizedEventSubscription.MESSAGE_PREFIX)) {
data = data.substring(UnauthorizedEventSubscription.MESSAGE_PREFIX.length());
UnauthorizedEventSubscription failure = unauthorizedEventSubscriptionMapper.read(data);
eventBus.dispatch(new SubscriptionFailureEvent(failure.getEventType()));
eventBus.dispatch(new SubscriptionFailureEvent(failure.getSubscription().getEventType()));
} else if (data.startsWith(SharedEvent.MESSAGE_PREFIX)) {
data = data.substring(SharedEvent.MESSAGE_PREFIX.length());
if (data.startsWith("[")) {
if (data.startsWith("{")) {
SharedEvent event = sharedEventMapper.read(data);
eventBus.dispatch(event);
} else if (data.startsWith("[")) {
// Handle array of events
SharedEvent[] events = sharedEventArrayMapper.read(data);
for (SharedEvent event : events) {
eventBus.dispatch(event);
}
} else {
SharedEvent event = sharedEventMapper.read(data);
eventBus.dispatch(event);
String[] dataArr = data.split(":(.+)");
if (dataArr.length == 2) {
String subscriptionId = dataArr[0];
if (dataArr[1].startsWith("[")) {
// Handle array of events
SharedEvent[] events = sharedEventArrayMapper.read(data);
for (SharedEvent event : events) {
eventBus.dispatch(event);
}
} else {
SharedEvent event = sharedEventMapper.read(data);
eventBus.dispatch(event);
}
}
}
}
}
@@ -164,15 +164,12 @@ public void initialise(ObjectValue mapSettings, AppSecurity security, Runnable o
throw new IllegalStateException("Already initialized");
}

ObjectValue mapOptions = Values.createObject();
ObjectValue mapOptions = mapSettings.getObject("options").flatMap(opts -> opts.getObject("default")).orElse(null);
mapOptions.put("style", mapSettings.deepCopy());
mapOptions.put("minZoom", mapSettings.getNumber("minZoom").orElse(0d));
mapOptions.put("maxZoom", mapSettings.getNumber("maxZoom").orElse(22d));
if (mapSettings.getArray("maxBounds").isPresent()) {
mapOptions.put("maxBounds", mapSettings.getArray("maxBounds").get());
}
if (mapSettings.getBoolean("boxZoom").isPresent()) {
mapOptions.put("boxZoom", mapSettings.getBoolean("boxZoom").get());
mapOptions.put("minZoom", mapOptions.getNumber("minZoom").orElse(0d));
mapOptions.put("maxZoom", mapOptions.getNumber("maxZoom").orElse(22d));
if (mapOptions.getArray("bounds").isPresent()) {
mapOptions.put("maxBounds", mapOptions.getArray("bounds").get());
}
mapOptions.put("container", hostElementId);
mapOptions.put("attributionControl", true);
@@ -42,7 +42,7 @@
eventType: "attribute",
filter: {
filterType: "attribute-entity-id",
entityId: assetIds
entityIds: assetIds
}
});
}
@@ -79,10 +79,10 @@
onServiceMessageReceived(data) {
if (!data)
return;
if (data.startsWith("UNAUTHORIZED")) {
if (data.startsWith("UNAUTHORIZED:")) {
openremote.INSTANCE.setError("Unauthorized event subscription.");
} else if (data.startsWith("EVENT")) {
data = data.substring("EVENT".length);
} else if (data.startsWith("EVENT:")) {
data = data.substring("EVENT:".length);
if (data.startsWith("[")) {
// Handle array of events
let events = JSON.parse(data);
@@ -178,7 +178,7 @@ public void init(Container container) throws Exception {
boolean isRestrictedUser = identityService.getIdentityProvider().isRestrictedUser(auth.getUserId());

// Client can subscribe to several assets
for (String assetId : filter.getEntityId()) {
for (String assetId : filter.getEntityIds()) {
Asset asset = assetStorageService.find(assetId);
// If the asset doesn't exist, subscription must fail
if (asset == null)
@@ -31,10 +31,7 @@
import org.openremote.manager.concurrent.ManagerExecutorService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.model.Constants;
import org.openremote.model.event.shared.CancelEventSubscription;
import org.openremote.model.event.shared.EventSubscription;
import org.openremote.model.event.shared.SharedEvent;
import org.openremote.model.event.shared.UnauthorizedEventSubscription;
import org.openremote.model.event.shared.*;
import org.openremote.model.syslog.SyslogEvent;

import java.util.Collection;
@@ -49,7 +46,7 @@
* Messages always start with a message discriminator in all uppercase letters, followed
* by an optional JSON payload.
* <p>
* The following messages can be send by a client:
* The following messages can be sent by a client:
* <dl>
* <dt><code>SUBSCRIBE{...}</code><dt>
* <dd><p>
@@ -163,13 +160,14 @@ public void configure() throws Exception {
if (eventSubscriptionAuthorizers.stream()
.anyMatch(authorizer -> authorizer.apply(authContext, subscription))) {
boolean restrictedUser = identityService.getIdentityProvider().isRestrictedUser(authContext.getUserId());
eventSubscriptions.update(sessionKey, restrictedUser, subscription);
eventSubscriptions.createOrUpdate(sessionKey, restrictedUser, subscription);
sendToSession(sessionKey, subscription);
} else {
LOG.warning("Unauthorized subscription from '"
+ authContext.getUsername() + "' in realm '" + authContext.getAuthenticatedRealm()
+ "': " + subscription
);
sendToSession(sessionKey, new UnauthorizedEventSubscription(subscription.getEventType()));
sendToSession(sessionKey, new UnauthorizedEventSubscription(subscription));
}
})
.when(bodyAs(String.class).startsWith(CancelEventSubscription.MESSAGE_PREFIX))
@@ -178,6 +176,14 @@ public void configure() throws Exception {
String sessionKey = getSessionKey(exchange);
eventSubscriptions.cancel(sessionKey, exchange.getIn().getBody(CancelEventSubscription.class));
})
.when(bodyAs(String.class).startsWith(RenewEventSubscriptions.MESSAGE_PREFIX))
.convertBodyTo(RenewEventSubscriptions.class)
.process(exchange -> {
String sessionKey = getSessionKey(exchange);
AuthContext authContext = exchange.getIn().getHeader(Constants.AUTH_CONTEXT, AuthContext.class);
boolean restrictedUser = identityService.getIdentityProvider().isRestrictedUser(authContext.getUserId());
eventSubscriptions.update(sessionKey, restrictedUser,exchange.getIn().getBody(RenewEventSubscriptions.class).getSubscriptionIds());
})
.when(bodyAs(String.class).startsWith(SharedEvent.MESSAGE_PREFIX))
.convertBodyTo(SharedEvent.class)
.process(exchange -> {
@@ -23,11 +23,14 @@
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.openremote.container.timer.TimerService;
import org.openremote.container.util.UniqueIdentifierGenerator;
import org.openremote.container.web.socket.WebsocketConstants;
import org.openremote.manager.concurrent.ManagerExecutorService;
import org.openremote.model.event.TriggeredEventSubscription;
import org.openremote.model.event.shared.CancelEventSubscription;
import org.openremote.model.event.shared.EventSubscription;
import org.openremote.model.event.shared.SharedEvent;
import org.openremote.model.util.TextUtil;

import java.util.*;
import java.util.logging.Logger;
@@ -42,7 +45,7 @@
private static final Logger LOG = Logger.getLogger(EventSubscriptions.class.getName());

final protected TimerService timerService;
final protected Map<String, SessionSubscriptions> sessionSubscriptions = new HashMap<>();
final protected Map<String, SessionSubscriptions> sessionSubscriptionIdMap = new HashMap<>();

class SessionSubscriptions extends HashSet<SessionSubscription> {
public void removeExpired() {
@@ -56,25 +59,47 @@ public void removeExpired() {
);
}

public void update(boolean restrictedUser, EventSubscription eventSubscription) {
cancel(eventSubscription.getEventType());
public void createOrUpdate(boolean restrictedUser, EventSubscription eventSubscription) {

if (TextUtil.isNullOrEmpty(eventSubscription.getSubscriptionId())) {
cancelByType(eventSubscription.getEventType());
} else {
cancelById(eventSubscription.getSubscriptionId());
}

add(new SessionSubscription(restrictedUser, timerService.getCurrentTimeMillis(), eventSubscription));
}

public void cancel(String eventType) {
public void update(boolean resstrictedUser, String[] subscriptionIds) {
List<String> ids = Arrays.asList(subscriptionIds);
forEach(sessionSubscription -> {
if (ids.contains(sessionSubscription.subscriptionId)) {
sessionSubscription.restrictedUser = resstrictedUser;
sessionSubscription.timestamp = timerService.getCurrentTimeMillis();
}
});
}

public void cancelByType(String eventType) {
removeIf(sessionSubscription -> sessionSubscription.subscription.getEventType().equals(eventType));
}

public void cancelById(String subscriptionId) {
removeIf(sessionSubscription -> sessionSubscription.subscription.getSubscriptionId().equals(subscriptionId));
}
}

class SessionSubscription {
final boolean restrictedUser;
final long timestamp;
boolean restrictedUser;
long timestamp;
final EventSubscription subscription;
final String subscriptionId;

public SessionSubscription(boolean restrictedUser, long timestamp, EventSubscription subscription) {
this.restrictedUser = restrictedUser;
this.timestamp = timestamp;
this.subscription = subscription;
this.subscriptionId = subscription.getSubscriptionId();
}

public boolean matches(boolean accessibleForRestrictedUsers, SharedEvent event) {
@@ -94,39 +119,53 @@ public EventSubscriptions(TimerService timerService, ManagerExecutorService exec
LOG.info("Starting background task checking for expired event subscriptions from clients");
this.timerService = timerService;
executorService.scheduleAtFixedRate(() -> {
synchronized (this.sessionSubscriptions) {
for (SessionSubscriptions subscriptions : sessionSubscriptions.values()) {
synchronized (this.sessionSubscriptionIdMap) {
for (SessionSubscriptions subscriptions : sessionSubscriptionIdMap.values()) {
subscriptions.removeExpired();
}
}
}, 5000, 1000);
}

public void update(String sessionKey, boolean restrictedUser, EventSubscription subscription) {
synchronized (this.sessionSubscriptions) {
public void createOrUpdate(String sessionKey, boolean restrictedUser, EventSubscription subscription) {
synchronized (this.sessionSubscriptionIdMap) {
// TODO Check if the user can actually subscribe to the events it wants, how do we do that?
LOG.fine("For session '" + sessionKey + "', updating: " + subscription);
LOG.fine("For session '" + sessionKey + "', creating/updating: " + subscription);
SessionSubscriptions sessionSubscriptions =
this.sessionSubscriptions.computeIfAbsent(sessionKey, k -> new SessionSubscriptions());
sessionSubscriptions.update(restrictedUser, subscription);
this.sessionSubscriptionIdMap.computeIfAbsent(sessionKey, k -> new SessionSubscriptions());
sessionSubscriptions.createOrUpdate(restrictedUser, subscription);
}
}

public void update(String sessionKey, boolean restrictedUser, String[] subscriptionIds) {
synchronized (this.sessionSubscriptionIdMap) {
SessionSubscriptions sessionSubscriptions = this.sessionSubscriptionIdMap.get(sessionKey);
if (sessionSubscriptions != null) {
LOG.fine("For session '" + sessionKey + "', updating: " + Arrays.toString(subscriptionIds));
sessionSubscriptions.update(restrictedUser, subscriptionIds);
}
}
}

public void cancel(String sessionKey, CancelEventSubscription subscription) {
synchronized (this.sessionSubscriptions) {
if (!this.sessionSubscriptions.containsKey(sessionKey))
synchronized (this.sessionSubscriptionIdMap) {
if (!this.sessionSubscriptionIdMap.containsKey(sessionKey))
return;
LOG.fine("For session '" + sessionKey + "', cancelling: " + subscription);
SessionSubscriptions sessionSubscriptions = this.sessionSubscriptions.get(sessionKey);
sessionSubscriptions.cancel(subscription.getEventType());
SessionSubscriptions sessionSubscriptions = this.sessionSubscriptionIdMap.get(sessionKey);
if (TextUtil.isNullOrEmpty(subscription.getSubscriptionId())) {
sessionSubscriptions.cancelByType(subscription.getEventType());
} else {
sessionSubscriptions.cancelById(subscription.getSubscriptionId());
}
}
}

public void cancelAll(String sessionKey) {
synchronized (this.sessionSubscriptions) {
if (this.sessionSubscriptions.containsKey(sessionKey)) {
synchronized (this.sessionSubscriptionIdMap) {
if (this.sessionSubscriptionIdMap.containsKey(sessionKey)) {
LOG.fine("Cancelling all subscriptions for session: " + sessionKey);
this.sessionSubscriptions.remove(sessionKey);
this.sessionSubscriptionIdMap.remove(sessionKey);
}
}
}
@@ -141,8 +180,8 @@ public void cancelAll(String sessionKey) {
boolean accessibleForRestrictedUsers = exchange.getIn().getHeader(HEADER_ACCESS_RESTRICTED, false, Boolean.class);

Set<Map.Entry<String, SessionSubscriptions>> sessionSubscriptionsSet;
synchronized (this.sessionSubscriptions) {
sessionSubscriptionsSet = new HashSet<>(sessionSubscriptions.entrySet());
synchronized (this.sessionSubscriptionIdMap) {
sessionSubscriptionsSet = new HashSet<>(sessionSubscriptionIdMap.entrySet());
}

for (Map.Entry<String, SessionSubscriptions> entry : sessionSubscriptionsSet) {
@@ -157,14 +196,17 @@ public void cancelAll(String sessionKey) {
if (sessionSubscription.subscription.getFilter() == null
|| sessionSubscription.subscription.getFilter().apply(event)) {
LOG.fine("Creating message for subscribed session '" + sessionKey + "': " + event);

TriggeredEventSubscription<SharedEvent> triggeredEventSubscription = new TriggeredEventSubscription<>(event, sessionSubscription.subscriptionId);

if (sessionSubscription.subscription.getInternalConsumer() == null) {
Message msg = new DefaultMessage();
msg.setBody(event); // Don't copy the event, use same reference
msg.setBody(triggeredEventSubscription); // Don't copy the event, use same reference
msg.setHeaders(new HashMap<>(exchange.getIn().getHeaders())); // Copy headers
msg.setHeader(WebsocketConstants.SESSION_KEY, sessionKey);
messageList.add(msg);
} else {
sessionSubscription.subscription.getInternalConsumer().accept(event);
sessionSubscription.subscription.getInternalConsumer().accept(triggeredEventSubscription);
}
}
}
Oops, something went wrong.

0 comments on commit ecbc2a1

Please sign in to comment.