Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIOT-GDA-11-003: Create / edit module CloudClientConnector and connect into DeviceDataManager #113

Open
labbenchstudios opened this issue Nov 18, 2020 · 0 comments
Labels
exercise New feature to implement as an exercise
Milestone

Comments

@labbenchstudios
Copy link
Contributor

labbenchstudios commented Nov 18, 2020

Description

  • Create a Java class named CloudClientConnector that implements ICloudClient.
    • NOTE: Depending upon the cloud service provider's capabilities and relevant API's, cloud connectivity can be either pub/sub or request/response (or both). While both are generally available with the services we'll review in this course, one of the more common connection paradigms is via MQTT. Hence, this exercise will focus on the pub/sub paradigm, and will therefore rely upon the MQTT protocol.
  • Integrate CloudClientConnector into DeviceDataManager

Review the README

  • Please see README.md for further information on, and use of, this content.
  • License for embedded documentation and source codes: PIOT-DOC-LIC

Estimated effort may vary greatly

  • The estimated level of effort for this exercise shown in the 'Estimate' section below is a very rough approximation. The actual level of effort may vary greatly depending on your development and test environment, experience with the requisite technologies, and many other factors.

Actions

  • Within the java package programmingtheiot/gda/connection source folder, create a new (or edit the existing) Java class named CloudClientConnector. It will implement ICloudClient, and will instance MqttClientConnector, as it will delegate much of its work to it.
  • Add the following import statements:
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import programmingtheiot.common.ConfigConst;
import programmingtheiot.common.ConfigUtil;
import programmingtheiot.common.IDataMessageListener;
import programmingtheiot.common.ResourceNameEnum;
import programmingtheiot.data.DataUtil;
import programmingtheiot.data.SensorData;
import programmingtheiot.data.SystemPerformanceData;
  • Create the following class-scoped variables:
private String topicPrefix = "";
private MqttClientConnector mqttClient = null;
private IDataMessageListener dataMsgListener = null;

// TODO: set to either 0 or 1, depending on which is preferred for your implementation
private int qosLevel = 1;
  • Create a no-arg constructor in CloudClientConnector, as follows:
public CloudClientConnector()
{
	ConfigUtil configUtil = ConfigUtil.getInstance();
	
	this.topicPrefix =
		configUtil.getProperty(ConfigConst.CLOUD_GATEWAY_SERVICE, ConfigConst.BASE_TOPIC_KEY);
	
	// Depending on the cloud service, the topic names may or may not begin with a "/", so this code
	// should be updated according to the cloud service provider's topic naming conventions
	if (topicPrefix == null) {
		topicPrefix = "/";
	} else {
		if (! topicPrefix.endsWith("/")) {
			topicPrefix += "/";
		}
	}
}
  • Implement the ICloudClient interface by simply logging messages indicating the methods have been called.
  • Update connectClient as follows:
@Override
public boolean connectClient()
{
	if (this.mqttClient == null) {
		// TODO: either line should work with recent updates to `MqttClientConnector`
//		this.mqttClient = new MqttClientConnector(true);
		this.mqttClient = new MqttClientConnector(ConfigConst.CLOUD_GATEWAY_SERVICE);
	}
	
	// NOTE: If MqttClientConnector is using the async client, we won't have a complete
	// connection to the cloud-hosted MQTT broker until MqttClientConnector's
	// connectComplete() callback is invoked. The details pertaining to the use
	// of IConnectionListener are covered in PIOT-GDA-11-001 and PIOT-GDA-11-004. 
	return this.mqttClient.connectClient();
}
  • Update disconnectClient() to check if this.mqttClient is non-null, and if so, invoke its disconnect method. Be sure to return a boolean indicating success or failure.
@Override
public boolean disconnectClient()
{
	if (this.mqttClient != null && this.mqttClient.isConnected()) {
		return this.mqttClient.disconnectClient();
	}
	
	return false;
}
  • Add a private method named createTopicName(), as follows:
    • NOTE: This method will simply generate a topic name specific to the cloud service you're using as the integration end point. The this.topicPrefix is set based on contents from the configuration file, so can be further customized by updating PiotConfig.props.
private String createTopicName(ResourceNameEnum resource)
{
	return createTopicName(resource.getDeviceName(), resource.getResourceType());
}

