Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifications to incorporate token-refresh for Virtual Firealarm #297

Merged
merged 6 commits into from
Jun 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
</dependency>


</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportUtils;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.http.HTTPTransportHandler;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentManager;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.exception.AgentCoreOperationException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportUtils;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.http.HTTPTransportHandler;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -191,21 +191,19 @@ public void run() {

private void executeDataPush(String pushDataPayload) {
AgentManager agentManager = AgentManager.getInstance();
int responseCode = -1;
String pushDataEndPointURL = agentManager.getPushDataAPIEP();
HttpURLConnection httpConnection = null;
HttpURLConnection httpConnection;
int responseCode = -1;

try {
httpConnection = TransportUtils.getHttpConnection(agentManager.getPushDataAPIEP());
httpConnection.setRequestMethod(AgentConstants.HTTP_POST);
httpConnection.setRequestProperty("Authorization", "Bearer " +
agentManager.getAgentConfigs().getAuthToken());
httpConnection.setRequestProperty("Content-Type",
AgentConstants.APPLICATION_JSON_TYPE);
httpConnection.setRequestProperty("Authorization",
"Bearer " + agentManager.getAgentConfigs().getAuthToken());
httpConnection.setRequestProperty("Content-Type", AgentConstants.APPLICATION_JSON);

httpConnection.setDoOutput(true);
DataOutputStream dataOutPutWriter = new DataOutputStream(
httpConnection.getOutputStream());
DataOutputStream dataOutPutWriter = new DataOutputStream(httpConnection.getOutputStream());
dataOutPutWriter.writeBytes(pushDataPayload);
dataOutPutWriter.flush();
dataOutPutWriter.close();
Expand All @@ -225,39 +223,34 @@ private void executeDataPush(String pushDataPayload) {
} catch (IOException exception) {
String errorMsg =
"An IO error occurred whilst trying to get the response code from: " +
pushDataEndPointURL + " for a " + AgentConstants.HTTP_POST +
" " + "method.";
pushDataEndPointURL + " for a " + AgentConstants.HTTP_POST + " method.";
log.error(AgentConstants.LOG_APPENDER + errorMsg);

} catch (TransportHandlerException exception) {
log.error(AgentConstants.LOG_APPENDER +
"Error encountered whilst trying to create HTTP-Connection " +
"to IoT-Server EP at: " +
"Error encountered whilst trying to create HTTP-Connection to IoT-Server EP at: " +
pushDataEndPointURL);
}

if (responseCode == HttpStatus.CONFLICT_409 ||
responseCode == HttpStatus.PRECONDITION_FAILED_412) {
log.warn(AgentConstants.LOG_APPENDER +
"DeviceIP is being Re-Registered due to Push-Data failure " +
"with response code: " +
"DeviceIP is being Re-Registered due to Push-Data failure with response code: " +
responseCode);
registerThisDevice();

} else if (responseCode != HttpStatus.NO_CONTENT_204) {
if (log.isDebugEnabled()) {
log.error(AgentConstants.LOG_APPENDER + "Status Code: " + responseCode +
" encountered whilst trying to Push-Device-Data to IoT " +
"Server at: " +
" encountered whilst trying to Push-Device-Data to IoT Server at: " +
agentManager.getPushDataAPIEP());
}
agentManager.updateAgentStatus(AgentConstants.SERVER_NOT_RESPONDING);
}

if (log.isDebugEnabled()) {
log.debug(AgentConstants.LOG_APPENDER + "Push-Data call with payload - " +
pushDataPayload + ", to IoT Server returned status " +
responseCode);
log.debug(AgentConstants.LOG_APPENDER + "Push-Data call with payload - " + pushDataPayload +
", to IoT Server returned status " + responseCode);
}
}

Expand All @@ -272,16 +265,14 @@ public void run() {
closeConnection();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.warn(AgentConstants.LOG_APPENDER +
"Unable to 'STOP' HTTP server at port: " + port);
log.warn(AgentConstants.LOG_APPENDER + "Unable to 'STOP' HTTP server at port: " + port);
}

try {
Thread.sleep(timeoutInterval);
} catch (InterruptedException e1) {
log.error(AgentConstants.LOG_APPENDER +
"HTTP-Termination: Thread Sleep Interrupt " +
"Exception");
log.error(
AgentConstants.LOG_APPENDER + "HTTP-Termination: Thread Sleep Interrupt Exception");
}
}
}
Expand Down Expand Up @@ -398,8 +389,7 @@ private int registerDeviceIP(String deviceOwner, String deviceID)
} catch (TransportHandlerException e) {
String errorMsg =
"Protocol specific error occurred when trying to fetch an HTTPConnection to:" +
" " +
registerEndpointURLString;
" " + registerEndpointURLString;
log.error(AgentConstants.LOG_APPENDER + errorMsg);
throw new AgentCoreOperationException();
}
Expand All @@ -419,8 +409,7 @@ private int registerDeviceIP(String deviceOwner, String deviceID)

} catch (IOException exception) {
String errorMsg = "An IO error occurred whilst trying to get the response code from:" +
" " +
registerEndpointURLString + " for a " + AgentConstants.HTTP_POST + " method.";
" " + registerEndpointURLString + " for a " + AgentConstants.HTTP_POST + " method.";
log.error(AgentConstants.LOG_APPENDER + errorMsg);
throw new AgentCoreOperationException(errorMsg, exception);
}
Expand All @@ -436,7 +425,7 @@ private int registerDeviceIP(String deviceOwner, String deviceID)

