Skip to content

Commit

Permalink
Prepare for multiple controller instances in the same process
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Bauer committed Nov 29, 2016
1 parent ee0280c commit 84b7d16
Show file tree
Hide file tree
Showing 21 changed files with 473 additions and 467 deletions.
Expand Up @@ -3,14 +3,12 @@
import org.openremote.container.Container;
import org.openremote.container.ContainerService;
import org.openremote.controller.command.CommandBuilder;
import org.openremote.controller.context.ControllerContext;
import org.openremote.controller.context.InMemoryStateStorage;
import org.openremote.controller.deploy.DeploymentDefinition;
import org.openremote.controller.deploy.xml.ControllerDOMParser;
import org.openremote.controller.event.EventProcessor;
import org.openremote.controller.event.EventProcessorChain;
import org.openremote.controller.rules.CommandFacade;
import org.openremote.controller.model.Deployment;
import org.openremote.controller.model.Sensor;
import org.openremote.controller.context.DataContext;
import org.openremote.controller.deploy.Deployment;

import java.io.InputStream;

Expand All @@ -23,9 +21,7 @@ public class ControllerService implements ContainerService {
final protected EventProcessor[] eventProcessors;

protected DeploymentDefinition deploymentDefinition;
protected Deployment deployment;
protected CommandFacade commandFacade;
protected DataContext dataContext;
protected ControllerContext controllerContext;

public ControllerService(InputStream deploymentXml, CommandBuilder commandBuilder, EventProcessor[] eventProcessors) {
this.deploymentXml = deploymentXml;
Expand All @@ -40,41 +36,31 @@ public void init(Container container) throws Exception {

@Override
public void configure(Container container) throws Exception {
deployment = new Deployment(deploymentDefinition, commandBuilder);

commandFacade = new CommandFacade(deployment);

EventProcessorChain eventProcessorChain = new EventProcessorChain(commandFacade, eventProcessors);

dataContext = new DataContext(deployment, eventProcessorChain);
// TODO Support booting of multiple controller instances
Deployment deployment = new Deployment(
deploymentDefinition,
commandBuilder,
new InMemoryStateStorage(),
eventProcessors
);
controllerContext = new ControllerContext("OpenRemoteController1", deployment);
}

@Override
public void start(Container container) throws Exception {
if (dataContext != null) {
dataContext.start();
for (Sensor sensor : deployment.getSensors()) {
dataContext.registerAndStartSensor(sensor);
}
if (controllerContext != null) {
controllerContext.start();
}
}

@Override
public void stop(Container container) throws Exception {
if (dataContext != null) {
dataContext.stop();
if (controllerContext != null) {
controllerContext.stop();
}
}

public Deployment getDeployment() {
return deployment;
}

public CommandFacade getCommandFacade() {
return commandFacade;
}

public DataContext getDataContext() {
return dataContext;
public ControllerContext getContext() {
return controllerContext;
}
}
@@ -1,10 +1,8 @@
package org.openremote.controller.rules;
package org.openremote.controller.command;

import org.openremote.controller.command.Command;
import org.openremote.controller.command.ExecutableCommand;
import org.openremote.controller.event.EventProcessor;
import org.openremote.controller.deploy.CommandDefinition;
import org.openremote.controller.model.Deployment;
import org.openremote.controller.deploy.Deployment;

import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -13,25 +11,25 @@
* Can be used directly and/or by {@link EventProcessor}s to trigger
* {@link ExecutableCommand}s (e.g. in rules or from simple client call).
*/
public class CommandFacade {
public class Commands {

private static final Logger LOG = Logger.getLogger(CommandFacade.class.getName());
private static final Logger LOG = Logger.getLogger(Commands.class.getName());

final protected Deployment deployment;

public CommandFacade(Deployment deployment) {
public Commands(Deployment deployment) {
this.deployment = deployment;
}

public void command(String commandName) {
command(commandName, null);
public void execute(String commandName) {
execute(commandName, null);
}

public void command(String commandName, int arg) {
command(commandName, Integer.toString(arg));
public void execute(String commandName, int arg) {
execute(commandName, Integer.toString(arg));
}

public void command(String commandName, String arg) {
public void execute(String commandName, String arg) {
CommandDefinition commandDefinition = deployment.getCommandDefinition(commandName);
if (commandDefinition == null) {
LOG.warning("Command definition not found, ignoring execution: " + commandName);
Expand Down Expand Up @@ -62,18 +60,18 @@ public void execute(CommandDefinition commandDefinition, String arg) {
"No command was produced (does the protocol have an ExecutableCommand?): " + commandDefinition
);
} else if (command instanceof ExecutableCommand) {
LOG.fine("Executing command '" + command + "' with: " + arg);
LOG.fine("Executing '" + commandDefinition.getName() + "' with: " + arg);
ExecutableCommand executableCommand = (ExecutableCommand) command;
try {
executableCommand.send(arg);
} catch (Exception ex) {
LOG.log(Level.SEVERE, "Error executing command '" + command + "' with: " + arg, ex);
LOG.log(Level.SEVERE, "Error executing '" + commandDefinition.getName() + "' with: " + arg, ex);
}
} else {
LOG.log(Level.WARNING, "Ignoring, not an ExecutableCommand: " + command);
}
} catch (Throwable t) {
LOG.log(Level.SEVERE, "Error building command: " + commandDefinition, t);
LOG.log(Level.SEVERE, "Error building: " + commandDefinition, t);
}
}
}
Expand Down
Expand Up @@ -11,8 +11,8 @@
* implementation but push commands can be used to override this default implementation.
*
* Push commands are expected to create their own threads (if needed) which implement the
* push functionality and also directly push received events to the
* data context of the controller using the sensor callback API provided.
* push functionality and also directly push received events to the context of the controller
* using the sensor callback API provided.
*
* Push command implementations use the sensor
* {@link org.openremote.controller.model.Sensor#update} method to push state changes
Expand Down
Expand Up @@ -3,84 +3,91 @@
import org.openremote.controller.event.Event;
import org.openremote.controller.event.EventProcessingContext;
import org.openremote.controller.event.EventProcessorChain;
import org.openremote.controller.model.Deployment;
import org.openremote.controller.deploy.Deployment;
import org.openremote.controller.model.Sensor;
import org.openremote.controller.command.Commands;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DataContext {
/**
* An instance of a controller, bootstrapped from {@link Deployment}, this is the main API.
*/
public class ControllerContext {

private static final Logger LOG = Logger.getLogger(DataContext.class.getName());
private static final Logger LOG = Logger.getLogger(ControllerContext.class.getName());

final protected String controllerID;
final protected Deployment deployment;
final protected Commands commands;
final protected EventProcessorChain eventProcessorChain;

final protected Map<Integer, Sensor> sensors = new ConcurrentHashMap<>();
final protected StateStorage stateStorage = new InMemoryStateStorage();
private volatile Boolean shutdownInProgress = false;

private volatile Boolean isShutdownInProcess = false;

public DataContext(Deployment deployment, EventProcessorChain eventProcessorChain) {
public ControllerContext(String controllerID, Deployment deployment) {
this.controllerID = controllerID;
this.deployment = deployment;
this.eventProcessorChain = eventProcessorChain;
this.commands = new Commands(deployment);
this.eventProcessorChain = new EventProcessorChain(commands, deployment.getEventProcessors());
}

public String getControllerID() {
return controllerID;
}

public Deployment getDeployment() {
return deployment;
}

public Commands getCommands() {
return commands;
}

protected StateStorage getStateStorage() {
return getDeployment().getStateStorage();
}

public synchronized void start() {
if (isShutdownInProcess)
if (shutdownInProgress)
return;
LOG.info("Starting context: " + getControllerID());
eventProcessorChain.start();
for (Sensor sensor : deployment.getSensors()) {
// Put initial state "unknown" for each sensor
getStateStorage().put(new SensorState(new Sensor.UnknownEvent(sensor)));
sensor.start(this);
}
}

/**
* <ul>
* <li>registered sensors are stopped</li>
* <li>event processors are stopped</li>
* <li>sensors are stopped</li>
* <li>the current state is cleared</li>
* <li>registered sensors are unregistered</li>
* </ul>
*/
public synchronized void stop() {
try {
isShutdownInProcess = true;
LOG.info("Stopping context: " + getControllerID());
shutdownInProgress = true;
eventProcessorChain.stop();
for (Sensor sensor : sensors.values()) {
LOG.info("Stopping sensor: " + sensor);
for (Sensor sensor : deployment.getSensors()) {
try {
sensor.stop();
} catch (Throwable t) {
LOG.log(Level.SEVERE, "Failed to stop sensor: " + sensor, t);
}
}
stateStorage.clear();
sensors.clear();
getStateStorage().clear();
} finally {
isShutdownInProcess = false;
}
}

public synchronized void registerAndStartSensor(Sensor sensor) {
if (isShutdownInProcess) {
return;
shutdownInProgress = false;
}

Sensor previous = sensors.put(sensor.getSensorDefinition().getSensorID(), sensor);
if (previous != null) {
throw new IllegalArgumentException("Duplicate registration: " + sensor.getSensorDefinition());
}

// Initial state
stateStorage.put(new SensorState(new Sensor.UnknownEvent(sensor)));

sensor.start(this);
LOG.info("Registered and started sensor: " + sensor);
}

public synchronized void update(Event event) {
LOG.fine("==> Update from event: " + event);
if (isShutdownInProcess) {
LOG.fine("<== Data context is shutting down. Ignoring update from: " + event.getSource());
if (shutdownInProgress) {
LOG.fine("<== Shutting down. Ignoring update from: " + event.getSource());
return;
}

Expand All @@ -89,29 +96,28 @@ public synchronized void update(Event event) {

// Early exist if one of the processors decided to terminate the chain
if (ctx.hasTerminated()) {
LOG.fine("<== Updating status complete, event context terminated, no update was made to data context for event: " + ctx.getEvent());
LOG.fine("<== Updating status complete, event context terminated, no update was made for event: " + ctx.getEvent());
return;
}

stateStorage.put(new SensorState(event));
getStateStorage().put(new SensorState(event));
LOG.fine("<== Updating status complete for event: " + event);
// TODO: Trigger notification of client that stuff has changed? Put it in a message broker/queue/topic?
}

public String queryValue(int sensorID) {
if (!stateStorage.contains(sensorID)) {
if (!getStateStorage().contains(sensorID)) {
LOG.info("Requested sensor id '" + sensorID + "' was not found. Defaulting to: " + Sensor.UNKNOWN_STATUS);
return Sensor.UNKNOWN_STATUS;
}
return stateStorage.get(sensorID).getEvent().serialize();
return getStateStorage().get(sensorID).getEvent().serialize();
}

public String queryValue(String sensorName) {
return queryValue(deployment.getSensorID(sensorName));
}

public Event queryEvent(int sensorID) {
return stateStorage.get(sensorID).getEvent();
return getStateStorage().get(sensorID).getEvent();
}

public Event queryEvent(String sensorName) {
Expand Down
@@ -1,9 +1,13 @@
package org.openremote.controller.context;

/**
* Store controller context (sensor) state.
*/
public interface StateStorage {

void clear();

// TODO: Trigger notification of client that stuff has changed? Put it in a message broker/queue/topic?
void put(SensorState sensorState);

boolean contains(int sensorID);
Expand Down

0 comments on commit 84b7d16

Please sign in to comment.