Skip to content

Commit

Permalink
Add support for MQTT client ID and protocol version (#1027)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavius committed Nov 17, 2018
1 parent 8dc35ee commit 1a7583f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
10 changes: 8 additions & 2 deletions pkg/processor/trigger/mqtt/trigger.go
Expand Up @@ -82,9 +82,13 @@ func (m *mqtt) GetConfig() map[string]interface{} {
func (m *mqtt) createSubscriptions() error {
m.Logger.InfoWith("Creating subscriptions",
"brokerUrl", m.configuration.URL,
"subscriptions", m.configuration.Subscriptions)
"subscriptions", m.configuration.Subscriptions,
"clientID", m.configuration.ClientID)

clientOptions := mqttclient.NewClientOptions().AddBroker(m.configuration.URL)
clientOptions := mqttclient.NewClientOptions()

clientOptions.AddBroker(m.configuration.URL)
clientOptions.SetProtocolVersion(uint(m.configuration.ProtocolVersion))

if m.configuration.Username != "" {
clientOptions.SetUsername(m.configuration.Username)
Expand All @@ -94,6 +98,8 @@ func (m *mqtt) createSubscriptions() error {
clientOptions.SetPassword(m.configuration.Password)
}

clientOptions.SetClientID(m.configuration.ClientID)

client := mqttclient.NewClient(clientOptions)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return errors.Wrap(token.Error(), "Failed to connect to broker")
Expand Down
8 changes: 7 additions & 1 deletion pkg/processor/trigger/mqtt/types.go
Expand Up @@ -32,7 +32,9 @@ type Subscription struct {

type Configuration struct {
trigger.Configuration
Subscriptions []Subscription
Subscriptions []Subscription
ClientID string
ProtocolVersion int
}

func NewConfiguration(ID string,
Expand All @@ -48,5 +50,9 @@ func NewConfiguration(ID string,
return nil, errors.Wrap(err, "Failed to decode attributes")
}

if newConfiguration.ProtocolVersion == 0 {
newConfiguration.ProtocolVersion = 4
}

return &newConfiguration, nil
}

0 comments on commit 1a7583f

Please sign in to comment.