Permalink
Browse files

Implemented thing attribute value updates from sensor messages

  • Loading branch information...
1 parent 4a7726c commit 7591527f970e3349334b0faa81279e843eb0421b @christianbauer christianbauer committed Dec 21, 2016
@@ -118,7 +118,7 @@ public void unlinkAttributes(String thingId) throws Exception {
while (entryIterator.hasNext()) {
Map.Entry<AttributeRef, ThingAttribute> entry = entryIterator.next();
if (entry.getKey().getEntityId().equals(thingId)) {
- LOG.fine("Attribute moved on '" + getProtocolName() + "': " + entry.getValue());
+ LOG.fine("Attribute removed on '" + getProtocolName() + "': " + entry.getValue());
onAttributeRemoved(entry.getValue());
entryIterator.remove();
}
@@ -20,24 +20,23 @@
package org.openremote.manager.server.agent;
import org.apache.camel.builder.RouteBuilder;
-import org.openremote.model.asset.AgentAttributes;
-import org.openremote.model.asset.ThingAttribute;
-import org.openremote.model.asset.ThingAttributes;
import org.openremote.agent3.protocol.Protocol;
import org.openremote.container.Container;
import org.openremote.container.ContainerService;
import org.openremote.container.message.MessageBrokerContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceEvent;
import org.openremote.manager.server.asset.AssetService;
-import org.openremote.model.asset.Asset;
+import org.openremote.model.AttributeValueChange;
+import org.openremote.model.asset.*;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static org.openremote.agent3.protocol.Protocol.SENSOR_TOPIC;
import static org.openremote.container.persistence.PersistenceEvent.PERSISTENCE_EVENT_TOPIC;
import static org.openremote.manager.server.asset.AssetPredicates.isPersistenceEventForAssetType;
import static org.openremote.manager.server.asset.AssetPredicates.isPersistenceEventForEntityType;
@@ -82,6 +81,7 @@ public void stop(Container container) throws Exception {
@Override
public void configure() throws Exception {
+
// If any agent or thing was modified in the database, deploy the changes
from(PERSISTENCE_EVENT_TOPIC)
.filter(isPersistenceEventForEntityType(Asset.class))
@@ -94,6 +94,20 @@ public void configure() throws Exception {
deployThing(asset, persistenceEvent.getCause());
}
});
+
+ // Update thing asset when attribute change value messages are published on the sensor topic
+ from(SENSOR_TOPIC)
+ .filter(body().isInstanceOf(AttributeValueChange.class))
+ .process(exchange -> {
+ AttributeValueChange attributeValueChange =
+ exchange.getIn().getBody(AttributeValueChange.class);
+ // Note that this is a _direct_ update of the attribute value in the database, it will
+ // not trigger a persistence event - we don't want to redeploy a thing just because an
+ // attribute value changed!
+ boolean success = assetService.updateThingAttributeValue(attributeValueChange);
+ // TODO If success then... notify asset listener clients? If not, then handle error?
+
+ });
}
protected void deployAgent(Asset agent, PersistenceEvent.Cause cause) {
@@ -117,16 +131,7 @@ protected void deployThing(Asset thing, PersistenceEvent.Cause cause) {
// Linked attributes have a reference to an agent, and a protocol configuration attribute of that agent
Map<String, List<ThingAttribute>> linkedAttributes = thingAttributes.getLinkedAttributes(
- agentLink -> {
- // Resolve the agent and the protocol configuration
- // TODO This is very inefficient and requires Hibernate second-level caching
- Asset agent = assetService.get(agentLink.getEntityId());
- if (agent != null && agent.getWellKnownType().equals(AGENT)) {
- AgentAttributes agentAttributes = new AgentAttributes(agent);
- return agentAttributes.getProtocolConfiguration(agentLink.getAttributeName());
- }
- return null;
- }
+ assetService.getAgentLinkResolver()
);
LOG.fine("Thing has attribute links to " + linkedAttributes.size() + " protocol(s): " + thing);
@@ -19,36 +19,56 @@
*/
package org.openremote.manager.server.asset;
+import elemental.json.JsonValue;
import org.apache.camel.builder.RouteBuilder;
import org.hibernate.Session;
+import org.openremote.Function;
import org.openremote.container.Container;
import org.openremote.container.ContainerService;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceEvent;
import org.openremote.container.persistence.PersistenceService;
import org.openremote.container.web.WebService;
import org.openremote.manager.server.event.EventService;
-import org.openremote.manager.shared.asset.*;
-import org.openremote.model.asset.Asset;
-import org.openremote.model.asset.AssetInfo;
-import org.openremote.model.asset.AssetType;
+import org.openremote.manager.shared.asset.SubscribeAssetModified;
+import org.openremote.manager.shared.asset.UnsubscribeAssetModified;
+import org.openremote.model.AttributeRef;
+import org.openremote.model.AttributeValueChange;
+import org.openremote.model.asset.*;
+import org.postgresql.util.PGobject;
import javax.persistence.EntityManager;
+import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.List;
+import java.util.logging.Logger;
import static org.openremote.container.persistence.PersistenceEvent.PERSISTENCE_EVENT_TOPIC;
import static org.openremote.manager.server.asset.AssetPredicates.isPersistenceEventForEntityType;
import static org.openremote.manager.server.event.EventPredicates.isEventType;
+import static org.openremote.model.asset.AssetType.AGENT;
+import static org.openremote.model.asset.AssetType.THING;
public class AssetService extends RouteBuilder implements ContainerService {
+ private static final Logger LOG = Logger.getLogger(AssetService.class.getName());
+
protected MessageBrokerService messageBrokerService;
protected EventService eventService;
protected PersistenceService persistenceService;
protected AssetListenerSubscriptions assetListenerSubscriptions;
+ final protected Function<AttributeRef, ProtocolConfiguration> agentLinkResolver = agentLink -> {
+ // Resolve the agent and the protocol configuration
+ // TODO This is very inefficient and requires Hibernate second-level caching
+ Asset agent = get(agentLink.getEntityId());
+ if (agent != null && agent.getWellKnownType().equals(AGENT)) {
+ AgentAttributes agentAttributes = new AgentAttributes(agent);
+ return agentAttributes.getProtocolConfiguration(agentLink.getAttributeName());
+ }
+ return null;
+ };
@Override
public void init(Container container) throws Exception {
@@ -113,6 +133,10 @@ public void configure() throws Exception {
});
}
+ public Function<AttributeRef, ProtocolConfiguration> getAgentLinkResolver() {
+ return agentLinkResolver;
+ }
+
public AssetInfo[] getRoot(String realm) {
if (realm == null || realm.length() == 0)
throw new IllegalArgumentException("Realm must be provided to query assets");
@@ -220,6 +244,38 @@ public void deleteChildren(String parentId) {
}
*/
+ public boolean updateThingAttributeValue(AttributeValueChange attributeValueChange) {
+ // TODO: More fine-grained return to distinguish failures ("wrong value type" is not the same as "not found")
+
+ AttributeRef attributeRef = attributeValueChange.getAttributeRef();
+ ServerAsset thing = get(attributeRef.getEntityId());
+ if (thing == null) {
+ LOG.fine("Ignoring attribute update for unknown asset: " + attributeValueChange);
+ return false;
+ }
+ if (thing.getWellKnownType() != THING) {
+ LOG.fine("Ignoring attribute update '" + attributeValueChange + "' for non-thing asset: " + thing);
+ return false;
+ }
+ ThingAttributes thingAttributes = new ThingAttributes(thing);
+ ThingAttribute thingAttribute = thingAttributes.getLinkedAttribute(
+ agentLinkResolver, attributeRef.getAttributeName()
+ );
+ if (thingAttribute == null) {
+ LOG.fine("Ignoring attribute update '" + attributeValueChange + "' for unknown/unlinked attribute: " + thing);
+ return false;
+ }
+
+ if (thingAttribute.getType().getJsonType() != attributeValueChange.getValue().getType()) {
+ LOG.fine("Ignoring attribute update '" + attributeValueChange + "', wrong value type '" + attributeValueChange.getValue().getType() + "': " + thing);
+ return false;
+ }
+ LOG.fine("Applying attribute update '" + attributeValueChange + "' on: " + thing);
+ thingAttribute.setValue(attributeValueChange.getValue());
+
+ return updateAttributeValue(thing.getId(), thingAttribute.getName(), thingAttribute.getValue());
+ }
+
protected ServerAsset loadAsset(EntityManager em, String assetId) {
ServerAsset asset = em.find(ServerAsset.class, assetId);
if (asset == null)
@@ -257,4 +313,36 @@ protected void validateParent(EntityManager em, Asset asset) {
if (Arrays.asList(parent.getPath()).contains(asset.getId()))
throw new IllegalStateException("Parent asset can not be a child of the asset: " + asset.getParentId());
}
+
+ protected boolean updateAttributeValue(String assetId, String attributeName, JsonValue value) {
+ return persistenceService.doReturningTransaction(entityManager ->
+ entityManager.unwrap(Session.class).doReturningWork(connection -> {
+ String update =
+ "UPDATE ASSET" +
+ " SET ATTRIBUTES = jsonb_set(ATTRIBUTES, ?, ?, FALSE)" +
+ " WHERE ID = ? AND ATTRIBUTES -> ? IS NOT NULL";
+ int result;
+ try (PreparedStatement statement = connection.prepareStatement(update)) {
+
+ Array attributePath = connection.createArrayOf(
+ "text",
+ new String[]{attributeName, "value"}
+ );
+ statement.setArray(1, attributePath);
+
+ PGobject pgJsonValue = new PGobject();
+ pgJsonValue.setType("jsonb");
+ pgJsonValue.setValue(value.toJson());
+ statement.setObject(2, pgJsonValue);
+
+ statement.setString(3, assetId);
+
+ statement.setString(4, attributeName);
+
+ result = statement.executeUpdate();
+ return result == 1;
+ }
+ })
+ );
+ }
}
@@ -2,34 +2,33 @@ package org.openremote.test.agent
import elemental.json.Json
import elemental.json.JsonObject
-import org.apache.camel.Exchange
-import org.apache.camel.Processor
-import org.apache.camel.builder.RouteBuilder
+import elemental.json.JsonType
import org.openremote.agent3.protocol.simulator.SimulatorProtocol
+import org.openremote.container.message.MessageBrokerService
+import org.openremote.manager.server.asset.AssetService
import org.openremote.model.AttributeRef
import org.openremote.model.AttributeValueChange
+import org.openremote.model.Attributes
import org.openremote.model.asset.Color
import org.openremote.test.ManagerContainerTrait
import spock.lang.Specification
-import spock.util.concurrent.BlockingVariables
import spock.util.concurrent.PollingConditions
-import org.openremote.container.message.MessageBrokerService
-import static org.openremote.manager.server.DemoDataService.DEMO_THING_ID
import static org.openremote.agent3.protocol.Protocol.ACTUATOR_TOPIC
-import static org.openremote.agent3.protocol.Protocol.SENSOR_TOPIC
+import static org.openremote.manager.server.DemoDataService.DEMO_THING_ID
class AgentDeploymentTest extends Specification implements ManagerContainerTrait {
def "Check agent and thing deployment"() {
given: "expected conditions"
- def conditions = new PollingConditions(timeout: 10, initialDelay: 3)
+ def conditions = new PollingConditions(timeout: 30, initialDelay: 3)
when: "the demo agent and thing have been deployed"
def serverPort = findEphemeralPort()
def container = startContainer(defaultConfig(serverPort), defaultServices())
def simulatorProtocol = container.getService(SimulatorProtocol.class)
+ def assetService = container.getService(AssetService.class);
then: "the simulator elements should have the initial state"
conditions.eventually {
@@ -41,6 +40,32 @@ class AgentDeploymentTest extends Specification implements ManagerContainerTrait
when: "a thing attribute value change occurs"
conditions = new PollingConditions(timeout: 3, initialDelay: 2)
+ /* TODO What's the call path for thing attribute value updates and therefore the asset client API?
+
+ Proposal:
+
+ 1. Receive AttributeValueChange on client API
+
+ 2. Handle security and verify the message is for a linked agent/thing/attribute in the asset database
+
+ 3. Send the AttributeValueChange to the Protocol through ACTUATOR_TOPIC and trigger device/service call
+
+ 4a. We assume that most protocol implementations are not using fire-and-forget but request/reply
+ communication. Thus, an AttributeValueChange has no further consequences besides triggering an
+ actuator. We expect that a sensor "response" will let us know "soon" if the call was successful.
+ Only a sensor value change will result in an update of the asset database state. The window of
+ inconsistency we can accept depends on how "soon" a protocol typically responds.
+
+ 4b. Alternatively, if the updated attribute is configured with the "forceUpdate" meta item, we write
+ the new attribute value directly into the asset database after triggering the actuator. This flag
+ is useful if the protocol does not reflect actuator changes "immediately". For example, if we
+ send a value to a light dimmer actuator, does the light dimmer also have a sensor that responds
+ quickly with the new value? If the device/service does not reply to value changes, we can force
+ an update of the "current state" in our database and simply assume that the actuator call was
+ successful.
+
+ 4c. Should "forceUpdate" be the default behavior?
+ */
def light1DimmerChange = new AttributeValueChange(
new AttributeRef(DEMO_THING_ID, "light1Dimmer"), Json.create(66)
)
@@ -53,33 +78,21 @@ class AgentDeploymentTest extends Specification implements ManagerContainerTrait
assert simulatorProtocol.getState(DEMO_THING_ID, "light1Dimmer").asNumber() == 66
}
- when: "we listen to sensor changes so we can update the thing attribute value"
- def result = new BlockingVariables(3)
- addRoutes(container, new RouteBuilder() {
- @Override
- void configure() throws Exception {
- from(SENSOR_TOPIC)
- .process(new Processor() {
- @Override
- void process(Exchange exchange) throws Exception {
- result.attributeValueChange =
- exchange.getIn().getBody(AttributeValueChange.class)
- }
- })
- }
- })
-
- and: "a simulated sensor changes its value"
+ when: "a simulated sensor changes its value"
+ conditions = new PollingConditions(timeout: 3, initialDelay: 2)
simulatorProtocol.putState(
new AttributeRef(DEMO_THING_ID, "light1Dimmer"),
Json.create(77),
true
)
- then: "a thing value change should occur"
- result.attributeValueChange.attributeRef.entityId == DEMO_THING_ID
- result.attributeValueChange.attributeRef.attributeName == "light1Dimmer"
- result.attributeValueChange.value.asNumber() == 77
+ then: "the thing attribute value should be updated"
+ conditions.eventually {
+ def thing = assetService.get(DEMO_THING_ID)
+ def attributes = new Attributes(thing.getAttributes())
+ attributes.get("light1Dimmer").getValue().getType() == JsonType.NUMBER
+ attributes.get("light1Dimmer").getValue().asNumber() == 77
+ }
cleanup: "the server should be stopped"
stopContainer(container);
@@ -66,25 +66,26 @@ public String getThingId() {
return result;
}
+ public ThingAttribute getLinkedAttribute(Function<AttributeRef, ProtocolConfiguration> linkResolver,
+ String attributeName) {
+ return getLinkedAttribute(linkResolver, get(attributeName));
+ }
+
protected ThingAttribute getLinkedAttribute(Function<AttributeRef, ProtocolConfiguration> linkResolver,
Attribute attribute) {
- if (attribute == null)
+ if (attribute == null || !ThingAttribute.isLinkedAttribute(attribute))
return null;
- if (ThingAttribute.isLinkedAttribute(attribute)) {
- AttributeRef agentLink = ThingAttribute.getAgentLink(attribute);
- if (agentLink == null)
- return null;
-
- ProtocolConfiguration protocolConfiguration = linkResolver.apply(agentLink);
- if (protocolConfiguration == null) {
- LOG.info("Protocol configuration not found in agent '" + agentLink + "', ignoring: " + attribute);
- return null;
- }
+ AttributeRef agentLink = ThingAttribute.getAgentLink(attribute);
+ if (agentLink == null)
+ return null;
- return new ThingAttribute(getThingId(), protocolConfiguration, attribute);
+ ProtocolConfiguration protocolConfiguration = linkResolver.apply(agentLink);
+ if (protocolConfiguration == null) {
+ LOG.info("Protocol configuration not found in agent '" + agentLink + "', ignoring: " + attribute);
+ return null;
}
- return null;
- }
+ return new ThingAttribute(getThingId(), protocolConfiguration, attribute);
+ }
}

0 comments on commit 7591527

Please sign in to comment.