private String createTopicName(String deviceName, String resourceTypeName)
{
	return this.topicPrefix + deviceName + "/" + resourceTypeName;
}
  • Implement the setDataMessageListener() method by setting the passed in IDataMessageListener reference to this.dataMsgListener.
    • NOTE: You may not need this at all, as the MqttClientConnector instance will also have a reference to the IDataMessageListener instance and will handle subscription callbacks accordingly.
  • Implement the following private methods (overloaded versions of publishMessageToCloud()) as follows (you may want to modify it to suit your specific needs):
private boolean publishMessageToCloud(ResourceNameEnum resource, String itemName, String payload)
{
	String topicName = createTopicName(resource) + "-" + itemName;
	
	return publishMessageToCloud(topicName, payload);
}
	
private boolean publishMessageToCloud(String topicName, String payload)
{
	try {
		_Logger.finest("Publishing payload value(s) to CSP: " + topicName);
		
		this.mqttClient.publishMessage(topicName, payload.getBytes(), this.qosLevel);
		
		// NOTE: Depending on the cloud service, it may be necessary to 'throttle'
		// the published messages by limiting to, for example, no more than one
		// per second. While there are a variety of ways to accomplish this,
		// briefly described below are two techniques that may be worth considering
		// if this is a limitation you need to handle in your code:
		// 
		// 1) Add an artificial delay after the call to this.mqttClient.publishMessage().
		//    This can be implemented by sleeping for up to a second after the call.
		//    However, it can also adversely affect the program flow, as this sleep
		//    will block DeviceDataManager, which invoked one of the sendEdgeDataToCloud()
		//    methods that led to this call, and may negatively impact your application.
		// 
		// 2) Implement a Queue which can store both the payload and target topic, and
		//    add a scheduler to pop the oldest message off the Queue (when not empty)
		//    at a regular interval (for example, once per second), and then invoke the
		//    this.mqttClient.publishMessage() method.
		// 
		// Both approaches require thoughtful design considerations of course, and your
		// requirements may demand an alternative approach (or none at all if throttling
		// isn't a concern). Design and implementation details are left up to you.
		
		return true;
	} catch (Exception e) {
		_Logger.warning("Failed to publish message to CSP: " + topicName);
	}
	
	return false;
}
  • Implement the two 'publish' methods defined in ICloudClient (named sendEdgeDataToCloud()) as follows (you may want to modify these to suit your specific needs):
@Override
public boolean sendEdgeDataToCloud(ResourceNameEnum resource, SensorData data)
{
	if (resource != null && data != null) {
		String payload = DataUtil.getInstance().sensorDataToJson(data);
		
		return publishMessageToCloud(resource, data.getName(), payload);
	}
	
	return false;
}

@Override
public boolean sendEdgeDataToCloud(ResourceNameEnum resource, SystemPerformanceData data)
{
	if (resource != null && data != null) {
		SensorData cpuData = new SensorData();
		cpuData.updateData(data);
		cpuData.setName(ConfigConst.CPU_UTIL_NAME);
		cpuData.setValue(data.getCpuUtilization());
		
		boolean cpuDataSuccess = sendEdgeDataToCloud(resource, cpuData);
		
		if (! cpuDataSuccess) {
			_Logger.warning("Failed to send CPU utilization data to cloud service.");
		}
		
		SensorData memData = new SensorData();
		memData.updateData(data);
		memData.setName(ConfigConst.MEM_UTIL_NAME);
		memData.setValue(data.getMemoryUtilization());
		
		boolean memDataSuccess = sendEdgeDataToCloud(resource, memData);
		
		if (! memDataSuccess) {
			_Logger.warning("Failed to send memory utilization data to cloud service.");
		}
		
		return (cpuDataSuccess == memDataSuccess);
	}
	
	return false;
}
  • Implement the subscribe method, as follows (you may want to modify this to suit your specific needs):
    • IMPORTANT NOTE: This is a very basic subscription approach, where any received messages from the cloud service will be handled by the generic messageReceived() method implemented within MqttClientConnector. While you can use this for handling incoming messages from the cloud service, it will require implementation of the appropriate verification logic within messageReceived() to process the MQTT payload and determine what type of message should be processed, as well as where it originated. This can pose some parsing and logical flow challenges, so it's best to use the delegated approach specified for CloudClientConnector in PIOT-GDA-11-004.
