Skip to content

Commit

Permalink
Merge pull request #902 from openworm/fix/177
Browse files Browse the repository at this point in the history
Backend websocket reconnection
  • Loading branch information
tarelli committed May 26, 2020
2 parents bebcc0d + bf9d859 commit 1a95b17
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -83,7 +84,7 @@ public class ConnectionHandler implements IGeppettoManagerCallbackListener

// the geppetto project active for this connection
private IGeppettoProject geppettoProject;

/**
* @param websocketConnection
* @param geppettoManager
Expand All @@ -98,7 +99,32 @@ protected ConnectionHandler(WebsocketConnection websocketConnection, IGeppettoMa
this.geppettoManager = new GeppettoManager(geppettoManager, geppettoManagerConfiguration);
this.geppettoManager.setSimulationListener(this);
}


/**
* @param instance
*/
public void setGeppettoManager(GeppettoManager instance) throws GeppettoExecutionException
{
if (instance != null) {
geppettoManager = instance;
} else {
throw new GeppettoExecutionException("Setting geppetto manager on reconnection not working, the instance provided is null");
}
}

/**
* @return
* @throws GeppettoExecutionException
*/
public IGeppettoManager getGeppettoManager() throws GeppettoExecutionException
{
if (geppettoManager != null) {
return geppettoManager;
} else {
throw new GeppettoExecutionException("This connection handler does not have an initialised geppetto manager.");
}
}

/**
* @param requestID
* @param projectId
Expand Down Expand Up @@ -210,6 +236,29 @@ else if(geppettoProject.getActiveExperimentId() != -1)
}

}

/**
* @param requestID
* @param projectId
*/
public void setProjectFromId(String requestID, long projectId)
{
try {
IGeppettoDataManager dataManager = DataManagerHelper.getDataManager();
IGeppettoProject geppettoProject = dataManager.getGeppettoProjectById(projectId);

Gson gson = new Gson();
String projectJSON = gson.toJson(geppettoProject);
boolean persisted = geppettoProject.isVolatile();
String update = "{\"project\":" + projectJSON + "}";

setConnectionProject(geppettoProject);
reloadExperiment(requestID, geppettoProject.getActiveExperimentId(), projectId);
websocketConnection.sendMessage(null, OutboundMessages.PROJECT_LOADED, update);
} catch (GeppettoExecutionException e) {
error(e, "Error in retrieving the project from the ID provided");
}
}

/**
* @param projectId
Expand Down Expand Up @@ -353,6 +402,38 @@ public void loadExperiment(String requestID, long experimentID, long projectId)
}
logger.debug("Loading experiment took " + (System.currentTimeMillis() - start) + "ms");
}

/**
* @param requestID
* @param experimentID
* @param projectId
*/
public void reloadExperiment(String requestID, long experimentID, long projectId)
{
long start = System.currentTimeMillis();
try
{
IGeppettoProject geppettoProject = retrieveGeppettoProject(projectId);
IExperiment experiment = retrieveExperiment(experimentID, geppettoProject);
// run the matched experiment
if(experiment != null)
{
ExperimentState experimentState = geppettoManager.loadExperiment(requestID, experiment);
logger.info("The experiment " + experimentID + " was reloaded");

}
else
{
error(null, "Error reloading experiment, the experiment " + experimentID + " was not found in project " + projectId);
}

}
catch(GeppettoExecutionException | GeppettoAccessException e)
{
error(e, "Error loading experiment");
}
logger.debug("Loading experiment took " + (System.currentTimeMillis() - start) + "ms");
}

/**
* Run the Experiment
Expand Down Expand Up @@ -1411,6 +1492,14 @@ public void closeProject()

}

/**
*
*/
public void pauseProject()
{
ConnectionsManager.getInstance().removeConnection(websocketConnection);

}
/**
* @param geppettoProject
* @throws GeppettoExecutionException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Date;
import java.util.Calendar;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.geppetto.core.common.GeppettoExecutionException;
import org.geppetto.simulation.manager.GeppettoManager;

/**
* @author matteocantarelli
Expand All @@ -26,6 +32,8 @@ public class ConnectionsManager

private final ConcurrentHashMap<String, WebsocketConnection> _connections = new ConcurrentHashMap<String, WebsocketConnection>();

private final ConcurrentHashMap<String, ManagerRecord> managers = new ConcurrentHashMap<String, ManagerRecord>();

/**
* @return
*/
Expand All @@ -37,6 +45,39 @@ public static ConnectionsManager getInstance()
}
return connectionsManager;
}

/**
* @param String requestID
* @param ConnectionHandler instance
*/
public void registerHandler(String connectionID, ConnectionHandler instance) throws GeppettoExecutionException
{
if (!managers.containsKey(connectionID)) {
ManagerRecord newRecord = new ManagerRecord((GeppettoManager) instance.getGeppettoManager());
managers.put(connectionID, newRecord);
} else {
ManagerRecord newRecord = new ManagerRecord((GeppettoManager) instance.getGeppettoManager());
managers.put(connectionID, newRecord);
throw new GeppettoExecutionException("The GeppettoManager registered for the session " + connectionID + " has been replaced");
}
}

