Skip to content

Commit

Permalink
Merge pull request #1840 from angiglesias/allow-mqtt_bearer_token
Browse files Browse the repository at this point in the history
[ADDED] Allow bearer token as mqtt authentication method
  • Loading branch information
kozlovic committed Jan 26, 2021
2 parents 7efb9a6 + 07c3214 commit 0fe9209
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,7 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int) (byte, *mqttConnectProt
return 0, nil, err
}
c.opts.Token = c.opts.Password
c.opts.JWT = c.opts.Password
}
return 0, cp, nil
}
Expand Down
78 changes: 78 additions & 0 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)

type mqttErrorReader struct {
Expand Down Expand Up @@ -1119,6 +1120,83 @@ func TestMQTTTokenAuth(t *testing.T) {
}
}

func TestMQTTJWTWithAllowedConnectionTypes(t *testing.T) {
o := testMQTTDefaultOptions()
// Create System Account
syskp, _ := nkeys.CreateAccount()
syspub, _ := syskp.PublicKey()
sysAc := jwt.NewAccountClaims(syspub)
sysjwt, err := sysAc.Encode(oKp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
// Create memory resolver and store system account
mr := &MemAccResolver{}
mr.Store(syspub, sysjwt)
if err != nil {
t.Fatalf("Error saving system account JWT to memory resolver: %v", err)
}
// Add system account and memory resolver to server options
o.SystemAccount = syspub
o.AccountResolver = mr
setupAddTrusted(o)

s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

for _, test := range []struct {
name string
connectionTypes []string
rc byte
}{
{"not allowed", []string{jwt.ConnectionTypeStandard}, mqttConnAckRCNotAuthorized},
{"allowed", []string{jwt.ConnectionTypeStandard, strings.ToLower(jwt.ConnectionTypeMqtt)}, mqttConnAckRCConnectionAccepted},
{"allowed with unknown", []string{jwt.ConnectionTypeMqtt, "SomeNewType"}, mqttConnAckRCConnectionAccepted},
{"not allowed with unknown", []string{"SomeNewType"}, mqttConnAckRCNotAuthorized},
} {
t.Run(test.name, func(t *testing.T) {

nuc := newJWTTestUserClaims()
nuc.AllowedConnectionTypes = test.connectionTypes
nuc.BearerToken = true

okp, _ := nkeys.FromSeed(oSeed)

akp, _ := nkeys.CreateAccount()
apub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(apub)
// Enable Jetstream on account with lax limitations
nac.Limits.JetStreamLimits.Consumer = -1
nac.Limits.JetStreamLimits.Streams = -1
nac.Limits.JetStreamLimits.MemoryStorage = 1024 * 1024
ajwt, err := nac.Encode(okp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}

nkp, _ := nkeys.CreateUser()
pub, _ := nkp.PublicKey()
nuc.Subject = pub
jwt, err := nuc.Encode(akp)
if err != nil {
t.Fatalf("Error generating user JWT: %v", err)
}

addAccountToMemResolver(s, apub, ajwt)

ci := &mqttConnInfo{
cleanSess: true,
user: "ignore_use_token",
pass: jwt,
}

mc, r := testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, test.rc, false)
})
}
}

func TestMQTTUsersAuth(t *testing.T) {
users := []*User{&User{Username: "user", Password: "pwd"}}
for _, test := range []struct {
Expand Down

0 comments on commit 0fe9209

Please sign in to comment.