@Override
public boolean subscribeToCloudEvents(ResourceNameEnum resource)
{
	boolean success = false;
	
	String topicName = null;
	
	if (this.mqttClient != null && this.mqttClient.isConnected()) {
		topicName = createTopicName(resource);
		
		// NOTE: This is a generic subscribe call - if you use this approach,
		// you will need to update this.mqttClient.messageReceived() to
		//   (1) identify the message source (e.g., CDA or Cloud), 
		//   (2) determine the message type (e.g., actuator command), and
		//   (3) convert the payload into a data container (e.g., ActuatorData)
		// 
		// Once you determine the message source and type, and convert the
		// payload to its appropriate data container, you can then determine
		// where to route the message (e.g., send to the IDataMessageListener
		// instance (which will be DeviceDataManager).
		this.mqttClient.subscribeToTopic(topicName, this.qosLevel);
		
		success = true;
	} else {
		_Logger.warning("Subscription methods only available for MQTT. No MQTT connection to broker. Ignoring. Topic: " + topicName);
	}
	
	return success;
}
  • Implement the unsubscribe method, as follows (you may want to modify this to suit your specific needs):
@Override
public boolean unsubscribeFromCloudEvents(ResourceNameEnum resource)
{
	boolean success = false;
	
	String topicName = null;
	
	if (this.mqttClient != null && this.mqttClient.isConnected()) {
		topicName = createTopicName(resource);
		
		this.mqttClient.unsubscribeFromTopic(topicName);
		
		success = true;
	} else {
		_Logger.warning("Unsubscribe method only available for MQTT. No MQTT connection to broker. Ignoring. Topic: " + topicName);
	}
	
	return success;
}