/*------------------------------------------------------------------------------------------*/
/* Utility methods relevant to creating and sending HTTP requests to the Iot-Server */
/*------------------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------------------*/

/**
* This method is used to get the IP of the device in which the agent is run on.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentManager;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentUtilOperations;
Expand All @@ -35,13 +36,14 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

//TODO:: Lincense heade, comments and SPECIFIC class name since its not generic
//TODO:: Lincence header, comments and SPECIFIC class name since its not generic
public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {

private static final Log log = LogFactory.getLog(FireAlarmMQTTCommunicator.class);

private ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
private ScheduledFuture<?> dataPushServiceHandler;
private static final String DEFAULT_PASSWORD = "";

public FireAlarmMQTTCommunicator(String deviceOwner, String deviceType,
String mqttBrokerEndPoint, String subscribeTopic) {
Expand All @@ -68,18 +70,23 @@ public void connect() {
public void run() {
while (!isConnected()) {
try {
connectToQueue();
connectToQueue(agentManager.getAgentConfigs().getAuthToken(), DEFAULT_PASSWORD);
agentManager.updateAgentStatus("Connected to MQTT Queue");
} catch (TransportHandlerException e) {
log.warn(AgentConstants.LOG_APPENDER + "Connection to MQTT Broker at: " + mqttBrokerEndPoint +
" failed.\n Will retry in " + timeoutInterval + " milli-seconds.");
}

try{
subscribeToQueue();
agentManager.updateAgentStatus("Subscribed to MQTT Queue");
publishDeviceData();
if (e.getCause() != null && e.getCause() instanceof MqttSecurityException) {
refreshOAuthToken((MqttSecurityException) e.getCause());
}
}

try {
if (isConnected()) {
subscribeToQueue();
agentManager.updateAgentStatus("Subscribed to MQTT Queue");
publishDeviceData();
}
} catch (TransportHandlerException e) {
log.warn(AgentConstants.LOG_APPENDER + "Subscription to MQTT Broker at: " +
mqttBrokerEndPoint + " failed");
Expand All @@ -100,6 +107,26 @@ public void run() {
connectorThread.start();
}

private void refreshOAuthToken(final MqttSecurityException exception) {
Runnable tokenRefresher = new Runnable() {
public void run() {
String authenticationMethod = AgentUtilOperations.getAuthenticationMethod();

try {
if (exception.getReasonCode() == MqttSecurityException.REASON_CODE_FAILED_AUTHENTICATION &&
authenticationMethod.equals(AgentConstants.TOKEN_AUTHENTICATION_METHOD)) {
AgentUtilOperations.refreshOAuthToken();
}
} catch (AgentCoreOperationException e1) {
log.error(AgentConstants.LOG_APPENDER + "Token Refresh Attempt Failed. " + e1);
}
}
};

Thread connectorThread = new Thread(tokenRefresher);
connectorThread.setDaemon(true);
connectorThread.start();
}

@Override
public void processIncomingMessage(MqttMessage message, String... messageParams) {
Expand Down
Loading