Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
Allow overriding base Temporal server config and client options (#75)
Browse files Browse the repository at this point in the history
* added options for configuring tls

* added tests for tls with and without mutal tls

* renamed flags and simplified certificate generation

* fixed some mixed case occurences of Tls

* added docs for public methods and replaced useMtls with clientAuth

* renamed generateCa method and removed old comment

* Update temporaltest/options.go

Co-authored-by: Jacob LeGrone <jlegrone@users.noreply.github.com>

* refactored to use config file for tls options

* removed superfluous tls code

* applied formatting for readability with temporal test server options

* refactored config construction to avoid panic

* added missing comment

* Update options.go

Co-authored-by: Jacob LeGrone <jlegrone@users.noreply.github.com>

* Update temporaltest/server.go

Co-authored-by: Jacob LeGrone <jlegrone@users.noreply.github.com>

* Update temporaltest/options.go

Co-authored-by: Jacob LeGrone <jlegrone@users.noreply.github.com>

* Update cmd/temporalite/main.go

Co-authored-by: Jacob LeGrone <jlegrone@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Jacob LeGrone <jlegrone@users.noreply.github.com>

* adding review suggestions

* Run goimports

* Use WithBaseClientOptions to match WithBaseConfig

Co-authored-by: Jacob LeGrone <jlegrone@users.noreply.github.com>
Co-authored-by: Jacob LeGrone <git@jacob.work>
  • Loading branch information
3 people committed Jun 28, 2022
1 parent 2cabcd1 commit 349fdfc
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 63 deletions.
25 changes: 25 additions & 0 deletions cmd/temporalite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"

"github.com/urfave/cli/v2"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/temporal"
Expand Down Expand Up @@ -44,6 +45,7 @@ const (
logLevelFlag = "log-level"
namespaceFlag = "namespace"
pragmaFlag = "sqlite-pragma"
configFlag = "config"
)

func init() {
Expand Down Expand Up @@ -125,6 +127,13 @@ func buildCLI() *cli.App {
EnvVars: nil,
Value: nil,
},
&cli.StringFlag{
Name: configFlag,
Aliases: []string{"c"},
Usage: `config dir path`,
EnvVars: []string{config.EnvKeyConfigDir},
Value: "",
},
},
Before: func(c *cli.Context) error {
if c.Args().Len() > 0 {
Expand All @@ -151,6 +160,13 @@ func buildCLI() *cli.App {
return cli.Exit(fmt.Sprintf("bad value %q passed for flag %q", c.String(ipFlag), ipFlag), 1)
}

if c.IsSet(configFlag) {
cfgPath := c.String(configFlag)
if _, err := os.Stat(cfgPath); os.IsNotExist(err) {
return cli.Exit(fmt.Sprintf("bad value %q passed for flag %q: file not found", c.String(configFlag), configFlag), 1)
}
}

return nil
},
Action: func(c *cli.Context) error {
Expand All @@ -169,6 +185,14 @@ func buildCLI() *cli.App {
return err
}

baseConfig := &config.Config{}
if c.IsSet(configFlag) {
baseConfig, err = config.LoadConfig("temporalite", c.String(configFlag), "")
if err != nil {
return err
}
}

opts := []temporalite.ServerOption{
temporalite.WithDynamicPorts(),
temporalite.WithFrontendPort(serverPort),
Expand All @@ -179,6 +203,7 @@ func buildCLI() *cli.App {
temporalite.WithUpstreamOptions(
temporal.InterruptOn(temporal.InterruptCh()),
),
temporalite.WithBaseConfig(baseConfig),
}
if !c.Bool(headlessFlag) {
opt := newUIOption(fmt.Sprintf(":%d", c.Int(portFlag)), ip, uiPort)
Expand Down
120 changes: 60 additions & 60 deletions internal/liteconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Config struct {
portProvider *portProvider
FrontendIP string
UIServer UIServer
BaseConfig *config.Config
}

var SupportedPragmas = map[string]struct{}{
Expand Down Expand Up @@ -93,6 +94,7 @@ func NewDefaultConfig() (*Config, error) {
})),
portProvider: &portProvider{},
FrontendIP: "",
BaseConfig: &config.Config{},
}, nil
}

Expand Down Expand Up @@ -135,76 +137,74 @@ func Convert(cfg *Config) *config.Config {
pprofPort = cfg.FrontendPort + 201
}

return &config.Config{
Global: config.Global{
Membership: config.Membership{
MaxJoinDuration: 30 * time.Second,
BroadcastAddress: broadcastAddress,
},
Metrics: &metrics.Config{
Prometheus: &metrics.PrometheusConfig{
ListenAddress: fmt.Sprintf("%s:%d", broadcastAddress, metricsPort),
HandlerPath: "/metrics",
},
},
PProf: config.PProf{Port: pprofPort},
baseConfig := cfg.BaseConfig
baseConfig.Global.Membership = config.Membership{
MaxJoinDuration: 30 * time.Second,
BroadcastAddress: broadcastAddress,
}
baseConfig.Global.Metrics = &metrics.Config{
Prometheus: &metrics.PrometheusConfig{
ListenAddress: fmt.Sprintf("%s:%d", broadcastAddress, metricsPort),
HandlerPath: "/metrics",
},
Persistence: config.Persistence{
DefaultStore: PersistenceStoreName,
VisibilityStore: PersistenceStoreName,
NumHistoryShards: 1,
DataStores: map[string]config.DataStore{
PersistenceStoreName: {SQL: &sqliteConfig},
},
}
baseConfig.Global.PProf = config.PProf{Port: pprofPort}
baseConfig.Persistence = config.Persistence{
DefaultStore: PersistenceStoreName,
VisibilityStore: PersistenceStoreName,
NumHistoryShards: 1,
DataStores: map[string]config.DataStore{
PersistenceStoreName: {SQL: &sqliteConfig},
},
ClusterMetadata: &cluster.Config{
EnableGlobalNamespace: false,
FailoverVersionIncrement: 10,
MasterClusterName: "active",
CurrentClusterName: "active",
ClusterInformation: map[string]cluster.ClusterInformation{
"active": {
Enabled: true,
InitialFailoverVersion: 1,
RPCAddress: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort),
},
}
baseConfig.ClusterMetadata = &cluster.Config{
EnableGlobalNamespace: false,
FailoverVersionIncrement: 10,
MasterClusterName: "active",
CurrentClusterName: "active",
ClusterInformation: map[string]cluster.ClusterInformation{
"active": {
Enabled: true,
InitialFailoverVersion: 1,
RPCAddress: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort),
},
},
DCRedirectionPolicy: config.DCRedirectionPolicy{
Policy: "noop",
}
baseConfig.DCRedirectionPolicy = config.DCRedirectionPolicy{
Policy: "noop",
}
baseConfig.Services = map[string]config.Service{
"frontend": cfg.mustGetService(0),
"history": cfg.mustGetService(1),
"matching": cfg.mustGetService(2),
"worker": cfg.mustGetService(3),
}
baseConfig.Archival = config.Archival{
History: config.HistoryArchival{
State: "disabled",
EnableRead: false,
Provider: nil,
},
Services: map[string]config.Service{
"frontend": cfg.mustGetService(0),
"history": cfg.mustGetService(1),
"matching": cfg.mustGetService(2),
"worker": cfg.mustGetService(3),
Visibility: config.VisibilityArchival{
State: "disabled",
EnableRead: false,
Provider: nil,
},
Archival: config.Archival{
History: config.HistoryArchival{
State: "disabled",
EnableRead: false,
Provider: nil,
},
Visibility: config.VisibilityArchival{
State: "disabled",
EnableRead: false,
Provider: nil,
}
baseConfig.PublicClient = config.PublicClient{
HostPort: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort),
}
baseConfig.NamespaceDefaults = config.NamespaceDefaults{
Archival: config.ArchivalNamespaceDefaults{
History: config.HistoryArchivalNamespaceDefaults{
State: "disabled",
},
},
PublicClient: config.PublicClient{
HostPort: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort),
},
NamespaceDefaults: config.NamespaceDefaults{
Archival: config.ArchivalNamespaceDefaults{
History: config.HistoryArchivalNamespaceDefaults{
State: "disabled",
},
Visibility: config.VisibilityArchivalNamespaceDefaults{
State: "disabled",
},
Visibility: config.VisibilityArchivalNamespaceDefaults{
State: "disabled",
},
},
}
return baseConfig
}

func (o *Config) mustGetService(frontendPortOffset int) config.Service {
Expand Down
11 changes: 11 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package temporalite

import (
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/temporal"

Expand Down Expand Up @@ -99,6 +100,16 @@ func WithUpstreamOptions(options ...temporal.ServerOption) ServerOption {
})
}

// WithBaseConfig sets the default Temporal server configuration.
//
// Storage and client configuration will always be overridden, however base config can be
// used to enable settings like TLS or authentication.
func WithBaseConfig(base *config.Config) ServerOption {
return newApplyFuncContainer(func(cfg *liteconfig.Config) {
cfg.BaseConfig = base
})
}

type applyFuncContainer struct {
applyInternal func(*liteconfig.Config)
}
Expand Down
22 changes: 21 additions & 1 deletion temporaltest/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

package temporaltest

import "testing"
import (
"testing"

"go.temporal.io/sdk/client"

"github.com/DataDog/temporalite"
)

type TestServerOption interface {
apply(*TestServer)
Expand All @@ -20,6 +26,20 @@ func WithT(t *testing.T) TestServerOption {
})
}

// WithBaseClientOptions configures options for the default clients and workers connected to the test server.
func WithBaseClientOptions(o client.Options) TestServerOption {
return newApplyFuncContainer(func(server *TestServer) {
server.defaultClientOptions = o
})
}

// WithTemporaliteOptions provides the ability to use additional Temporalite options, including temporalite.WithUpstreamOptions.
func WithTemporaliteOptions(options ...temporalite.ServerOption) TestServerOption {
return newApplyFuncContainer(func(server *TestServer) {
server.serverOptions = append(server.serverOptions, options...)
})
}

type applyFuncContainer struct {
applyInternal func(*TestServer)
}
Expand Down
10 changes: 8 additions & 2 deletions temporaltest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type TestServer struct {
clients []client.Client
workers []worker.Worker
t *testing.T
defaultClientOptions client.Options
serverOptions []temporalite.ServerOption
}

func (ts *TestServer) fatal(err error) {
Expand Down Expand Up @@ -56,7 +58,7 @@ func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker
// be closed on TestServer.Stop.
func (ts *TestServer) Client() client.Client {
if ts.defaultClient == nil {
ts.defaultClient = ts.NewClientWithOptions(client.Options{})
ts.defaultClient = ts.NewClientWithOptions(ts.defaultClientOptions)
}
return ts.defaultClient
}
Expand Down Expand Up @@ -119,12 +121,16 @@ func NewServer(opts ...TestServerOption) *TestServer {
})
}

s, err := temporalite.NewServer(
// Order of these options matters. When there are conflicts, options later in the list take precedence.
// Always specify options that are required for temporaltest last to avoid accidental overrides.
ts.serverOptions = append(ts.serverOptions,
temporalite.WithNamespaces(ts.defaultTestNamespace),
temporalite.WithPersistenceDisabled(),
temporalite.WithDynamicPorts(),
temporalite.WithLogger(log.NewNoopLogger()),
)

s, err := temporalite.NewServer(ts.serverOptions...)
if err != nil {
ts.fatal(fmt.Errorf("error creating server: %w", err))
}
Expand Down

0 comments on commit 349fdfc

Please sign in to comment.