Skip to content

Commit

Permalink
[CHANGED] Cluster permissions moved out of cluster's authorization
Browse files Browse the repository at this point in the history
It will be possible to set subjects permissions regardless of the
presence of an authorization block.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Sep 10, 2018
1 parent 2ee18d6 commit e1202dd
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 25 deletions.
23 changes: 15 additions & 8 deletions README.md
Expand Up @@ -679,10 +679,11 @@ cluster {
# bcrypted hash of "top_secret"
password: $2a$11$UaoHwUEqHaMwqo6L4kM2buOBnGFnSCWxNXY87hl.kCERqKK8WAXM.
timeout: 3
permissions {
import:["_INBOX.>", "global.>"]
export:["_INBOX.>", "global.>", "sensors.>"]
}
}
permissions {
import:["_INBOX.>", "global.>"]
export:["_INBOX.>", "global.>", "sensors.>"]
}
routes = [
Expand All @@ -702,10 +703,11 @@ cluster {
# bcrypted hash of "top_secret"
password: $2a$11$UaoHwUEqHaMwqo6L4kM2buOBnGFnSCWxNXY87hl.kCERqKK8WAXM.
timeout: 3
permissions {
import:["_INBOX.>", "global.>", "sensors.>"]
export:["_INBOX.>", "global.>"]
}
}
permissions {
import:["_INBOX.>", "global.>", "sensors.>"]
export:["_INBOX.>", "global.>"]
}
routes = [
Expand All @@ -716,6 +718,11 @@ cluster {

The example above allows request/reply and messages published to any subject matching `global.>` to be freely propagated throughout the cluster. The cloud server imports and locally delivers messages published to subjects matching `sensors.>`, but won't export messages published to subjects matching `sensors.>`. This enforces a directional flow of sensor data from edge servers to the cloud servers. Also, as new edge servers are added they will not receive sensor data from other edge servers. Importing and exporting subjects in server clustering can provide additional security and optimize use of network resources.

> Note: When first introduced, the `permissions` block had to be defined in the `authorization` block forcing a cluster user to be defined in order for permissions to work.
This has been changed and the `permissions` block is now moved to the top-level `cluster` block, allowing use of subject permissions even without the presence of an `authorization` block.
If `permissions` are defined in both `authorization` and top-level `cluster` blocks, the content of `permissions` in the `authorization` block is ignored. It is recommended that the configuration
files be updated to move the permissions to the top-level block.

### TLS

The server can use modern TLS semantics for client connections, route connections, and the HTTPS monitoring port.
Expand Down
1 change: 0 additions & 1 deletion server/auth.go
Expand Up @@ -217,7 +217,6 @@ func (s *Server) isRouterAuthorized(c *client) bool {
if !comparePasswords(opts.Cluster.Password, c.opts.Password) {
return false
}
c.setRoutePermissions(opts.Cluster.Permissions)
return true
}

Expand Down
5 changes: 5 additions & 0 deletions server/client.go
Expand Up @@ -773,8 +773,13 @@ func (c *client) processConnect(arg []byte) error {

// Grab connection name of remote route.
if typ == ROUTER && r != nil {
var routePerms *RoutePermissions
if srv != nil {
routePerms = srv.getOpts().Cluster.Permissions
}
c.mu.Lock()
c.route.remoteID = c.opts.Name
c.setRoutePermissions(routePerms)
c.mu.Unlock()
}

Expand Down
38 changes: 28 additions & 10 deletions server/opts.go
Expand Up @@ -387,16 +387,9 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.Cluster.Username = auth.user
opts.Cluster.Password = auth.pass
opts.Cluster.AuthTimeout = auth.timeout
if auth.defaultPermissions != nil {
// Import is whether or not we will send a SUB for interest to the other side.
// Export is whether or not we will accept a SUB from the remote for a given subject.
// Both only effect interest registration.
// The parsing sets Import into Publish and Export into Subscribe, convert
// accordingly.
opts.Cluster.Permissions = &RoutePermissions{
Import: auth.defaultPermissions.Publish,
Export: auth.defaultPermissions.Subscribe,
}
// Do not set permissions if they were specified in top-level cluster block.
if auth.defaultPermissions != nil && opts.Cluster.Permissions == nil {
setClusterPermissions(&opts.Cluster, auth.defaultPermissions)
}
case "routes":
ra := mv.([]interface{})
Expand Down Expand Up @@ -430,11 +423,36 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.Cluster.NoAdvertise = mv.(bool)
case "connect_retries":
opts.Cluster.ConnectRetries = int(mv.(int64))
case "permissions":
pm, ok := mv.(map[string]interface{})
if !ok {
return fmt.Errorf("Expected permissions to be a map/struct, got %+v", mv)
}
perms, err := parseUserPermissions(pm)
if err != nil {
return err
}
// This will possibly override permissions that were define in auth block
setClusterPermissions(&opts.Cluster, perms)
}
}
return nil
}

// Sets cluster's permissions based on given pub/sub permissions,
// doing the appropriate translation.
func setClusterPermissions(opts *ClusterOpts, perms *Permissions) {
// Import is whether or not we will send a SUB for interest to the other side.
// Export is whether or not we will accept a SUB from the remote for a given subject.
// Both only effect interest registration.
// The parsing sets Import into Publish and Export into Subscribe, convert
// accordingly.
opts.Permissions = &RoutePermissions{
Import: perms.Publish,
Export: perms.Subscribe,
}
}

// Helper function to parse Authorization configs.
func parseAuthorization(am map[string]interface{}) (*authorization, error) {
auth := &authorization{}
Expand Down
159 changes: 159 additions & 0 deletions server/opts_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"crypto/tls"
"flag"
"fmt"
"io/ioutil"
"net/url"
"os"
Expand Down Expand Up @@ -1152,3 +1153,161 @@ func TestConfigureOptions(t *testing.T) {
t.Fatal("Expected TLSConfig to be set")
}
}

func TestClusterPermissionsConfig(t *testing.T) {
template := `
cluster {
port: 1234
%s
authorization {
user: ivan
password: pwd
permissions {
import {
allow: "foo"
}
export {
allow: "bar"
}
}
}
}
`
conf := createConfFile(t, []byte(fmt.Sprintf(template, "")))
defer os.Remove(conf)
opts, err := ProcessConfigFile(conf)
if err != nil {
t.Fatalf("Error processing config file: %v", err)
}
if opts.Cluster.Permissions == nil {
t.Fatal("Expected cluster permissions to be set")
}
if opts.Cluster.Permissions.Import == nil {
t.Fatal("Expected cluster import permissions to be set")
}
if len(opts.Cluster.Permissions.Import.Allow) != 1 || opts.Cluster.Permissions.Import.Allow[0] != "foo" {
t.Fatalf("Expected cluster import permissions to have %q, got %v", "foo", opts.Cluster.Permissions.Import.Allow)
}
if opts.Cluster.Permissions.Export == nil {
t.Fatal("Expected cluster export permissions to be set")
}
if len(opts.Cluster.Permissions.Export.Allow) != 1 || opts.Cluster.Permissions.Export.Allow[0] != "bar" {
t.Fatalf("Expected cluster export permissions to have %q, got %v", "bar", opts.Cluster.Permissions.Export.Allow)
}

// Now add permissions in top level cluster and check
// that this is the one that is being used.
conf = createConfFile(t, []byte(fmt.Sprintf(template, `
permissions {
import {
allow: "baz"
}
export {
allow: "bat"
}
}
`)))
defer os.Remove(conf)
opts, err = ProcessConfigFile(conf)
if err != nil {
t.Fatalf("Error processing config file: %v", err)
}
if opts.Cluster.Permissions == nil {
t.Fatal("Expected cluster permissions to be set")
}
if opts.Cluster.Permissions.Import == nil {
t.Fatal("Expected cluster import permissions to be set")
}
if len(opts.Cluster.Permissions.Import.Allow) != 1 || opts.Cluster.Permissions.Import.Allow[0] != "baz" {
t.Fatalf("Expected cluster import permissions to have %q, got %v", "baz", opts.Cluster.Permissions.Import.Allow)
}
if opts.Cluster.Permissions.Export == nil {
t.Fatal("Expected cluster export permissions to be set")
}
if len(opts.Cluster.Permissions.Export.Allow) != 1 || opts.Cluster.Permissions.Export.Allow[0] != "bat" {
t.Fatalf("Expected cluster export permissions to have %q, got %v", "bat", opts.Cluster.Permissions.Export.Allow)
}

// Tests with invalid permissions
invalidPerms := []string{
`permissions: foo`,
`permissions {
unknown_field: "foo"
}`,
`permissions {
import: [1, 2, 3]
}`,
`permissions {
import {
unknown_field: "foo"
}
}`,
`permissions {
import {
allow {
x: y
}
}
}`,
`permissions {
import {
deny {
x: y
}
}
}`,
`permissions {
export: [1, 2, 3]
}`,
`permissions {
export {
unknown_field: "foo"
}
}`,
`permissions {
export {
allow {
x: y
}
}
}`,
`permissions {
export {
deny {
x: y
}
}
}`,
}
for _, perms := range invalidPerms {
conf = createConfFile(t, []byte(fmt.Sprintf(`
cluster {
port: 1234
%s
}
`, perms)))
_, err := ProcessConfigFile(conf)
os.Remove(conf)
if err == nil {
t.Fatalf("Expected failure for permissions %s", perms)
}
}

for _, perms := range invalidPerms {
conf = createConfFile(t, []byte(fmt.Sprintf(`
cluster {
port: 1234
authorization {
user: ivan
password: pwd
%s
}
}
`, perms)))
_, err := ProcessConfigFile(conf)
os.Remove(conf)
if err == nil {
t.Fatalf("Expected failure for permissions %s", perms)
}
}
}
90 changes: 90 additions & 0 deletions server/routes_test.go
Expand Up @@ -998,3 +998,93 @@ func TestRouteFailedConnRemovedFromTmpMap(t *testing.T) {
srvA.Shutdown()
wg.Wait()
}

func TestRoutePermsAppliedOnInboundAndOutboundRoute(t *testing.T) {

perms := &RoutePermissions{
Import: &SubjectPermission{
Allow: []string{"imp.foo"},
Deny: []string{"imp.bar"},
},
Export: &SubjectPermission{
Allow: []string{"exp.foo"},
Deny: []string{"exp.bar"},
},
}

optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoLog = true
optsA.NoSigs = true
optsA.Cluster.Permissions = perms
srva := RunServer(optsA)
defer srva.Shutdown()

optsB := DefaultOptions()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
srvb := RunServer(optsB)
defer srvb.Shutdown()

checkClusterFormed(t, srva, srvb)

// Ensure permission is properly set
check := func(t *testing.T, s *Server) {
t.Helper()
var route *client
s.mu.Lock()
for _, r := range s.routes {
route = r
break
}
s.mu.Unlock()
route.mu.Lock()
perms := route.perms
route.mu.Unlock()
if perms == nil {
t.Fatal("Expected perms to be set")
}
if perms.pub.allow == nil || perms.pub.allow.Count() != 1 {
t.Fatal("unexpected pub allow perms")
}
if r := perms.pub.allow.Match("imp.foo"); len(r.psubs) != 1 {
t.Fatal("unexpected pub allow match")
}
if perms.pub.deny == nil || perms.pub.deny.Count() != 1 {
t.Fatal("unexpected pub deny perms")
}
if r := perms.pub.deny.Match("imp.bar"); len(r.psubs) != 1 {
t.Fatal("unexpected pub deny match")
}
if perms.sub.allow == nil || perms.sub.allow.Count() != 1 {
t.Fatal("unexpected sub allow perms")
}
if r := perms.sub.allow.Match("exp.foo"); len(r.psubs) != 1 {
t.Fatal("unexpected sub allow match")
}
if perms.sub.deny == nil || perms.sub.deny.Count() != 1 {
t.Fatal("unexpected sub deny perms")
}
if r := perms.sub.deny.Match("exp.bar"); len(r.psubs) != 1 {
t.Fatal("unexpected sub deny match")
}
}

// First check when permissions are set on the server accepting the route connection
check(t, srva)

srvb.Shutdown()
srva.Shutdown()

optsA.Cluster.Permissions = nil
optsB.Cluster.Permissions = perms

srva = RunServer(optsA)
defer srva.Shutdown()

srvb = RunServer(optsB)
defer srvb.Shutdown()

checkClusterFormed(t, srva, srvb)

// Now check for permissions set on server initiating the route connection
check(t, srvb)
}

0 comments on commit e1202dd

Please sign in to comment.