Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIOT-GDA-10-002: Update MqttClientConnector to send received messages to an IDataMessageListener instance #92

Open
labbenchstudios opened this issue Nov 8, 2020 · 0 comments
Labels
exercise New feature to implement as an exercise
Milestone

Comments

@labbenchstudios
Copy link
Contributor

labbenchstudios commented Nov 8, 2020

Description

  • Update MqttClientConnector to subscribe to the CDA's topics related to SensorData messages, SystemPerformanceData messages, and ActuatorData response messages.
    • You can choose to handle all of these topics using a single messageArrived() callback implementation or create a separate callback class for each topic that implements the IMqttMessageListener interface.

Review the README

  • Please see README.md for further information on, and use of, this content.
  • License for embedded documentation and source codes: PIOT-DOC-LIC

Estimated effort may vary greatly

  • The estimated level of effort for this exercise shown in the 'Estimate' section below is a very rough approximation. The actual level of effort may vary greatly depending on your development and test environment, experience with the requisite technologies, and many other factors.

Actions

IMPORTANT NOTE 1: You will likely need to use MqttAsyncClient in place of MqttClient depending on the integration use cases you implement. Why? You can encounter a deadlock condition using the synchronous MqttClient if, for example, you're subscribed to topic A, receive a message from the broker on topic A, and then attempt to re-publish that message on topic B. Simply change the MqttClient references (declaration and new instance) to MqttAsyncClient, and re-run your tests from Lab Module 07 and here in this Lab Module to verify the change executes as expected.

IMPORTANT NOTE 2: If you have not yet run PIOT-INT-10-001, be sure to do so BEFORE switching the MqttClientConnector code from the synchronous MqttClient to the asynchronous MqttAsyncClient. It will not success if using the async MqttAsyncClient.

Using the Asynchronous MQTT Client (MqttAsyncClient)

  • Change the MQTT client declaration to use the async client, then update the connectClient() method accordingly.
    • Two examples follow: one for the class-scoped variables, and the other for the connection logic.
// NOTE: MQTT client updated to use async client vs sync client
private MqttAsyncClient      mqttClient = null;
// private MqttClient           mqttClient = null;
	
@Override
public boolean connectClient()
{
	try {
		if (this.mqttClient == null) {
			// NOTE: MQTT client updated to use async client vs sync client
			this.mqttClient = new MqttAsyncClient(this.brokerAddr, this.clientID, this.persistence);
//			this.mqttClient = new MqttClient(this.brokerAddr, this.clientID, this.persistence);
			
			this.mqttClient.setCallback(this);
		}
		
		if (! this.mqttClient.isConnected()) {
			_Logger.info("MQTT client connecting to broker: " + this.brokerAddr);
			
			this.mqttClient.connect(this.connOpts);
			
			// NOTE: When using the async client, returning 'true' here doesn't mean
			// the client is actually connected - yet. Use the connectComplete() callback
			// to determine result of connectClient().
			return true;
		} else {
			_Logger.warning("MQTT client already connected to broker: " + this.brokerAddr);
		}
	} catch (MqttException e) {
		// TODO: handle this exception
		
		_Logger.log(Level.SEVERE, "Failed to connect MQTT client to broker: " + this.brokerAddr, e);
	}
	
	return false;
}

Update the subscriber functionality

  • Update the subscriber functionality to support the use of either IDataMessageListener for callbacks, or another callback method contained within MqttClientConnector. There are two ways to support this, as follows:
    • NOTE: Both subscription options described below will be invoked as part of the connectComplete() callback once the connectClient() is called and the connection to the broker succeeds, ensuring the necessary topic subscriptions are called after a successful broker connection.
      • IMPORTANT: If you've already added the subscription calls described below within your GDA's DeviceDataManager (after invoking the connectClient() method on your MQTT client instance within DeviceDataManager, for instance, during the startManager() call), you should remove that logic before moving forward.
    • Option 1: Register for all subscription notifications using the existing mqttClient.messageArrived() callback already in place, and either parse the content within the method before sending to the appropriate IDataMessageListener callback instance
    • Option 2: Define a unique callback handler for each topic requiring special handling by implementing a new class which implements the IMqttMessageListener interface. You can use inner classes or create separate external classes.
  • Implement the connectComplete() callback by subscribing to the given topics using either Option 1 or Option 2 above.

