-
Notifications
You must be signed in to change notification settings - Fork 179
/
mqtt_server_client.dart
233 lines (199 loc) · 9.57 KB
/
mqtt_server_client.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/*
* Package : mqtt_client
* Author : S. Hamblett <steve.hamblett@linux.com>
* Date : 31/05/2017
* Copyright : S.Hamblett
*/
import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
/// An annotated simple subscribe/publish usage example for mqtt_server_client. Please read it with reference
/// to the MQTT specification. The example is runnable; also refer to the test/mqtt_client_broker_test...dart
/// files for separate subscribe/publish tests.
/// First create a client, the client is constructed with a broker name, client identifier
/// and port if needed. The client identifier (short ClientId) is an identifier of each MQTT
/// client connecting to a MQTT broker. As the word identifier already suggests, it should be unique per broker.
/// The broker uses it for identifying the client and the current state of the client. If you don't need state
/// to be held by the broker, in MQTT 3.1.1 you can set an empty ClientId, which results in a connection without any state.
/// A condition is that clean session connect flag is true, otherwise the connection will be rejected.
/// The client identifier can be a maximum length of 23 characters. If a port is not specified the standard port
/// of 1883 is used.
/// If you want to use websockets rather than TCP see below.
final MqttServerClient client = MqttServerClient('test.mosquitto.org', '');

/// Number of times the [pong] callback has been invoked so far.
int pongCount = 0;

/// Number of times the [ping] callback has been invoked so far.
int pingCount = 0;
/// Runs the annotated subscribe/publish example end to end.
///
/// Configures the shared [client], connects to the broker, subscribes to two
/// topics, publishes one message, sleeps so that keep alive ping/pong traffic
/// can be observed, then unsubscribes from the first topic and disconnects.
///
/// Returns 0 after a normal run. If the client is not connected after the
/// connection attempt, the process is terminated via [exit] with code -1.
Future<int> main() async {
  /// A websocket URL must start with ws:// or wss:// or Dart will throw an exception, consult your websocket MQTT broker
  /// for details.
  /// To use websockets add the following lines -:
  /// client.useWebSocket = true;
  /// client.port = 80; ( or whatever your WS port is)
  /// There is also an alternate websocket implementation for specialist use, see useAlternateWebSocketImplementation
  /// Note do not set the secure flag if you are using wss, the secure flag is for TCP sockets only.
  /// You can also supply your own websocket protocol list or disable this feature using the websocketProtocols
  /// setter, read the API docs for further details here, the vast majority of brokers will support the client default
  /// list so in most cases you can ignore this.

  /// Set logging on if needed, defaults to off
  client.logging(on: false);

  /// Set the correct MQTT protocol for mosquitto
  client.setProtocolV311();

  /// If you intend to use a keep alive you must set it here otherwise keep alive will be disabled.
  client.keepAlivePeriod = 20;

  /// The connection timeout period can be set if needed, the default is 5 seconds.
  client.connectTimeoutPeriod = 2000; // milliseconds

  /// Add the unsolicited disconnection callback
  client.onDisconnected = onDisconnected;

  /// Add the successful connection callback
  client.onConnected = onConnected;

  /// Add a subscribed callback, there is also an unsubscribed callback if you need it.
  /// You can add these before connection or change them dynamically after connection if
  /// you wish. There is also an onSubscribeFail callback for failed subscriptions, these
  /// can fail either because you have tried to subscribe to an invalid topic or the broker
  /// rejects the subscribe request.
  client.onSubscribed = onSubscribed;

  /// Set a ping received callback if needed, called whenever a ping response(pong) is received
  /// from the broker. Can be used for health monitoring.
  client.pongCallback = pong;

  /// Set a ping sent callback if needed, called whenever a ping request(ping) is sent
  /// by the client. Can be used for latency calculations.
  client.pingCallback = ping;

  /// Create a connection message to use or use the default one. The default one sets the
  /// client identifier, any supplied username/password and clean session,
  /// an example of a specific one below.
  final connMess = MqttConnectMessage()
      .withClientIdentifier('Mqtt_MyClientUniqueId')
      .withWillTopic('willtopic') // If you set this you must set a will message
      .withWillMessage('My Will message')
      .startClean() // Non persistent session for testing
      .withWillQos(MqttQos.atLeastOnce);
  print('EXAMPLE::Mosquitto client connecting....');
  client.connectionMessage = connMess;

  /// Connect the client, any errors here are communicated by raising of the appropriate exception. Note
  /// in some circumstances the broker will just disconnect us, see the spec about this, we however will
  /// never send malformed messages.
  try {
    await client.connect();
  } on NoConnectionException catch (e) {
    // Raised by the client when connection fails.
    print('EXAMPLE::client exception - $e');
    client.disconnect();
  } on SocketException catch (e) {
    // Raised by the socket layer
    print('EXAMPLE::socket exception - $e');
    client.disconnect();
  }

  /// Check we are connected
  if (client.connectionStatus!.state == MqttConnectionState.connected) {
    print('EXAMPLE::Mosquitto client connected');
  } else {
    /// Use status here rather than state if you also want the broker return code.
    print(
        'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}');
    client.disconnect();
    exit(-1);
  }

  /// Ok, let's try a subscription
  print('EXAMPLE::Subscribing to the test/lol topic');
  const topic = 'test/lol'; // Not a wildcard topic
  client.subscribe(topic, MqttQos.atMostOnce);

  /// The client has a change notifier object(see the Observable class) which we then listen to to get
  /// notifications of published updates to each subscribed topic.
  /// In general you should listen here as soon as possible after connecting, you will not receive any
  /// publish messages until you do this.
  /// Also you must re-listen after disconnecting.
  client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>>? c) {
    /// The notification is a list, so handle every entry rather than assuming
    /// exactly one; a null or empty list is ignored instead of throwing.
    if (c == null || c.isEmpty) {
      return;
    }
    for (final received in c) {
      final recMess = received.payload as MqttPublishMessage;
      final pt =
          MqttPublishPayload.bytesToStringAsString(recMess.payload.message);

      /// The above may seem a little convoluted for users only interested in the
      /// payload, some users however may be interested in the received publish message,
      /// let's not constrain ourselves yet until the package has been in the wild
      /// for a while.
      /// The payload is a byte buffer, this will be specific to the topic
      print(
          'EXAMPLE::Change notification:: topic is <${received.topic}>, payload is <-- $pt -->');
      print('');
    }
  });

  /// If needed you can listen for published messages that have completed the publishing
  /// handshake which is QoS dependent. Any message received on this stream has completed its
  /// publishing handshake with the broker.
  client.published!.listen((MqttPublishMessage message) {
    print(
        'EXAMPLE::Published notification:: topic is ${message.variableHeader!.topicName}, with Qos ${message.header!.qos}');
  });

  /// Let's publish to our topic
  /// Use the payload builder rather than a raw buffer
  /// Our known topic to publish to
  const pubTopic = 'Dart/Mqtt_client/testtopic';
  final builder = MqttClientPayloadBuilder();
  builder.addString('Hello from mqtt_client');

  /// Subscribe to it
  print('EXAMPLE::Subscribing to the Dart/Mqtt_client/testtopic topic');
  client.subscribe(pubTopic, MqttQos.exactlyOnce);

  /// Publish it
  print('EXAMPLE::Publishing our topic');
  client.publishMessage(pubTopic, MqttQos.exactlyOnce, builder.payload!);

  /// Ok, we will now sleep a while, in this gap you will see ping request/response
  /// messages being exchanged by the keep alive mechanism.
  print('EXAMPLE::Sleeping....');
  await MqttUtilities.asyncSleep(60);

  /// Print the ping/pong cycle latency data before disconnecting.
  print('EXAMPLE::Keep alive latencies');
  print(
      'The latency of the last ping/pong cycle is ${client.lastCycleLatency} milliseconds');
  print(
      'The average latency of all the ping/pong cycles is ${client.averageCycleLatency} milliseconds');

  /// Finally, unsubscribe and exit gracefully
  print('EXAMPLE::Unsubscribing');
  client.unsubscribe(topic);

  /// Wait for the unsubscribe message from the broker if you wish.
  await MqttUtilities.asyncSleep(2);
  print('EXAMPLE::Disconnecting');
  client.disconnect();
  print('EXAMPLE::Exiting normally');
  return 0;
}
/// Subscription callback: prints a confirmation line naming [topic].
void onSubscribed(String topic) =>
    print('EXAMPLE::Subscription confirmed for topic $topic');
