Skip to content

Commit

Permalink
Add feature flag to disable cross namespace commands (#1432)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 7, 2021
1 parent 3fc9b16 commit 0ff2418
Show file tree
Hide file tree
Showing 22 changed files with 92 additions and 55 deletions.
13 changes: 10 additions & 3 deletions cmd/server/main.go
Expand Up @@ -35,8 +35,10 @@ import (

"go.temporal.io/server/common/authorization"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
tlog "go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
_ "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin
_ "go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql" // needed to load postgresql plugin
"go.temporal.io/server/temporal"
Expand Down Expand Up @@ -129,19 +131,24 @@ func buildCLI() *cli.App {

logger := tlog.NewZapLogger(tlog.BuildZapLogger(cfg.Log))

authorizer, err := authorization.GetAuthorizerFromConfig(&cfg.Global.Authorization)
dynamicConfigClient, err := dynamicconfig.NewFileBasedClient(&cfg.DynamicConfigClient, logger, temporal.InterruptCh())
if err != nil {
return cli.Exit(fmt.Sprintf("Unable to instantiate authorizer: %v.", err), 1)
logger.Info("Unable to create file based dynamic config client, use no-op config client instead.", tag.Error(err))
dynamicConfigClient = dynamicconfig.NewNoopClient()
}

authorizer, err := authorization.GetAuthorizerFromConfig(
&cfg.Global.Authorization,
)

claimMapper, err := authorization.GetClaimMapperFromConfig(&cfg.Global.Authorization, logger)
if err != nil {
return cli.Exit(fmt.Sprintf("Unable to instantiate claim mapper: %v.", err), 1)
}

s := temporal.NewServer(
temporal.ForServices(services),
temporal.WithConfig(cfg),
temporal.WithDynamicConfigClient(dynamicConfigClient),
temporal.WithLogger(logger),
temporal.InterruptOn(temporal.InterruptCh()),
temporal.WithAuthorizer(authorizer),
Expand Down
6 changes: 4 additions & 2 deletions common/authorization/authorizer.go
Expand Up @@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination authority_mock.go
//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination authorizer_mock.go

package authorization

Expand Down Expand Up @@ -50,6 +50,8 @@ type CallTarget struct {
APIName string
// If a Namespace is not being targeted this be set to an empty string.
Namespace string
// If a Namespace is not being targeted this be set to an empty string.
Request interface{}
}

// @@@SNIPEND
Expand All @@ -72,7 +74,7 @@ type Authorizer interface {

// @@@SNIPEND

type requestWithNamespace interface {
type hasNamespace interface {
GetNamespace() string
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

File renamed without changes.
2 changes: 1 addition & 1 deletion common/authorization/claim_mapper_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -24,9 +24,16 @@

package authorization

import "context"
import (
"context"
)

type defaultAuthorizer struct{}
type (
defaultAuthorizer struct {
}
)

var _ Authorizer = (*defaultAuthorizer)(nil)

// NewDefaultAuthorizer creates a default authorizer
func NewDefaultAuthorizer() Authorizer {
Expand Down Expand Up @@ -54,7 +61,6 @@ func (a *defaultAuthorizer) Authorize(_ context.Context, claims *Claims, target
if !found || roles == RoleUndefined {
return Result{Decision: DecisionDeny}, nil
}

return Result{Decision: DecisionAllow}, nil
}

var _ Authorizer = (*defaultAuthorizer)(nil)
50 changes: 29 additions & 21 deletions common/authorization/interceptor.go
Expand Up @@ -28,25 +28,27 @@ import (
"context"
"crypto/x509/pkix"

"go.temporal.io/api/serviceerror"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
)

type (
contextKeyMappedClaims struct{}
contextKeyAuthHeader struct{}
)

var (
errUnauthorized = serviceerror.NewPermissionDenied("Request unauthorized.")
)

const (
ContextKeyMappedClaims = "auth-mappedClaims"
ContextAuthHeader = "auth-header"
MappedClaims contextKeyMappedClaims
AuthHeader contextKeyAuthHeader
)

func (a *interceptor) Interceptor(
Expand Down Expand Up @@ -100,45 +102,51 @@ func (a *interceptor) Interceptor(
}
mappedClaims, err := a.claimMapper.GetClaims(&authInfo)
if err != nil {
return nil, a.logAuthError(err)
a.logAuthError(err)
return nil, errUnauthorized // return a generic error to the caller without disclosing details
}
claims = mappedClaims
ctx = context.WithValue(ctx, ContextKeyMappedClaims, mappedClaims)
ctx = context.WithValue(ctx, MappedClaims, mappedClaims)
if authHeader != "" {
ctx = context.WithValue(ctx, ContextAuthHeader, authHeader)
ctx = context.WithValue(ctx, AuthHeader, authHeader)
}
}
}

if a.authorizer != nil {
var namespace string
requestWithNamespace, ok := req.(requestWithNamespace)
requestWithNamespace, ok := req.(hasNamespace)
if ok {
namespace = requestWithNamespace.GetNamespace()
}

apiName := info.FullMethod

scope := a.getMetricsScope(metrics.AuthorizationScope, namespace)
sw := scope.StartTimer(metrics.ServiceAuthorizationLatency)
defer sw.Stop()

result, err := a.authorizer.Authorize(ctx, claims, &CallTarget{Namespace: namespace, APIName: apiName})
result, err := a.authorize(ctx, claims, &CallTarget{
Namespace: namespace,
APIName: info.FullMethod,
Request: req,
}, scope)
if err != nil {
scope.IncCounter(metrics.ServiceErrAuthorizeFailedCounter)
return nil, a.logAuthError(err)
a.logAuthError(err)
return nil, errUnauthorized // return a generic error to the caller without disclosing details
}
if result.Decision != DecisionAllow {
scope.IncCounter(metrics.ServiceErrUnauthorizedCounter)
return nil, errUnauthorized
return nil, errUnauthorized // return a generic error to the caller without disclosing details
}
}
return handler(ctx, req)
}

func (a *interceptor) logAuthError(err error) error {
a.logger.Error("authorization error", tag.Error(err))
return errUnauthorized // return a generic error to the caller without disclosing details
func (a *interceptor) authorize(ctx context.Context, claims *Claims, callTarget *CallTarget, scope metrics.Scope) (Result, error) {
sw := scope.StartTimer(metrics.ServiceAuthorizationLatency)
defer sw.Stop()
return a.authorizer.Authorize(ctx, claims, callTarget)
}

func (a *interceptor) logAuthError(err error) {
a.logger.Error("Authorization error", tag.Error(err))
}

type interceptor struct {
Expand Down
4 changes: 2 additions & 2 deletions common/authorization/interceptor_test.go
Expand Up @@ -45,11 +45,11 @@ const (

var (
ctx = context.Background()
describeNamespaceTarget = &CallTarget{Namespace: testNamespace, APIName: "/temporal.api.workflowservice.v1.WorkflowService/DescribeNamespace"}
describeNamespaceRequest = &workflowservice.DescribeNamespaceRequest{Namespace: testNamespace}
describeNamespaceTarget = &CallTarget{Namespace: testNamespace, Request: describeNamespaceRequest, APIName: "/temporal.api.workflowservice.v1.WorkflowService/DescribeNamespace"}
describeNamespaceInfo = &grpc.UnaryServerInfo{FullMethod: "/temporal.api.workflowservice.v1.WorkflowService/DescribeNamespace"}
startWorkflowExecutionTarget = &CallTarget{Namespace: testNamespace, APIName: "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution"}
startWorkflowExecutionRequest = &workflowservice.StartWorkflowExecutionRequest{Namespace: testNamespace}
startWorkflowExecutionTarget = &CallTarget{Namespace: testNamespace, Request: startWorkflowExecutionRequest, APIName: "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution"}
startWorkflowExecutionInfo = &grpc.UnaryServerInfo{FullMethod: "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution"}
)

Expand Down
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -83,6 +83,7 @@ var Keys = map[Key]string{
EnableStickyQuery: "system.enableStickyQuery",
EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor",
EnableAuthorization: "system.enableAuthorization",
EnableCrossNamespaceCommands: "system.enableCrossNamespaceCommands",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
Expand Down Expand Up @@ -379,6 +380,8 @@ const (
EnablePriorityTaskProcessor
// EnableAuthorization is the key to enable authorization for a namespace
EnableAuthorization
// EnableCrossNamespaceCommands is the key to enable commands for external namespaces
EnableCrossNamespaceCommands
// BlobSizeLimitError is the per event blob size limit
BlobSizeLimitError
// BlobSizeLimitWarn is the per event blob size limit for warning
Expand Down
4 changes: 2 additions & 2 deletions common/dynamicconfig/file_based_client.go
Expand Up @@ -62,12 +62,12 @@ type fileBasedClient struct {
values atomic.Value
lastUpdatedTime time.Time
config *FileBasedClientConfig
doneCh chan struct{}
doneCh <-chan interface{}
logger log.Logger
}

// NewFileBasedClient creates a file based client.
func NewFileBasedClient(config *FileBasedClientConfig, logger log.Logger, doneCh chan struct{}) (Client, error) {
func NewFileBasedClient(config *FileBasedClientConfig, logger log.Logger, doneCh <-chan interface{}) (Client, error) {
if err := validateConfig(config); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions common/dynamicconfig/file_based_client_test.go
Expand Up @@ -38,7 +38,7 @@ type fileBasedClientSuite struct {
suite.Suite
*require.Assertions
client Client
doneCh chan struct{}
doneCh chan interface{}
}

func TestFileBasedClientSuite(t *testing.T) {
Expand All @@ -48,7 +48,7 @@ func TestFileBasedClientSuite(t *testing.T) {

func (s *fileBasedClientSuite) SetupSuite() {
var err error
s.doneCh = make(chan struct{})
s.doneCh = make(chan interface{})
s.client, err = NewFileBasedClient(&FileBasedClientConfig{
Filepath: "config/testConfig.yaml",
PollInterval: time.Second * 5,
Expand Down
8 changes: 7 additions & 1 deletion service/history/commandChecker.go
Expand Up @@ -58,6 +58,7 @@ type (
searchAttributesValidator *searchattribute.Validator
getDefaultActivityRetrySettings dynamicconfig.MapPropertyFnWithNamespaceFilter
getDefaultWorkflowRetrySettings dynamicconfig.MapPropertyFnWithNamespaceFilter
enableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
}

workflowSizeChecker struct {
Expand Down Expand Up @@ -94,6 +95,7 @@ func newCommandAttrValidator(
searchAttributesValidator: searchAttributesValidator,
getDefaultActivityRetrySettings: config.DefaultActivityRetryPolicy,
getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy,
enableCrossNamespaceCommands: config.EnableCrossNamespaceCommands,
}
}

Expand Down Expand Up @@ -666,6 +668,10 @@ func (v *commandAttrValidator) validateCrossNamespaceCall(
return nil
}

if !v.enableCrossNamespaceCommands() {
return serviceerror.NewInvalidArgument("cross namespace commands are not allowed")
}

namespaceEntry, err := v.namespaceCache.GetNamespaceByID(namespaceID)
if err != nil {
return err
Expand Down Expand Up @@ -699,5 +705,5 @@ func (v *commandAttrValidator) createCrossNamespaceCallError(
namespaceEntry *cache.NamespaceCacheEntry,
targetNamespaceEntry *cache.NamespaceCacheEntry,
) error {
return serviceerror.NewInvalidArgument(fmt.Sprintf("cannot make cross namespace call between %v and %v", namespaceEntry.GetInfo().Name, targetNamespaceEntry.GetInfo().Name))
return serviceerror.NewInvalidArgument(fmt.Sprintf("unable to process cross namespace command between %v and %v", namespaceEntry.GetInfo().Name, targetNamespaceEntry.GetInfo().Name))
}
1 change: 1 addition & 0 deletions service/history/commandChecker_test.go
Expand Up @@ -91,6 +91,7 @@ func (s *commandAttrValidatorSuite) SetupTest() {
SearchAttributesTotalSizeLimit: dynamicconfig.GetIntPropertyFilteredByNamespace(40 * 1024),
DefaultActivityRetryPolicy: dynamicconfig.GetMapPropertyFnWithNamespaceFilter(common.GetDefaultRetryPolicyConfigOptions()),
DefaultWorkflowRetryPolicy: dynamicconfig.GetMapPropertyFnWithNamespaceFilter(common.GetDefaultRetryPolicyConfigOptions()),
EnableCrossNamespaceCommands: dynamicconfig.GetBoolPropertyFn(true),
}
s.validator = newCommandAttrValidator(
s.mockNamespaceCache,
Expand Down
4 changes: 4 additions & 0 deletions service/history/configs/config.go
Expand Up @@ -258,6 +258,8 @@ type Config struct {
ESProcessorBulkSize dynamicconfig.IntPropertyFn // max total size of bytes in bulk
ESProcessorFlushInterval dynamicconfig.DurationPropertyFn
ESProcessorAckTimeout dynamicconfig.DurationPropertyFn

EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
}

const (
Expand Down Expand Up @@ -445,6 +447,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ESProcessorBulkSize: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkSize, 2<<24), // 16MB
ESProcessorFlushInterval: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorFlushInterval, 1*time.Second),
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 1*time.Minute),

EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
}

return cfg
Expand Down
4 changes: 2 additions & 2 deletions temporal/server.go
Expand Up @@ -72,7 +72,7 @@ type (
so *serverOptions
services map[string]common.Daemon
serviceStoppedChs map[string]chan struct{}
stoppedCh chan struct{}
stoppedCh chan interface{}
logger log.Logger
}
)
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *Server) Start() error {
return err
}

s.stoppedCh = make(chan struct{})
s.stoppedCh = make(chan interface{})

s.logger = s.so.logger
if s.logger == nil {
Expand Down

0 comments on commit 0ff2418

Please sign in to comment.