Subscribe - Option 1

  • In the connectComplete() callback, add the generic subscriptions:
@Override
public void connectComplete(boolean reconnect, String serverURI)
{
	_Logger.info("MQTT connection successful (is reconnect = " + reconnect + "). Broker: " + serverURI);
	
	int qos = 1;
	
	this.subscribeToTopic(ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE, qos);
	this.subscribeToTopic(ResourceNameEnum.CDA_SENSOR_MSG_RESOURCE, qos);
	this.subscribeToTopic(ResourceNameEnum.CDA_SYSTEM_PERF_MSG_RESOURCE, qos);
	
	// IMPORTANT NOTE: You'll have to parse each message type in the callback method
	// `public void messageArrived(String topic, MqttMessage msg) throws Exception`
}

Subscribe - Option 2

  • Create one private inner class for each subscription (REMEMBER, there are three: SensorData messages, SystemPerformanceData messages, and ActuatorData response messages - you'll need one inner class for EACH).
    • Here's an example for the ActuatorData response messages, which can be embedded at the end of MqttClientConnector:
private class ActuatorResponseMessageListener implements IMqttMessageListener
{
	private ResourceNameEnum resource = null;
	private IDataMessageListener dataMsgListener = null;
	
	ActuatorResponseMessageListener(ResourceNameEnum resource, IDataMessageListener dataMsgListener)
	{
		this.resource = resource;
		this.dataMsgListener = dataMsgListener;
	}
	
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception
	{
		try {
			ActuatorData actuatorData =
				DataUtil.getInstance().jsonToActuatorData(new String(message.getPayload()));
			
			// optionally, log a message indicating data was received
			_Logger.info("Received ActuatorData response: " + actuatorData.getValue());
				

			if (this.dataMsgListener != null) {
				this.dataMsgListener.handleActuatorCommandResponse(resource, actuatorData);
			}
		} catch (Exception e) {
			_Logger.warning("Failed to convert message payload to ActuatorData.");
		}
	}
	
}
  • IMPORTANT NOTE: Don't forget to create an IMqttMessageListener to handle both SensorData and SystemPerformanceData messages! Use the template above for ActuatorResponseMessageListener to assist with this. Here are two shell examples - you will need to implement these on your own:
private class SensorDataMessageListener implements IMqttMessageListener
{
	private ResourceNameEnum resource = null;
	private IDataMessageListener dataMsgListener = null;
	
	SensorDataMessageListener(ResourceNameEnum resource, IDataMessageListener dataMsgListener)
	{
		this.resource = resource;
		this.dataMsgListener = dataMsgListener;
	}
	
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception
	{
		try {
			// TODO: Extract the payload and convert the JSON to SensorData
			
			// optionally, log a message indicating data was received

			// TODO: invoke the dataMsgListener's callback to handle
			// incoming SensorData messages
		} catch (Exception e) {
			// TODO: handle any Exception that may be thrown
		}
	}	
}
private class SystemPerformanceDataMessageListener implements IMqttMessageListener
{
	private ResourceNameEnum resource = null;
	private IDataMessageListener dataMsgListener = null;
	
	SystemPerformanceDataMessageListener(ResourceNameEnum resource, IDataMessageListener dataMsgListener)
	{
		this.resource = resource;
		this.dataMsgListener = dataMsgListener;
	}
	
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception
	{
		try {
			// TODO: Extract the payload and convert the JSON to SystemPerformanceData
			
			// optionally, log a message indicating data was received

			// TODO: invoke the dataMsgListener's callback to handle
			// incoming SystemPerformanceData messages
		} catch (Exception e) {
			// TODO: handle any Exception that may be thrown
		}
	}	
}
  • In the connectComplete() callback, add the inner class callback and subscription:
@Override
public void connectComplete(boolean reconnect, String serverURI)
{
	_Logger.info("MQTT connection successful (is reconnect = " + reconnect + "). Broker: " + serverURI);
	
	int qos = 1;
	
	// Option 2
	try {
		_Logger.info("Subscribing to topic: " + ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE.getResourceName());
			
		this.mqttClient.subscribe(
			ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE.getResourceName(),
			qos,
			new ActuatorResponseMessageListener(ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE, this.dataMsgListener));

		_Logger.info("Subscribing to topic: " + ResourceNameEnum.CDA_SENSOR_MSG_RESOURCE.getResourceName());
		
		this.subscribeToTopic(
			ResourceNameEnum.CDA_SENSOR_MSG_RESOURCE.getResourceName(),
			qos,
			new SensorDataMessageListener(ResourceNameEnum.CDA_SENSOR_MSG_RESOURCE, this.dataMsgListener));
		
		_Logger.info("Subscribing to topic: " + ResourceNameEnum.CDA_SYSTEM_PERF_MSG_RESOURCE.getResourceName());
		
		this.subscribeToTopic(
			ResourceNameEnum.CDA_SYSTEM_PERF_MSG_RESOURCE.getResourceName(),
			qos,
			new SystemPerformanceDataMessageListener(ResourceNameEnum.CDA_SYSTEM_PERF_MSG_RESOURCE, this.dataMsgListener));
	} catch (MqttException e) {
		_Logger.warning("Failed to subscribe to CDA actuator response topic.");
	}
}

Update DeviceDataManager

  • In DeviceDataManager, comment out the subscribe calls in the startManager() method.
    • NOTE: Your startManager() method may include additional calls not depicted below.
public void startManager()
{
	if (this.mqttClient != null) {
		if (this.mqttClient.connectClient()) {
			_Logger.info("Successfully connected MQTT client to broker.");
			
			// TODO: read this from the configuration file
			//int qos = ConfigConst.DEFAULT_QOS;
			
			// TODO: check the return value for each and take appropriate action
			
			// IMPORTANT NOTE: The 'subscribeToTopic()' method calls shown
			// below will be moved to MqttClientConnector.connectComplete()
			// in Lab Module 10. For now, they can remain here.
			//this.mqttClient.subscribeToTopic(ResourceNameEnum.GDA_MGMT_STATUS_MSG_RESOURCE, qos);
			//this.mqttClient.subscribeToTopic(ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE, qos);
			//this.mqttClient.subscribeToTopic(ResourceNameEnum.CDA_SENSOR_MSG_RESOURCE, qos);
			//this.mqttClient.subscribeToTopic(ResourceNameEnum.CDA_SYSTEM_PERF_MSG_RESOURCE, qos);
		} else {
			_Logger.severe("Failed to connect MQTT client to broker.");
			
			// TODO: take appropriate action
		}
	}
	
	// TODO: Other calls may need to be included, such as starting
	// the CoAP client instead of the CoAP server, depending on
	// your configuration settings
	
	if (this.enableCoapServer && this.coapServer != null) {
		if (this.coapServer.startServer()) {
			_Logger.info("CoAP server started.");
		} else {
			_Logger.severe("Failed to start CoAP server. Check log file for details.");
		}
	}
	
	if (this.sysPerfMgr != null) {
		this.sysPerfMgr.startManager();
	}
}

**Estimate**

- Medium

**Tests**

- Start your local MQTT broker using the default configuration and execute the following - for now, WITHOUT TLS enabled:
  - Simply re-run your `MqttClientConnectorTest` test cases - no encryption is required for this, as it's only to verify that your code is still functioning properly.
    - For simplicity, you may want to run only the `testConnectAndDisconnect()` test to ensure connectivity and the newly added subscription logic is working properly.
  - IMPORTANT NOTE 1: TLS-specific integration tests will be outlined in [PIOT-INT-10-004](https://github.com/programming-the-iot/book-exercise-tasks/issues/114)
  - IMPORTANT NOTE 2: If you're using `MqttAsyncClient` in your `MqttClientConnector`, you may encounter some timing issues with these tests. You may need to introduce an artificial delay within `MqttClientConnectorTest` after each call to `connectClient()` and `disconnectClient()` to account for the asynchronous nature of connecting and disconnecting from the broker when `MqttAsyncClient` is used as the MQTT client instance.
- Add and run a new test case within `MqttClientConnectorTest`.
  - Disable the other tests by commenting out the `@Test` annotation.
  - Your new test method should ensure Actuator Response messages are received and processed by the implementation from this exercise (either Option 1 or 2).
  - Here's a sample test case implementation to consider:
```java
@Test
public void testActuatorCommandResponseSubscription()
{
	int qos = 0;
	
	assertTrue(this.mqttClient.connectClient());
	
	try {
		Thread.sleep(2000);
	} catch (Exception e) {
		// ignore
	}
	
	ActuatorData ad = new ActuatorData();
	ad.setValue((float) 12.3);
	ad.setAsResponse();
	
	String adJson = DataUtil.getInstance().actuatorDataToJson(ad);
	
	assertTrue(this.mqttClient.publishMessage(ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE, adJson, qos));
	
	try {
		Thread.sleep(2000);
	} catch (Exception e) {
		// ignore
	}
	
	assertTrue(this.mqttClient.disconnectClient());
	
	try {
		Thread.sleep(2000);
	} catch (Exception e) {
		// ignore
	}
}
  • If your test executes successfully, the log output will look similar to the following:
Apr 05, 2022 3:20:39 PM programmingtheiot.gda.connection.MqttClientConnector initCredentialConnectionParameters
WARNING: No credentials are set.
Apr 05, 2022 3:20:39 PM programmingtheiot.gda.connection.MqttClientConnector initClientParameters
INFO: Using URL for broker conn: tcp://localhost:1883
Apr 05, 2022 3:20:39 PM programmingtheiot.gda.connection.MqttClientConnector connectClient
INFO: MQTT client connecting to broker: tcp://localhost:1883
Apr 05, 2022 3:20:39 PM programmingtheiot.gda.connection.MqttClientConnector connectComplete
INFO: MQTT connection successful (is reconnect = false). Broker: tcp://localhost:1883
Apr 05, 2022 3:20:39 PM programmingtheiot.gda.connection.MqttClientConnector connectComplete
INFO: Subscribing to topic: PIOT/ConstrainedDevice/ActuatorResponse
Apr 05, 2022 3:20:41 PM programmingtheiot.gda.connection.MqttClientConnector deliveryComplete
INFO: Delivered MQTT message with ID: 0
Apr 05, 2022 3:20:41 PM programmingtheiot.gda.connection.MqttClientConnector$ActuatorResponseMessageListener messageArrived
INFO: Received ActuatorData response: 12.3
Apr 05, 2022 3:20:43 PM programmingtheiot.gda.connection.MqttClientConnector disconnectClient
INFO: Disconnecting MQTT client from broker: tcp://localhost:1883
@labbenchstudios labbenchstudios added the exercise New feature to implement as an exercise label Nov 10, 2020
@labbenchstudios labbenchstudios added this to the Chapter 10 milestone Nov 10, 2020
@labbenchstudios labbenchstudios added this to Chapter 10 - Edge Integration in Programming the IoT - Exercises Kanban Board Nov 10, 2020
@labbenchstudios labbenchstudios changed the title PIOT-GDA-10-002: Update MqttClientConnector to send received messages to an IDataMessageListener instance PIOT-GDA-10-001: Update MqttClientConnector to send received messages to an IDataMessageListener instance Nov 18, 2020
@labbenchstudios labbenchstudios changed the title PIOT-GDA-10-001: Update MqttClientConnector to send received messages to an IDataMessageListener instance PIOT-GDA-10-002: Update MqttClientConnector to send received messages to an IDataMessageListener instance Nov 18, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
exercise New feature to implement as an exercise
Projects
Programming the IoT - Exercises Kanba...
  
Lab Module 10 - Edge Integration
Development

No branches or pull requests

1 participant