From 1a7583f4f2e9c5c75fc8244711d80fa3b3c6cb41 Mon Sep 17 00:00:00 2001 From: Eran Duchan Date: Sat, 17 Nov 2018 17:42:09 +0200 Subject: [PATCH] Add support for MQTT client ID and protocol version (#1027) --- pkg/processor/trigger/mqtt/trigger.go | 10 ++++++++-- pkg/processor/trigger/mqtt/types.go | 8 +++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/processor/trigger/mqtt/trigger.go b/pkg/processor/trigger/mqtt/trigger.go index 060c92364b..e716d0b0c1 100644 --- a/pkg/processor/trigger/mqtt/trigger.go +++ b/pkg/processor/trigger/mqtt/trigger.go @@ -82,9 +82,13 @@ func (m *mqtt) GetConfig() map[string]interface{} { func (m *mqtt) createSubscriptions() error { m.Logger.InfoWith("Creating subscriptions", "brokerUrl", m.configuration.URL, - "subscriptions", m.configuration.Subscriptions) + "subscriptions", m.configuration.Subscriptions, + "clientID", m.configuration.ClientID) - clientOptions := mqttclient.NewClientOptions().AddBroker(m.configuration.URL) + clientOptions := mqttclient.NewClientOptions() + + clientOptions.AddBroker(m.configuration.URL) + clientOptions.SetProtocolVersion(uint(m.configuration.ProtocolVersion)) if m.configuration.Username != "" { clientOptions.SetUsername(m.configuration.Username) @@ -94,6 +98,8 @@ func (m *mqtt) createSubscriptions() error { clientOptions.SetPassword(m.configuration.Password) } + clientOptions.SetClientID(m.configuration.ClientID) + client := mqttclient.NewClient(clientOptions) if token := client.Connect(); token.Wait() && token.Error() != nil { return errors.Wrap(token.Error(), "Failed to connect to broker") diff --git a/pkg/processor/trigger/mqtt/types.go b/pkg/processor/trigger/mqtt/types.go index 16996b5ca3..b44f40fcf6 100644 --- a/pkg/processor/trigger/mqtt/types.go +++ b/pkg/processor/trigger/mqtt/types.go @@ -32,7 +32,9 @@ type Subscription struct { type Configuration struct { trigger.Configuration - Subscriptions []Subscription + Subscriptions []Subscription + ClientID string + ProtocolVersion int } func NewConfiguration(ID string, @@ -48,5 +50,9 @@ func NewConfiguration(ID string, return nil, errors.Wrap(err, "Failed to decode attributes") } + if newConfiguration.ProtocolVersion == 0 { + newConfiguration.ProtocolVersion = 4 + } + return &newConfiguration, nil }