Integrate with DeviceDataManager

  • Enable (or disable) CloudClientConnector within DeviceDataManager by storing a class-scoped boolean named enableCloudClient to determine if DeviceDataManager will establish a cloud connection or not. Here's one approach:
    • Add an enableCloudClient key / value (boolean flag) to PiotConfig.props.
    • Update DeviceDataManager with a reference to CloudClientConnector. This will only need to be set (and subsequently activated as indicated below) if enableCloudClient is true. For this activity, it will need to be true of course.
    • Create a class-scoped instance of CloudClientConnector within the DeviceDataManager constructor called cloudClient. It's type will be ICloudClient.
    • Edit the startManager() method to include a call to this.cloudClient.connectClient(). NOTE: All connections should be started successfully before the SystemPerformanceManager is started. This will mitigate any potential data loss condition where new system performance data is passed into DeviceDataManager before the cloud connection is established.
    • Edit the stopManager() method to include any appropriate unsubscribe calls (we'll dig into this further in Lab Module 10), followed by a call to this.cloudClient.disconnectClient().
  • Update the handleActuatorCommandRequest() method to handle incoming ActuatorData commands properly.
    • Here is a sample implementation for this method, although your implementation details may differ from that shown below.
@Override
public boolean handleActuatorCommandRequest(ResourceNameEnum resourceName, ActuatorData data)
{
	if (data != null) {
		// NOTE: Feel free to update this log message for debugging and monitoring
		_Logger.log(
			Level.FINE,
			"Actuator request received: {0}. Message: {1}",
			new Object[] {resourceName.getResourceName(), Integer.valueOf((data.getCommand()))});
		
		if (data.hasError()) {
			_Logger.warning("Error flag set for ActuatorData instance.");
		}
		
		// TODO: retrieve this from config file
		int qos = ConfigConst.DEFAULT_QOS;
		
		// TODO: you may want to implement some analysis logic here or
		// in a separate method to determine how best to handle incoming
		// ActuatorData before calling this.sendActuatorCommandtoCda()
		
		// Recall that this private method was implement in Lab Module 10
		// See PIOT-GDA-10-003 for details
		this.sendActuatorCommandtoCda(resourceName, data)
		
		return true;
	} else {
		return false;
	}
}
  • Update the handleSensorMessage() and handleSystemPerformanceMessage() methods to pass their respective data to cloudClient (if cloudClient is non-null and connected). These calls should be the last ones in each method.
    • Here are sample implementations for each, after the suggested updates:
@Override
public boolean handleSensorMessage(ResourceNameEnum resourceName, SensorData data)
{
	if (data != null) {
		_Logger.fine("Handling sensor message: " + data.getName());
		
		if (data.hasError()) {
			_Logger.warning("Error flag set for SensorData instance.");
		}
		
		String jsonData = DataUtil.getInstance().sensorDataToJson(data);
		
		_Logger.fine("JSON [SensorData] -> " + jsonData);
		
		// TODO: retrieve this from config file
		int qos = ConfigConst.DEFAULT_QOS;
		
		// NOTE: Your code may not have a persistenceClient reference or
		// a enablePersistenceClient boolean
		if (this.enablePersistenceClient && this.persistenceClient != null) {
			this.persistenceClient.storeData(resourceName.getResourceName(), qos, data);
		}
		
		this.handleIncomingDataAnalysis(resourceName, data);
		
		this.handleUpstreamTransmission(resourceName, jsonData, qos);
		
		return true;
	} else {
		return false;
	}
}
@Override
public boolean handleSystemPerformanceMessage(ResourceNameEnum resourceName, SystemPerformanceData data)
{
	if (data != null) {
		_Logger.info("Handling system performance message: " + data.getName());
		
		if (data.hasError()) {
			_Logger.warning("Error flag set for SystemPerformanceData instance.");
		}
		
		// TODO: retrieve this from config file
		int qos = ConfigConst.DEFAULT_QOS;
		
		// NOTE: You may want to persist your SystemPerformanceData here
		
		// NOTE: You may want to also analyze the SystemPerformanceData here
		
		this.handleUpstreamTransmission(resourceName, jsonData, qos);
		
		return true;
	} else {
		return false;
	}
}
  • Update the handleUpstreamTransmission() private method with the requisite functionality to send data to the cloud service.
    • Shown below is one way to implement this, although your implementation may differ.
private void handleUpstreamTransmission(ResourceNameEnum resource, String jsonData, int qos)
{
	// TODO: feel free to change the logging levels for debugging and monitoring
	_Logger.fine("Sending JSON data to cloud service: " + resource);
	
	if (this.cloudClient != null) {
		// TODO: handle any failures
		if (this.cloudClient.sendEdgeDataToCloud(resourceName, data)) {
			_Logger.fine("Sent JSON data upstream to CSP.");
		}
	}

Estimate

  • Large

Tests

  • Setup
    • For this initial test, we'll set the cloud gateway configuration in PiotConfig.props to point to the cloud service provider's MQTT broker instance and will need to also ensure the provider's root certificates and credentials are available and loaded via the PIOT-CFG-10-001 instructions previously discussed.
      • NOTE 1: A notional configuration section is shown below. Be sure to replace the host value of TBD to be the target MQTT broker, and to use the appropriate file references for both credFile and certFile (these can reside anywhere on your local filesystem of course, but remember - NEVER commit sensitive data, including these files, to your Git repository!)
      • NOTE 2: The notional configuration section shown assumes a Ubidots configuration. Your own configuration may vary. See https://help.ubidots.com/en/articles/570008-ubidots-mqtt-broker for details.
[Cloud.GatewayService]
credFile       = ./cred/UbidotsCloudCred.props
certFile       = ./cert/UbidotsCloudCert.pem
host           = industrial.api.ubidots.com
port           = 1883
securePort     = 8883
defaultQoS     = 0
keepAlive      = 60
enableAuth     = True
enableCrypt    = True
baseUrl        = 
baseTopic      = /v1.6/devices/
  • Integration tests (in ./src/test/java/programmingtheiot/part03/integration)
    • Run CloudClientConnectorTest. All test cases should pass with relevant output in the log file / console indicating success.
    • If all goes well, your log output may resemble the following:
Apr 06, 2022 11:19:59 PM programmingtheiot.gda.connection.MqttClientConnector initSecureConnectionParameters
INFO: Configuring TLS...
Apr 06, 2022 11:19:59 PM programmingtheiot.gda.connection.MqttClientConnector initSecureConnectionParameters
INFO: PEM file valid. Using secure connection: ./cert/UbidotsCloudCert.pem
Apr 06, 2022 11:19:59 PM programmingtheiot.common.SimpleCertManagementUtil isValid
INFO: Certificate / key file exists: ./cert/UbidotsCloudCert.pem
Apr 06, 2022 11:19:59 PM programmingtheiot.common.SimpleCertManagementUtil loadCertificate
INFO: Configuring SSL using X.509
Apr 06, 2022 11:19:59 PM programmingtheiot.common.SimpleCertManagementUtil importCertificate
INFO: Successfully imported X.509 certificate using entry name UbidotsCloudCert.pem.3138.1 from file: ./cert/UbidotsCloudCert.pem
Apr 06, 2022 11:19:59 PM programmingtheiot.common.SimpleCertManagementUtil importCertificate
INFO: Successfully imported X.509 certificate using entry name UbidotsCloudCert.pem.1938.2 from file: ./cert/UbidotsCloudCert.pem
Apr 06, 2022 11:19:59 PM programmingtheiot.common.SimpleCertManagementUtil loadCertificate
INFO: X.509 certificate load and SSL socket init successful from file: ./cert/UbidotsCloudCert.pem
Apr 06, 2022 11:19:59 PM programmingtheiot.gda.connection.MqttClientConnector initSecureConnectionParameters
INFO: TLS enabled.
Apr 06, 2022 11:19:59 PM programmingtheiot.gda.connection.MqttClientConnector initCredentialConnectionParameters
INFO: Checking if credentials file exists and us loadable...
Apr 06, 2022 11:19:59 PM programmingtheiot.gda.connection.MqttClientConnector initCredentialConnectionParameters
INFO: Credentials now set.
Apr 06, 2022 11:19:59 PM programmingtheiot.gda.connection.MqttClientConnector initClientParameters
INFO: Using URL for broker conn: ssl://industrial.api.ubidots.com:8883
Apr 06, 2022 11:19:59 PM programmingtheiot.gda.connection.MqttClientConnector connectClient
INFO: MQTT client connecting to broker: ssl://industrial.api.ubidots.com:8883
Apr 06, 2022 11:20:00 PM programmingtheiot.gda.connection.MqttClientConnector connectComplete
INFO: MQTT connection successful (is reconnect = false). Broker: ssl://industrial.api.ubidots.com:8883
Apr 06, 2022 11:20:00 PM programmingtheiot.gda.connection.MqttClientConnector subscribeToTopic
INFO: Successfully subscribed to topic: /v1.6/devices/ConstrainedDevice/ActuatorCmd
Apr 06, 2022 11:20:05 PM programmingtheiot.gda.connection.MqttClientConnector deliveryComplete
INFO: Delivered MQTT message with ID: 2
Apr 06, 2022 11:20:05 PM programmingtheiot.gda.connection.MqttClientConnector deliveryComplete
INFO: Delivered MQTT message with ID: 3
Apr 06, 2022 11:20:05 PM programmingtheiot.gda.connection.MqttClientConnector deliveryComplete
INFO: Delivered MQTT message with ID: 4
Apr 06, 2022 11:20:36 PM programmingtheiot.gda.connection.MqttClientConnector unsubscribeFromTopic
INFO: Successfully unsubscribed from topic: /v1.6/devices/ConstrainedDevice/ActuatorCmd
Apr 06, 2022 11:21:26 PM programmingtheiot.gda.connection.MqttClientConnector disconnectClient
INFO: Disconnecting MQTT client from broker: ssl://industrial.api.ubidots.com:8883
@labbenchstudios labbenchstudios added the exercise New feature to implement as an exercise label Nov 18, 2020
@labbenchstudios labbenchstudios added this to the Chapter 11 milestone Nov 18, 2020
@labbenchstudios labbenchstudios changed the title PIOT-GDA-11-002: Create / edit module CloudClientConnector with one minor update to MqttClientConnector PIOT-GDA-11-001: Create / edit module CloudClientConnector with one minor update to MqttClientConnector Nov 18, 2020
@labbenchstudios labbenchstudios added this to Chapter 11 - Cloud Integration in Programming the IoT - Exercises Kanban Board Nov 18, 2020
@labbenchstudios labbenchstudios changed the title PIOT-GDA-11-001: Create / edit module CloudClientConnector with one minor update to MqttClientConnector PIOT-GDA-11-003: Create / edit module CloudClientConnector Nov 23, 2020
@labbenchstudios labbenchstudios changed the title PIOT-GDA-11-003: Create / edit module CloudClientConnector PIOT-GDA-11-004: Create / edit module CloudClientConnector Nov 23, 2020
@labbenchstudios labbenchstudios changed the title PIOT-GDA-11-004: Create / edit module CloudClientConnector PIOT-GDA-11-003: Create / edit module CloudClientConnector Nov 25, 2020
@labbenchstudios labbenchstudios changed the title PIOT-GDA-11-003: Create / edit module CloudClientConnector PIOT-GDA-11-003: Create / edit module CloudClientConnector and connect into DeviceDataManager Feb 28, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
exercise New feature to implement as an exercise
Projects
Programming the IoT - Exercises Kanba...
  
Lab Module 11 - Cloud Integration
Development

No branches or pull requests

1 participant