/// Disconnection callback.
///
/// Prints whether the disconnection origin reported by the client is
/// solicited. Any other origin is reported as incorrect and the process is
/// terminated with exit code -1. After a solicited disconnect it reports
/// whether [pongCount] and [pingCount] each equal the expected value of 3.
void onDisconnected() {
  print('EXAMPLE::OnDisconnected client callback - Client disconnection');

  final origin = client.connectionStatus!.disconnectionOrigin;
  if (origin != MqttDisconnectionOrigin.solicited) {
    print(
        'EXAMPLE::OnDisconnected callback is unsolicited or none, this is incorrect - exiting');
    exit(-1);
  }
  print('EXAMPLE::OnDisconnected callback is solicited, this is correct');

  const expectedCycles = 3;
  print(pongCount == expectedCycles
      ? 'EXAMPLE:: Pong count is correct'
      : 'EXAMPLE:: Pong count is incorrect, expected 3. actual $pongCount');
  print(pingCount == expectedCycles
      ? 'EXAMPLE:: Ping count is correct'
      : 'EXAMPLE:: Ping count is incorrect, expected 3. actual $pingCount');
}
/// Connection callback: prints a line reporting a successful connection.
void onConnected() => print(
    'EXAMPLE::OnConnected client callback - Client connection was successful');
/// Ping response (pong) callback.
///
/// Announces the invocation, bumps [pongCount] by one, then prints the
/// client's last ping/pong cycle latency.
void pong() {
  print('EXAMPLE::Ping response client callback invoked');
  pongCount += 1;
  print(
    'EXAMPLE::Latency of this ping/pong cycle is ${client.lastCycleLatency} milliseconds',
  );
}
/// Ping sent callback.
///
/// Announces the invocation and bumps [pingCount] by one.
void ping() {
  print('EXAMPLE::Ping sent client callback invoked');
  pingCount += 1;
}