Update MqttClientConnector with some additional features that allow it to do the following:
Load its configuration parameters from a different section of the PiotConfig.props configuration file.
Allow a package-scoped class or sub-class to directly invoke publish, subscribe, and unsubscribe functions.
Review the README
Please see README.md for further information on, and use of, this content.
License for embedded documentation and source codes: PIOT-DOC-LIC
Estimated effort may vary greatly
The estimated level of effort for this exercise shown in the 'Estimate' section below is a very rough approximation. The actual level of effort may vary greatly depending on your development and test environment, experience with the requisite technologies, and many other factors.
Actions
NOTE: All of these updates will occur in the class MqttClientConnector.
Update the MqttClientConnector Class-Scoped Variables and Constructors
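Add two additional constructors to MqttClientConnector, and have the default constructor delegate to them, to support loading the MQTT configuration from a custom section or the cloud gateway service section. The code below relies on a class-scoped boolean named useCloudGatewayConfig: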
    public MqttClientConnector()
    {
        this(false);
    }

    public MqttClientConnector(boolean useCloudGatewayConfig)
    {
        this(useCloudGatewayConfig ? ConfigConst.CLOUD_GATEWAY_SERVICE : null);
    }

    public MqttClientConnector(String cloudGatewayConfigSectionName)
    {
        super();

        if (cloudGatewayConfigSectionName != null && cloudGatewayConfigSectionName.trim().length() > 0) {
            this.useCloudGatewayConfig = true;

            initClientParameters(cloudGatewayConfigSectionName);
        } else {
            this.useCloudGatewayConfig = false;

            // NOTE: This next method call should have already been created
            // in Lab Module 10. It is simply a delegate to handle parsing
            // of the appropriate configuration file section
            initClientParameters(ConfigConst.MQTT_GATEWAY_SERVICE);
        }
    }
Ensure MqttClientConnector is Configured Properly
Make sure you've updated MqttClientConnector according to the instructions in PIOT-GDA-10-001: implement the private method private void initClientParameters(String configSection), and move the MQTT configuration and initialization logic that was previously in the MqttClientConnector constructor into initClientParameters().
Add Setter for IConnectionListener
Add a public setter method that accepts an IConnectionListener reference and stores it in a class-scoped variable. This will enable the MqttClientConnector to notify an external listener (implementing IConnectionListener) of connect and disconnect events.
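A minimal sketch of such a setter is shown here. The field name connListener and the onConnect() call are taken from the connectComplete() samples later in this exercise. The method name setConnectionListener(), the boolean return value, the onDisconnect() callback name, and the stand-in types ConnectorSketch and this reduced IConnectionListener are assumptions made for illustration, so adapt them to the interface defined in your own project.

```java
// Stand-in for the project's IConnectionListener interface. onConnect() is used by
// connectComplete() in this exercise; onDisconnect() is an assumed name.
interface IConnectionListener {
    void onConnect();
    void onDisconnect();
}

// Reduced stand-in for MqttClientConnector that shows only the listener wiring.
class ConnectorSketch {
    private IConnectionListener connListener = null;

    // Stores the listener. Returns true if it was accepted, false if it was null.
    public boolean setConnectionListener(IConnectionListener listener) {
        if (listener != null) {
            this.connListener = listener;
            return true;
        }
        return false;
    }

    // Mirrors the null-checked notification at the end of connectComplete().
    void notifyConnected() {
        if (this.connListener != null) {
            this.connListener.onConnect();
        }
    }
}
```

In the real class, the body of setConnectionListener() would sit alongside the other public methods of MqttClientConnector, and the notification would remain inside connectComplete().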
Add Protected Handler Methods for Sending / Receiving Messages
Add protected method implementations for publish, subscribe, and unsubscribe. The signatures will each use String parameters for the topic names instead of ResourceNameEnum. This will allow a package-scoped class, or a sub-class, to invoke these methods directly, and permit modification of the topic name to meet the requirements of the Cloud Service Provider's topic naming conventions.
The objective with these additions is to migrate the logic from your public publishMessage(), subscribeToTopic() and unsubscribeFromTopic() methods to these new protected implementations.
Here are some sample implementations for each method, which follow the implementation pattern from the exercises in Lab Module 10. You should modify these to suit your specific needs:
Sample updated protected publishMessage()
    protected boolean publishMessage(String topicName, byte[] payload, int qos)
    {
        if (topicName == null) {
            _Logger.warning("Resource is null. Unable to publish message: " + this.brokerAddr);

            return false;
        }

        if (payload == null || payload.length == 0) {
            _Logger.warning("Message is null or empty. Unable to publish message: " + this.brokerAddr);

            return false;
        }

        if (qos < 0 || qos > 2) {
            _Logger.warning("Invalid QoS. Using default. QoS requested: " + qos);

            // TODO: retrieve default QoS from config file
            qos = ConfigConst.DEFAULT_QOS;
        }

        try {
            MqttMessage mqttMsg = new MqttMessage();
            mqttMsg.setQos(qos);
            mqttMsg.setPayload(payload);

            this.mqttClient.publish(topicName, mqttMsg);

            return true;
        } catch (Exception e) {
            _Logger.log(Level.SEVERE, "Failed to publish message to topic: " + topicName, e);
        }

        return false;
    }
Sample updated protected subscribeToTopic()
NOTE: There are two signatures provided - one allows passing a special callback handler that implements IMqttMessageListener for convenience.
    protected boolean subscribeToTopic(String topicName, int qos)
    {
        return subscribeToTopic(topicName, qos, null);
    }

    protected boolean subscribeToTopic(String topicName, int qos, IMqttMessageListener listener)
    {
        // NOTE: This is the preferred method for subscribing to a given topic,
        // as it allows the use of an IMqttMessageListener to be defined and
        // registered as the handler for incoming messages pertaining to the
        // given topic 'topicName'.
        if (topicName == null) {
            _Logger.warning("Resource is null. Unable to subscribe to topic: " + this.brokerAddr);

            return false;
        }

        if (qos < 0 || qos > 2) {
            _Logger.warning("Invalid QoS. Using default. QoS requested: " + qos);

            // TODO: retrieve default QoS from config file
            qos = ConfigConst.DEFAULT_QOS;
        }

        try {
            if (listener != null) {
                this.mqttClient.subscribe(topicName, qos, listener);

                _Logger.info("Successfully subscribed to topic with listener: " + topicName);
            } else {
                this.mqttClient.subscribe(topicName, qos);

                _Logger.info("Successfully subscribed to topic: " + topicName);
            }

            return true;
        } catch (Exception e) {
            _Logger.log(Level.SEVERE, "Failed to subscribe to topic: " + topicName, e);
        }

        return false;
    }
Sample updated protected unsubscribeFromTopic()
    protected boolean unsubscribeFromTopic(String topicName)
    {
        if (topicName == null) {
            _Logger.warning("Resource is null. Unable to unsubscribe from topic: " + this.brokerAddr);

            return false;
        }

        try {
            this.mqttClient.unsubscribe(topicName);

            _Logger.info("Successfully unsubscribed from topic: " + topicName);

            return true;
        } catch (Exception e) {
            _Logger.log(Level.SEVERE, "Failed to unsubscribe from topic: " + topicName, e);
        }

        return false;
    }
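To illustrate why the String-based protected signatures are useful, here is a rough sketch of a package-scoped helper that rewrites a resource name into a provider-specific topic before publishing. Everything in it is hypothetical: ConnectorStub is a reduced stand-in for MqttClientConnector that records topics instead of contacting a broker, and CloudTopicPublisher, its topicPrefix, and the "prefix/resource" naming scheme are assumptions rather than any real Cloud Service Provider's convention.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Reduced stand-in for MqttClientConnector: records the topic instead of
// publishing to a broker, so the sketch runs without an MQTT client library.
class ConnectorStub {
    final List<String> publishedTopics = new ArrayList<>();

    protected boolean publishMessage(String topicName, byte[] payload, int qos) {
        if (topicName == null || payload == null || payload.length == 0) {
            return false;
        }
        publishedTopics.add(topicName);
        return true;
    }
}

// Hypothetical helper living in the same package as the connector. Protected
// members are visible package-wide in Java, so it can call publishMessage() directly.
class CloudTopicPublisher {
    private final ConnectorStub connector;
    private final String topicPrefix;

    CloudTopicPublisher(ConnectorStub connector, String topicPrefix) {
        this.connector = connector;
        this.topicPrefix = topicPrefix;
    }

    // Assumed naming scheme: "<prefix>/<resource name>".
    String toCloudTopic(String resourceName) {
        return topicPrefix + "/" + resourceName;
    }

    boolean publish(String resourceName, String msg, int qos) {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        return connector.publishMessage(toCloudTopic(resourceName), payload, qos);
    }
}
```

The public ResourceNameEnum-based methods remain the API for local use, while a cloud-facing class in the same package can shape topic names freely before delegating to the protected methods.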
Update the existing public publishMessage(), subscribeToTopic(), and unsubscribeFromTopic() methods to call these new protected methods instead.
Here are some sample implementations, which perform basic validation checks, then invoke the newly created protected methods defined earlier in this exercise. Your own implementation may vary:
Sample updated public publishMessage()
    @Override
    public boolean publishMessage(ResourceNameEnum topicName, String msg, int qos)
    {
        if (topicName == null) {
            _Logger.warning("Resource is null. Unable to publish message: " + this.brokerAddr);

            return false;
        }

        if (msg == null || msg.length() == 0) {
            _Logger.warning("Message is null or empty. Unable to publish message: " + this.brokerAddr);

            return false;
        }

        return publishMessage(topicName.getResourceName(), msg.getBytes(), qos);
    }
Sample updated public subscribeToTopic()
    @Override
    public boolean subscribeToTopic(ResourceNameEnum topicName, int qos)
    {
        if (topicName == null) {
            _Logger.warning("Resource is null. Unable to subscribe to topic: " + this.brokerAddr);

            return false;
        }

        return subscribeToTopic(topicName.getResourceName(), qos);
    }
Sample updated public unsubscribeFromTopic()
    @Override
    public boolean unsubscribeFromTopic(ResourceNameEnum topicName)
    {
        if (topicName == null) {
            _Logger.warning("Resource is null. Unable to unsubscribe from topic: " + this.brokerAddr);

            return false;
        }

        return unsubscribeFromTopic(topicName.getResourceName());
    }
Update the connectComplete() callback implementation to support either local topic subscriptions or cloud topic subscriptions using either Option 1 or Option 2 described below.
NOTE: Recall that this approach is described in PIOT-GDA-10-002. If you used Option 1 for that approach, consider using it here as well, and vice-versa if you selected Option 2. In both cases, you'll check the value of this.useCloudGatewayConfig. If it's true, you'll subscribe to cloud-specific topics; otherwise, you'll invoke the logic you implemented in Lab Module 10.
Subscribe - Option 1
In the connectComplete() callback, update the logic from Lab Module 10 based on the value of this.useCloudGatewayConfig:
    @Override
    public void connectComplete(boolean reconnect, String serverURI)
    {
        _Logger.info("MQTT connection successful (is reconnect = " + reconnect + "). Broker: " + serverURI);

        int qos = 1;

        // Option 1
        if (! this.useCloudGatewayConfig) {
            this.subscribeToTopic(ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE, qos);
            this.subscribeToTopic(ResourceNameEnum.CDA_SENSOR_MSG_RESOURCE, qos);
            this.subscribeToTopic(ResourceNameEnum.CDA_SYSTEM_PERF_MSG_RESOURCE, qos);

            // IMPORTANT NOTE: You'll have to parse each message type in the callback method
            // `public void messageArrived(String topic, MqttMessage msg) throws Exception`
        }

        // This call enables the MqttClientConnector to notify another listener
        // about the connection now being complete. This will be important for
        // the CloudClientConnector implementation, as it needs to know when
        // this client is finally connected with the cloud-hosted MQTT broker.
        if (this.connListener != null) {
            this.connListener.onConnect();
        }
    }
Subscribe - Option 2
In the connectComplete() callback, add the inner class callback and subscription:
    @Override
    public void connectComplete(boolean reconnect, String serverURI)
    {
        _Logger.info("MQTT connection successful (is reconnect = " + reconnect + "). Broker: " + serverURI);

        int qos = 1;

        // Option 2
        try {
            if (! this.useCloudGatewayConfig) {
                _Logger.info("Subscribing to topic: " + ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE.getResourceName());

                this.mqttClient.subscribe(
                    ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE.getResourceName(),
                    qos,
                    new ActuatorResponseMessageListener(ResourceNameEnum.CDA_ACTUATOR_RESPONSE_RESOURCE, this.dataMsgListener));

                // IMPORTANT NOTE: You'll have to create a `subscribe()` call that delegates
                // incoming SensorData and SystemPerformanceData messages using your newly
                // created SensorDataMessageListener and SystemPerformanceDataMessageListener
                // class instances
            }
        } catch (MqttException e) {
            _Logger.warning("Failed to subscribe to CDA actuator response topic.");
        }

        // This call enables the MqttClientConnector to notify another listener
        // about the connection now being complete. This will be important for
        // the CloudClientConnector implementation, as it needs to know when
        // this client is finally connected with the cloud-hosted MQTT broker.
        if (this.connListener != null) {
            this.connListener.onConnect();
        }
    }
Estimate
Medium
Tests
Setup
For this initial test, we'll simply set the cloud gateway configuration in PiotConfig.props to match the MQTT gateway configuration and point the host to localhost. Since you're testing against your local MQTT broker instance, you can choose whether or not to enable TLS. This example shows it enabled, based on the instructions in PIOT-GDA-10-001:
#
# Cloud client configuration information
#
[Cloud.GatewayService]
credFile = ./cred/PiotCloudCred.props
certFile = ./cert/PiotCloudCert.pem
host = localhost
port = 1883
securePort = 8883
defaultQoS = 0
keepAlive = 30
enableCrypt = True
#
# MQTT client configuration information
#
# NOTE: `credFile` and `certFile` will only be set in the config for your local MQTT
# broker if you've enabled encryption and have user / password authentication
[Mqtt.GatewayService]
credFile = ./cred/PiotMqttCred.props
certFile = ./cert/server.crt
host = localhost
port = 1883
securePort = 8883
defaultQoS = 0
keepAlive = 30
enableCrypt = True
NOTE: The purpose of re-running MQTT-specific integration tests from Lab Module 10 (Part 03) is to ensure your MqttClientConnector still functions as expected. You may need to adjust these tests to work with your configuration (see IMPORTANT NOTE 2 and others below).
Integration tests (in ./src/test/java/programmingtheiot/part03/integration)
IMPORTANT NOTE 1: Functionally, MqttClientConnector should be no different than before; however, it's important to verify no regressions have been introduced.
IMPORTANT NOTE 2: Keep in mind that you're likely using the MqttAsyncClient in your MqttClientConnector. This can cause timing issues with these tests, so you may need to introduce an artificial delay within MqttClientConnectorTest after each call to connectClient() and disconnectClient() to account for the asynchronous nature of connecting to and disconnecting from the broker.
Make sure your local Mosquitto (or other) MQTT broker is running and client access is configured properly in the Mqtt.GatewayService section of your PiotConfig.props configuration file.
Run MqttClientConnectorTest. All test cases should pass with relevant output in the log file / console indicating success.
NOTE: Be sure to review your implementation of MqttClientConnector, as use of MqttAsyncClient may cause some timing issues with some of the tests in MqttClientConnectorTest, as mentioned under IMPORTANT NOTE 2 above.
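One possible way to add the delay mentioned in IMPORTANT NOTE 2 is a small polling helper rather than a fixed sleep. The sketch below is only an illustration: TestWaitSketch and waitFor() are hypothetical names, and the condition you poll (for example, a connection-state accessor on your connector) depends on what your own MqttClientConnector exposes.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test utility: polls a condition until it holds or a timeout expires.
final class TestWaitSketch {
    private TestWaitSketch() {}

    // Returns true as soon as 'condition' reports true, or false if
    // 'timeoutMillis' elapses (or the thread is interrupted) first.
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;

        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }

            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }

        return true;
    }
}
```

Inside MqttClientConnectorTest, a call such as TestWaitSketch.waitFor(() -> connected.get(), 5000L, 100L) could follow connectClient(), where connected is an assumed AtomicBoolean set by a test IConnectionListener. If no such signal is available, a plain Thread.sleep() with a conservative duration is the simpler fallback.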
labbenchstudios changed the title from "PIOT-GDA-11-002: Update the MqttClientConnector with additional features" to "PIOT-GDA-11-001: Update the MqttClientConnector with additional features" on Nov 25, 2020.