Skip to content

Commit

Permalink
Issue531 (#547)
Browse files Browse the repository at this point in the history
* Issue 531

* Issue 531

* Issue 531

* Issue 531
  • Loading branch information
shamblett committed Jul 9, 2024
1 parent a26d005 commit 9539532
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 6 deletions.
6 changes: 5 additions & 1 deletion example/mqtt_server_client_websocket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ final client = MqttServerClient('ws://test.mosquitto.org', '');
Future<int> main() async {
client.useWebSocket = true;
client.port = 8080; // ( or whatever your ws port is)
/// You can also supply your own websocket protocol list or disable this feature using the websocketProtocols
/// You can supply your own websocket protocol list or disable this feature using the websocketProtocols
/// setter, read the API docs for further details here, the vast majority of brokers will support the client default
/// list so in most cases you can ignore this. Mosquito needs the single default setting.
client.websocketProtocols = MqttClientConstants.protocolsSingleDefault;

/// You can supply a list of headers to send with the websocket request.
/// Some brokers are known to need their own special headers for auth etc.
client.websocketHeader = {'sjh-test': 'SJH'};

/// Set logging on if needed, defaults to off
client.logging(on: false);

Expand Down
4 changes: 4 additions & 0 deletions example/mqtt_server_client_websocket_secure.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Future<int> main() async {
/// list so in most cases you can ignore this. Mosquito needs the single default setting.
client.websocketProtocols = MqttClientConstants.protocolsSingleDefault;

/// You can supply a list of headers to send with the websocket request.
/// Some brokers are known to need their own special headers for auth etc.
client.websocketHeader = {'sjh-test': 'SJH'};

/// Set logging on if needed, defaults to off
client.logging(on: false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ abstract class MqttConnectionHandlerBase implements IMqttConnectionHandler {
/// User supplied websocket protocols
List<String>? websocketProtocols;

/// User supplied websocket headers
Map<String, dynamic>? websocketHeaders;

/// The connection
@protected
late MqttConnectionBase connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class MqttServerWsConnection extends MqttServerConnection<WebSocket> {
/// The websocket subprotocol list
List<String> protocols = MqttClientConstants.protocolsMultipleDefault;

/// User defined websocket headers
Map<String, dynamic>? headers;

/// Connect
@override
Future<MqttClientConnectionStatus?> connect(String server, int port) {
Expand Down Expand Up @@ -51,7 +54,8 @@ class MqttServerWsConnection extends MqttServerConnection<WebSocket> {
try {
// Connect and save the socket.
WebSocket.connect(uriString,
protocols: protocols.isNotEmpty ? protocols : null)
protocols: protocols.isNotEmpty ? protocols : null,
headers: headers)
.then((socket) {
client = socket;
readWrapper = ReadWrapper();
Expand Down Expand Up @@ -101,7 +105,8 @@ class MqttServerWsConnection extends MqttServerConnection<WebSocket> {
try {
// Connect and save the socket.
WebSocket.connect(uriString,
protocols: protocols.isNotEmpty ? protocols : null)
protocols: protocols.isNotEmpty ? protocols : null,
headers: headers)
.then((socket) {
client = socket;
_startListening();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class SynchronousMqttServerConnectionHandler
connection.protocols = websocketProtocols;
}

final websocketHeaders = this.websocketHeaders;
if (websocketHeaders != null) {
connection.headers = websocketHeaders;
}

this.connection = connection;
} else if (secure) {
MqttLogger.log(
Expand Down
19 changes: 17 additions & 2 deletions lib/src/mqtt_server_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ class MqttServerClient extends MqttClient {
/// Applicable only to TCP sockets
List<RawSocketOption> socketOptions = <RawSocketOption>[];

/// User definable websocket headers.
/// This allows the specification of additional HTTP headers for setting up the connection
/// should a broker need specific headers.
/// The keys of the map are the header fields and the values are either String or List.
@protected
Map<String, dynamic>? websocketHeaders;
set websocketHeader(Map<String, dynamic> header) {
websocketHeaders = header;

final connectionHandler = this.connectionHandler;
if (connectionHandler != null) {
connectionHandler.websocketHeaders = header;
}
}

/// Performs a connect to the message broker with an optional
/// username and password for the purposes of authentication.
/// If a username and password are supplied these will override
Expand Down Expand Up @@ -95,8 +110,8 @@ class MqttServerClient extends MqttClient {
if (connectionHandler.useAlternateWebSocketImplementation) {
connectionHandler.securityContext = securityContext;
}
if (websocketProtocolString != null) {
connectionHandler.websocketProtocols = websocketProtocolString;
if (websocketHeaders != null) {
connectionHandler.websocketHeaders = websocketHeaders;
}
}
if (secure) {
Expand Down
3 changes: 2 additions & 1 deletion test/mqtt_client_connection_ws_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ void main() {
final clientEventBus = events.EventBus();
final ch = SynchronousMqttServerConnectionHandler(clientEventBus,
maxConnectionAttempts: 3, socketOptions: socketOptions);
MqttLogger.loggingOn = true;
MqttLogger.loggingOn = false;
ch.useWebSocket = true;
ch.websocketProtocols = <String>['SJHprotocol'];
ch.websocketHeaders = {'Origin': 'SJH'};
brokerWs.setMessageHandler = messageHandlerConnect;
await ch.connect(mockBrokerAddressWs, mockBrokerPortWs,
MqttConnectMessage().withClientIdentifier(testClientId));
Expand Down
15 changes: 15 additions & 0 deletions test/support/mqtt_client_mockbroker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ class MockBrokerWs {
'Mockbroker WS server is running on http://${server.address.address}:$port/');
server.listen((HttpRequest request) {
if (request.uri.path == '/ws') {
final websocketHeader = request.headers.value('Origin');
if (websocketHeader != 'SJH') {
return completer.completeError((request) => websocketHeader);
} else {
print(
'Mockbroker WS server::listen - Origin header is correctly set');
}
final websocketProtocol =
request.headers.value('sec-websocket-protocol');
if (websocketProtocol != 'SJHprotocol') {
return completer.completeError((request) => websocketProtocol);
} else {
print(
'Mockbroker WS server::listen - WS protocol is correctly set');
}
WebSocketTransformer.upgrade(request).then((WebSocket websocket) {
_webSocket = websocket;
websocket.listen(_handleMessage);
Expand Down
15 changes: 15 additions & 0 deletions test/support/mqtt_client_ws_broker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ class MockBrokerWs {
print(
'Mockbroker WS server::listen - request received ${request.uri.path}');
if (request.uri.path == '/ws') {
final websocketHeader = request.headers.value('Origin');
if (websocketHeader != 'SJH') {
return completer.completeError((request) => websocketHeader);
} else {
print(
'Mockbroker WS server::listen - Origin header is correctly set');
}
final websocketProtocol =
request.headers.value('sec-websocket-protocol');
if (websocketProtocol != 'SJHprotocol') {
return completer.completeError((request) => websocketProtocol);
} else {
print(
'Mockbroker WS server::listen - WS protocol is correctly set');
}
print('Mockbroker WS server::listen - upgrading');
WebSocketTransformer.upgrade(request).then((WebSocket websocket) {
_webSocket = websocket;
Expand Down

0 comments on commit 9539532

Please sign in to comment.