/**
* @param String connectionID
* @return
*/
public GeppettoManager getHandler(String connectionID) throws GeppettoExecutionException
{
if (managers.containsKey(connectionID)) {
GeppettoManager _manager = managers.get(connectionID).getManagerRecord();
managers.remove(connectionID);
return _manager;
} else {
throw new GeppettoExecutionException("The Geppetto Manager requested has not been registered.");
}
}



/**
* Add new connection to list of current ones
Expand All @@ -62,15 +103,26 @@ public String addConnection(WebsocketConnection websocketConnection)
*/
private void purgeLostConnections()
{
List<WebsocketConnection> toBeRemoved = new ArrayList<WebsocketConnection>();
// Check all the connections registered, ping who is alive and purge the others
for(WebsocketConnection client : this.getConnections())
{
CharBuffer buffer = CharBuffer.wrap("ping");
client.getSession().getAsyncRemote().sendObject(buffer);
if (client.getSession().isOpen()) {
CharBuffer buffer = CharBuffer.wrap("ping");
client.getSession().getAsyncRemote().sendObject(buffer);
} else {
this.removeConnection(client);
}

}
for(WebsocketConnection client : toBeRemoved)
{
this.removeConnection(client);

// To avoid memory consumption we check also the map of geppetto managers stored
// the instances that are more than 5 minutes older gets removed
Long now = Calendar.getInstance().getTimeInMillis() / 1000;
for(String key : managers.keySet()) {
ManagerRecord value = managers.get(key);
if ((now - value.getRegistration()) > (5 * 60)) {
managers.remove(key);
}
}
}

Expand Down Expand Up @@ -126,4 +178,22 @@ private String getNewConnectionId()
{
return "Connection" + connectionsCounter.incrementAndGet();
}

private class ManagerRecord {
private GeppettoManager manager;
private Long registrationDate;

public ManagerRecord(GeppettoManager manager) {
this.manager = manager;
this.registrationDate = Calendar.getInstance().getTimeInMillis() / 1000;
}

public Long getRegistration() {
return registrationDate;
}

public GeppettoManager getManagerRecord() {
return manager;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,28 @@ public void onOpen(Session session, EndpointConfig config) {

@OnClose
public void onClose(Session session, CloseReason closeReason) {
super.onClose(session, closeReason);
messageSender.shutdown();
connectionHandler.closeProject();
logger.info("Closed Connection ..."+userSession.getId());
String exitCode = closeReason.getCloseCode().toString();
if (exitCode.equals("NORMAL_CLOSURE") || exitCode.equals("GOING_AWAY")) {
super.onClose(session, closeReason);
messageSender.shutdown();
connectionHandler.closeProject();
} else {
try
{
ConnectionsManager.getInstance().registerHandler(connectionID, this.connectionHandler);
super.onClose(session, closeReason);
messageSender.shutdown();
connectionHandler.pauseProject();
}
catch(GeppettoExecutionException e)
{
super.onClose(session, closeReason);
messageSender.shutdown();
connectionHandler.closeProject();
}
logger.info("Closed Connection ..."+userSession.getId());

}
}

@OnError
Expand Down Expand Up @@ -205,6 +223,20 @@ public void onMessage(String message) {
connectionHandler.checkUserPrivileges(requestID);
break;
}
case RECONNECT:
{
parameters = new Gson().fromJson(gmsg.data, new TypeToken<HashMap<String, String>>()
{
}.getType());
String lostConnectionID = parameters.get("connectionID");
projectId = Long.parseLong(parameters.get("projectId"));
try {
connectionHandler.setGeppettoManager(ConnectionsManager.getInstance().getHandler(lostConnectionID));
} catch (GeppettoExecutionException e) {
sendMessage(requestID, OutboundMessages.RECONNECTION_ERROR, "");
}
break;
}
case NEW_EXPERIMENT:
{
parameters = new Gson().fromJson(gmsg.data, new TypeToken<HashMap<String, String>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public enum InboundMessages {
NOTIFY_USER("notify_user"),
GET_SCRIPT("get_script"),
GET_DATA_SOURCE_RESULTS("get_data_source_results"),
RECONNECT("reconnect"),

//PROJECT MESSAGES
LOAD_PROJECT_FROM_URL("load_project_from_url"),
Expand Down Expand Up @@ -53,7 +54,7 @@ public enum InboundMessages {

//QUERIES
RUN_QUERY("run_query"),
RUN_QUERY_COUNT("run_query_count"),
RUN_QUERY_COUNT("run_query_count"),
;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public enum OutboundMessages {
IMPORT_VALUE_RESOLVED("import_value_resolved"),
RETURN_QUERY("return_query"),
RETURN_QUERY_COUNT("return_query_count"),
RETURN_QUERY_RESULTS("return_query_results");
RETURN_QUERY_RESULTS("return_query_results"),
RECONNECTION_ERROR("reconnection_error");

private OutboundMessages(final String text) {
this.text = text;
Expand Down

0 comments on commit 1a95b17

Please sign in to comment.