Skip to content

Commit

Permalink
Merge pull request #245 from richard-austin/modify-cloud-link-to-use-…
Browse files Browse the repository at this point in the history
…activemq

Bug fixes related to using the Cloud Service. Direct access to the NVR is not affected
  • Loading branch information
richard-austin authored Jan 13, 2024
2 parents e6a7b7a + 1907e0c commit 3058fd2
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 116 deletions.
4 changes: 2 additions & 2 deletions server/grails-app/conf/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ environments:
mqUser: cloud
mqPassword: inelESOLsiONgeOuShEFatIns
productKeyPath: /etc/security-cam/prodKey
cloudActiveMQUrl: failover://ssl://192.168.1.83:61617?socket.verifyHostName=false
cloudActiveMQUrl: ssl://192.168.1.82:61617?socket.verifyHostName=false
activeMQInitQueue: INIT
webServerForCloudProxyHost: localhost
webServerForCloudProxyPort: 8080
Expand Down Expand Up @@ -207,7 +207,7 @@ environments:
mqUser: cloud
mqPassword: inelESOLsiONgeOuShEFatIns
productKeyPath: /etc/security-cam/prodKey
cloudActiveMQUrl: failover://ssl://192.168.1.83:61617?socket.verifyHostName=false
cloudActiveMQUrl: ssl://192.168.1.82:61617?socket.verifyHostName=false
activeMQInitQueue: INIT
webServerForCloudProxyHost: localhost
webServerForCloudProxyPort: 8088
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import security.cam.interfaceobjects.ObjectCommandResponse
class CloudProxyService {
LogService logService
GrailsApplication grailsApplication
SimpMessagingTemplate brokerMessagingTemplate;
SimpMessagingTemplate brokerMessagingTemplate

CloudAMQProxy cloudProxy = null

Expand All @@ -26,7 +26,7 @@ class CloudProxyService {

ObjectCommandResponse response = new ObjectCommandResponse()
try {
cloudProxy.start()
cloudProxy.userStart()
}
catch (Exception ex) {
response.status = PassFail.FAIL
Expand All @@ -39,7 +39,7 @@ class CloudProxyService {
ObjectCommandResponse stop() {
ObjectCommandResponse response = new ObjectCommandResponse()
try {
cloudProxy.stop()
cloudProxy.userStop()
}
catch (Exception ex) {
response.status = PassFail.FAIL
Expand Down
64 changes: 0 additions & 64 deletions server/src/main/java/com/proxy/AdvisoryMonitor.java

This file was deleted.

119 changes: 72 additions & 47 deletions server/src/main/java/com/proxy/CloudAMQProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public enum MessageMetadata {
private ActiveMQConnection connection = null;
private String productId = "";
SimpMessagingTemplate brokerMessagingTemplate;
private boolean userSelectedRunning = false;

public CloudAMQProxy(String webServerForCloudProxyHost, int webServerForCloudProxyPort, SimpMessagingTemplate brokerMessagingTemplate) {
this.webServerForCloudProxyHost = webServerForCloudProxyHost;
Expand All @@ -75,39 +76,53 @@ public CloudAMQProxy(String webServerForCloudProxyHost, int webServerForCloudPro
setLogLevel(cloudProxyProperties.getLOG_LEVEL());
}

public void start() {
startActiveMQClientExecutor.submit(() -> {
if (!isRunning()) {
cloudProxyExecutor = Executors.newSingleThreadExecutor();
webserverReadExecutor = Executors.newCachedThreadPool();
webserverWriteExecutor = Executors.newSingleThreadExecutor();
cloudProxyExecutor.execute(() -> {
running = true;
try {
connection = getConnection();
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (INIT queue for sending product ID for login)
Destination destination = session.createQueue(cloudProxyProperties.getACTIVE_MQ_INIT_QUEUE());
loginToCloud(destination);

} catch (Exception ex) {
showExceptionDetails(ex, "CloudAMQProxy.start");
public void userStart() {
userSelectedRunning = true;
start();
}

private void start() {
if (userSelectedRunning) {
startActiveMQClientExecutor.submit(() -> {
if (!isRunning()) {
cloudProxyExecutor = Executors.newSingleThreadExecutor();
webserverReadExecutor = Executors.newCachedThreadPool();
webserverWriteExecutor = Executors.newSingleThreadExecutor();
cloudProxyExecutor.execute(() -> {
running = true;
stop();
} finally {
resetCloudProxySessionTimeout(); // Start the connection timer, if heartbeats are not received from the Cloud
}
});
}
});
try {
connection = getConnection();
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (INIT queue for sending product ID for login)
Destination destination = session.createQueue(cloudProxyProperties.getACTIVE_MQ_INIT_QUEUE());
// Signal that the connection is active
brokerMessagingTemplate.convertAndSend("/topic/transportStatus", transportActiveMsg);
transportActive = true;
loginToCloud(destination);
} catch (Exception ex) {
showExceptionDetails(ex, "CloudAMQProxy.start");
stop();
} finally {
resetCloudProxySessionTimeout(); // Start the connection timer, if heartbeats are not received from the Cloud
}
});
}
});
}
}

public void stop() {
public void userStop() {
userSelectedRunning = false;
stop();
}

private void stop() {
if (isRunning()) {
try {
stopCloudProxySessionTimer();
running = false;
cloudProxyExecutor.shutdownNow();
webserverReadExecutor.shutdown();
webserverWriteExecutor.shutdownNow();
Expand All @@ -124,8 +139,6 @@ public void stop() {
}
} catch (Exception ex) {
logger.error(ex.getClass().getName() + " in CloudAMQProxy.stop: " + ex.getMessage());
} finally {
running = false;
}
}
}
Expand All @@ -148,16 +161,14 @@ void closeAndClearSockets() {
/**
* cleanUpForRestart: Some sort of problem occurred with the Cloud connection, ensure we restart cleanly
*/
void restart() {
public void restart() {
try {
cip.stop();
closeAndClearSockets();
logger.info("Restarting CloudAMQProxy");
// Ensure all sockets in the token/socket map are closed
Thread.sleep(2000); // Short wait before restart
stop();
if (userSelectedRunning) {
Thread.sleep(2000); // Short wait before restart
start();
}
// Create the destination
Destination destination = session.createQueue(cloudProxyProperties.getACTIVE_MQ_INIT_QUEUE());
loginToCloud(destination);
} catch (Exception ex) {
logger.error(ex.getClass().getName() + " in restart: " + ex.getMessage());
}
Expand Down Expand Up @@ -368,9 +379,19 @@ void sendResponseToCloud(Message msg) {
}

CloudInputProcess cip = null;
final String transportActiveMsg = new JSONObject()
.put("transportActive", true)
.toString();
final String transportInactiveMsg = new JSONObject()
.put("transportActive", false)
.toString();

private ActiveMQConnection getConnection() throws Exception {
ActiveMQConnection connection = getActiveMQConnection();
final String transportActiveMsg = new JSONObject()
.put("transportActive", false)
.toString();

TransportListener tl = new TransportListener() {
@Override
public void onCommand(Object command) {
Expand All @@ -379,26 +400,29 @@ public void onCommand(Object command) {

@Override
public void onException(IOException error) {
// This does get called when not using failover.
logger.info(error.getClass().getName() + " received in getConnection transport listener: " + error.getMessage());

// Use the exception handler for what I hoped the transportInterupted handler would do!
brokerMessagingTemplate.convertAndSend("/topic/transportStatus", transportInactiveMsg);
transportActive = false;
}

@Override
public void transportInterupted() {
final String transportActiveMsg = new JSONObject()
.put("transportActive", false)
.toString();
// Disable audio out on clients except the initiator
brokerMessagingTemplate.convertAndSend("/topic/transportStatus", transportActiveMsg);
// This is never called when not using failover

// Signal that the connection is down
brokerMessagingTemplate.convertAndSend("/topic/transportStatus", transportInactiveMsg);
transportActive = false;
logger.info("Transport interrupted");
}

@Override
public void transportResumed() {
final String transportActiveMsg = new JSONObject()
.put("transportActive", true)
.toString();
// Disable audio out on clients except the initiator
// This is never called when not using failover

// Signal that the connection is active
brokerMessagingTemplate.convertAndSend("/topic/transportStatus", transportActiveMsg);

transportActive = true;
Expand Down Expand Up @@ -485,8 +509,9 @@ private void startCloudProxySessionTimer() {
}

private void stopCloudProxySessionTimer() {
if (cloudProxySessionTimer != null)
if (cloudProxySessionTimer != null) {
cloudProxySessionTimer.cancel();
}
}

public void resetCloudProxySessionTimeout() {
Expand Down

0 comments on commit 3058fd2

Please sign in to comment.