diff --git a/src/main/java/org/geppetto/frontend/controllers/ConnectionHandler.java b/src/main/java/org/geppetto/frontend/controllers/ConnectionHandler.java index 87239fec2..89f161f12 100644 --- a/src/main/java/org/geppetto/frontend/controllers/ConnectionHandler.java +++ b/src/main/java/org/geppetto/frontend/controllers/ConnectionHandler.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -83,7 +84,7 @@ public class ConnectionHandler implements IGeppettoManagerCallbackListener // the geppetto project active for this connection private IGeppettoProject geppettoProject; - + /** * @param websocketConnection * @param geppettoManager @@ -98,7 +99,32 @@ protected ConnectionHandler(WebsocketConnection websocketConnection, IGeppettoMa this.geppettoManager = new GeppettoManager(geppettoManager, geppettoManagerConfiguration); this.geppettoManager.setSimulationListener(this); } - + + /** + * @param instance + */ + public void setGeppettoManager(GeppettoManager instance) throws GeppettoExecutionException + { + if (instance != null) { + geppettoManager = instance; + } else { + throw new GeppettoExecutionException("Setting geppetto manager on reconnection not working, the instance provided is null"); + } + } + + /** + * @return + * @throws GeppettoExecutionException + */ + public IGeppettoManager getGeppettoManager() throws GeppettoExecutionException + { + if (geppettoManager != null) { + return geppettoManager; + } else { + throw new GeppettoExecutionException("This connection handler does not have an initialised geppetto manager."); + } + } + /** * @param requestID * @param projectId @@ -210,6 +236,29 @@ else if(geppettoProject.getActiveExperimentId() != -1) } } + + /** + * @param requestID + * @param projectId + */ + public void setProjectFromId(String requestID, long projectId) + { + try { + IGeppettoDataManager dataManager = DataManagerHelper.getDataManager(); + IGeppettoProject geppettoProject = dataManager.getGeppettoProjectById(projectId); + + Gson gson = new Gson(); + String projectJSON = gson.toJson(geppettoProject); + boolean persisted = geppettoProject.isVolatile(); + String update = "{\"project\":" + projectJSON + "}"; + + setConnectionProject(geppettoProject); + reloadExperiment(requestID, geppettoProject.getActiveExperimentId(), projectId); + websocketConnection.sendMessage(null, OutboundMessages.PROJECT_LOADED, update); + } catch (GeppettoExecutionException e) { + error(e, "Error in retrieving the project from the ID provided"); + } + } /** * @param projectId @@ -353,6 +402,38 @@ public void loadExperiment(String requestID, long experimentID, long projectId) } logger.debug("Loading experiment took " + (System.currentTimeMillis() - start) + "ms"); } + + /** + * @param requestID + * @param experimentID + * @param projectId + */ + public void reloadExperiment(String requestID, long experimentID, long projectId) + { + long start = System.currentTimeMillis(); + try + { + IGeppettoProject geppettoProject = retrieveGeppettoProject(projectId); + IExperiment experiment = retrieveExperiment(experimentID, geppettoProject); + // run the matched experiment + if(experiment != null) + { + ExperimentState experimentState = geppettoManager.loadExperiment(requestID, experiment); + logger.info("The experiment " + experimentID + " was reloaded"); + + } + else + { + error(null, "Error reloading experiment, the experiment " + experimentID + " was not found in project " + projectId); + } + + } + catch(GeppettoExecutionException | GeppettoAccessException e) + { + error(e, "Error loading experiment"); + } + logger.debug("Loading experiment took " + (System.currentTimeMillis() - start) + "ms"); + } /** * Run the Experiment @@ -1411,6 +1492,14 @@ public void closeProject() } + /** + * + */ + public void pauseProject() + { + ConnectionsManager.getInstance().removeConnection(websocketConnection); + + } /** * @param geppettoProject * @throws GeppettoExecutionException diff --git a/src/main/java/org/geppetto/frontend/controllers/ConnectionsManager.java b/src/main/java/org/geppetto/frontend/controllers/ConnectionsManager.java index 8dd6619b0..0f7ab711a 100644 --- a/src/main/java/org/geppetto/frontend/controllers/ConnectionsManager.java +++ b/src/main/java/org/geppetto/frontend/controllers/ConnectionsManager.java @@ -4,12 +4,18 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.Date; +import java.util.Calendar; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.geppetto.core.common.GeppettoExecutionException; +import org.geppetto.simulation.manager.GeppettoManager; /** * @author matteocantarelli @@ -26,6 +32,8 @@ public class ConnectionsManager private final ConcurrentHashMap _connections = new ConcurrentHashMap(); + private final ConcurrentHashMap managers = new ConcurrentHashMap(); + /** * @return */ @@ -37,6 +45,39 @@ public static ConnectionsManager getInstance() } return connectionsManager; } + + /** + * @param String requestID + * @param ConnectionHandler instance + */ + public void registerHandler(String connectionID, ConnectionHandler instance) throws GeppettoExecutionException + { + if (!managers.containsKey(connectionID)) { + ManagerRecord newRecord = new ManagerRecord((GeppettoManager) instance.getGeppettoManager()); + managers.put(connectionID, newRecord); + } else { + ManagerRecord newRecord = new ManagerRecord((GeppettoManager) instance.getGeppettoManager()); + managers.put(connectionID, newRecord); + throw new GeppettoExecutionException("The GeppettoManager registered for the session " + connectionID + " has been replaced"); + } + } + + /** + * @param String connectionID + * @return + */ + public GeppettoManager getHandler(String connectionID) throws GeppettoExecutionException + { + if (managers.containsKey(connectionID)) { + GeppettoManager _manager = managers.get(connectionID).getManagerRecord(); + managers.remove(connectionID); + return _manager; + } else { + throw new GeppettoExecutionException("The Geppetto Manager requested has not been registered."); + } + } + + /** * Add new connection to list of current ones @@ -62,15 +103,26 @@ public String addConnection(WebsocketConnection websocketConnection) */ private void purgeLostConnections() { - List toBeRemoved = new ArrayList(); + // Check all the connections registered, ping who is alive and purge the others for(WebsocketConnection client : this.getConnections()) { - CharBuffer buffer = CharBuffer.wrap("ping"); - client.getSession().getAsyncRemote().sendObject(buffer); + if (client.getSession().isOpen()) { + CharBuffer buffer = CharBuffer.wrap("ping"); + client.getSession().getAsyncRemote().sendObject(buffer); + } else { + this.removeConnection(client); + } + } - for(WebsocketConnection client : toBeRemoved) - { - this.removeConnection(client); + + // To avoid memory consumption we check also the map of geppetto managers stored + // the instances that are more than 5 minutes older gets removed + Long now = Calendar.getInstance().getTimeInMillis() / 1000; + for(String key : managers.keySet()) { + ManagerRecord value = managers.get(key); + if ((now - value.getRegistration()) > (5 * 60)) { + managers.remove(key); + } } } @@ -126,4 +178,22 @@ private String getNewConnectionId() { return "Connection" + connectionsCounter.incrementAndGet(); } + + private class ManagerRecord { + private GeppettoManager manager; + private Long registrationDate; + + public ManagerRecord(GeppettoManager manager) { + this.manager = manager; + this.registrationDate = Calendar.getInstance().getTimeInMillis() / 1000; + } + + public Long getRegistration() { + return registrationDate; + } + + public GeppettoManager getManagerRecord() { + return manager; + } + } } diff --git a/src/main/java/org/geppetto/frontend/controllers/WebsocketConnection.java b/src/main/java/org/geppetto/frontend/controllers/WebsocketConnection.java index f5a8b7597..87417a9c5 100644 --- a/src/main/java/org/geppetto/frontend/controllers/WebsocketConnection.java +++ b/src/main/java/org/geppetto/frontend/controllers/WebsocketConnection.java @@ -128,10 +128,28 @@ public void onOpen(Session session, EndpointConfig config) { @OnClose public void onClose(Session session, CloseReason closeReason) { - super.onClose(session, closeReason); - messageSender.shutdown(); - connectionHandler.closeProject(); - logger.info("Closed Connection ..."+userSession.getId()); + String exitCode = closeReason.getCloseCode().toString(); + if (exitCode.equals("NORMAL_CLOSURE") || exitCode.equals("GOING_AWAY")) { + super.onClose(session, closeReason); + messageSender.shutdown(); + connectionHandler.closeProject(); + } else { + try + { + ConnectionsManager.getInstance().registerHandler(connectionID, this.connectionHandler); + super.onClose(session, closeReason); + messageSender.shutdown(); + connectionHandler.pauseProject(); + } + catch(GeppettoExecutionException e) + { + super.onClose(session, closeReason); + messageSender.shutdown(); + connectionHandler.closeProject(); + } + logger.info("Closed Connection ..."+userSession.getId()); + + } } @OnError @@ -205,6 +223,20 @@ public void onMessage(String message) { connectionHandler.checkUserPrivileges(requestID); break; } + case RECONNECT: + { + parameters = new Gson().fromJson(gmsg.data, new TypeToken>() + { + }.getType()); + String lostConnectionID = parameters.get("connectionID"); + projectId = Long.parseLong(parameters.get("projectId")); + try { + connectionHandler.setGeppettoManager(ConnectionsManager.getInstance().getHandler(lostConnectionID)); + } catch (GeppettoExecutionException e) { + sendMessage(requestID, OutboundMessages.RECONNECTION_ERROR, ""); + } + break; + } case NEW_EXPERIMENT: { parameters = new Gson().fromJson(gmsg.data, new TypeToken>() diff --git a/src/main/java/org/geppetto/frontend/messages/InboundMessages.java b/src/main/java/org/geppetto/frontend/messages/InboundMessages.java index a29000906..edd6809e9 100644 --- a/src/main/java/org/geppetto/frontend/messages/InboundMessages.java +++ b/src/main/java/org/geppetto/frontend/messages/InboundMessages.java @@ -10,6 +10,7 @@ public enum InboundMessages { NOTIFY_USER("notify_user"), GET_SCRIPT("get_script"), GET_DATA_SOURCE_RESULTS("get_data_source_results"), + RECONNECT("reconnect"), //PROJECT MESSAGES LOAD_PROJECT_FROM_URL("load_project_from_url"), @@ -53,7 +54,7 @@ public enum InboundMessages { //QUERIES RUN_QUERY("run_query"), - RUN_QUERY_COUNT("run_query_count"), + RUN_QUERY_COUNT("run_query_count"), ; diff --git a/src/main/java/org/geppetto/frontend/messages/OutboundMessages.java b/src/main/java/org/geppetto/frontend/messages/OutboundMessages.java index 880bee5cf..dc1ec1bfc 100644 --- a/src/main/java/org/geppetto/frontend/messages/OutboundMessages.java +++ b/src/main/java/org/geppetto/frontend/messages/OutboundMessages.java @@ -61,7 +61,8 @@ public enum OutboundMessages { IMPORT_VALUE_RESOLVED("import_value_resolved"), RETURN_QUERY("return_query"), RETURN_QUERY_COUNT("return_query_count"), - RETURN_QUERY_RESULTS("return_query_results"); + RETURN_QUERY_RESULTS("return_query_results"), + RECONNECTION_ERROR("reconnection_error"); private OutboundMessages(final String text) { this.text = text;