From 4c212e5bb48fe270f35dea96bc0bdd0b9ed746d1 Mon Sep 17 00:00:00 2001 From: Mikhail Gryzykhin <12602502+Ardagan@users.noreply.github.com> Date: Wed, 23 Feb 2022 15:37:02 -0800 Subject: [PATCH] Add dynamic config change diff logging (#2494) * Add dynamic config change diff logging --- common/dynamicconfig/basic_client.go | 100 ++++++- common/dynamicconfig/config.go | 73 ++--- common/dynamicconfig/config_test.go | 30 --- common/dynamicconfig/constants.go | 4 +- common/dynamicconfig/file_based_client.go | 115 +++++--- .../dynamicconfig/file_based_client_mock.go | 89 +++++++ .../dynamicconfig/file_based_client_test.go | 252 ++++++++++++++++++ common/dynamicconfig/interface.go | 30 ++- .../dynamicconfig/mutable_ephemeral_client.go | 4 +- common/dynamicconfig/shared_constants.go | 27 ++ service/worker/service.go | 6 +- temporal/server_impl.go | 23 +- 12 files changed, 606 insertions(+), 147 deletions(-) create mode 100644 common/dynamicconfig/file_based_client_mock.go create mode 100644 common/dynamicconfig/shared_constants.go diff --git a/common/dynamicconfig/basic_client.go b/common/dynamicconfig/basic_client.go index dfa8b03b439..fe6bd6c3590 100644 --- a/common/dynamicconfig/basic_client.go +++ b/common/dynamicconfig/basic_client.go @@ -27,10 +27,12 @@ package dynamicconfig import ( "errors" "fmt" + "reflect" "strings" "sync/atomic" "time" + "go.temporal.io/server/common/log" "go.temporal.io/server/common/primitives/timestamp" ) @@ -44,15 +46,28 @@ type constrainedValue struct { } type basicClient struct { + logger log.Logger values atomic.Value // configValueMap } -func newBasicClient() *basicClient { - bc := &basicClient{} +func newBasicClient(logger log.Logger) *basicClient { + bc := &basicClient{ + logger: logger, + } bc.values.Store(configValueMap{}) return bc } +func (bc *basicClient) getValues() configValueMap { + return bc.values.Load().(configValueMap) +} + +func (bc *basicClient) updateValues(newValues configValueMap) { + oldValues := bc.getValues() + bc.values.Store(newValues) + bc.logDiff(oldValues, newValues) +} + func (bc *basicClient) GetValue( name Key, defaultValue interface{}, @@ -210,3 +225,84 @@ func match(v *constrainedValue, filters map[Filter]interface{}) bool { } return true } + +func (fc *basicClient) logDiff(old configValueMap, new configValueMap) { + for key, newValues := range new { + oldValues, ok := old[key] + if !ok { + for _, newValue := range newValues { + // new key added + fc.logValueDiff(key, nil, newValue) + } + } else { + // compare existing keys + fc.logConstraintsDiff(key, oldValues, newValues) + } + } + + // check for removed values + for key, oldValues := range old { + if _, ok := new[key]; !ok { + for _, oldValue := range oldValues { + fc.logValueDiff(key, oldValue, nil) + } + } + } +} + +func (bc *basicClient) logConstraintsDiff(key string, oldValues []*constrainedValue, newValues []*constrainedValue) { + for _, oldValue := range oldValues { + matchFound := false + for _, newValue := range newValues { + if reflect.DeepEqual(oldValue.Constraints, newValue.Constraints) { + matchFound = true + if !reflect.DeepEqual(oldValue.Value, newValue.Value) { + bc.logValueDiff(key, oldValue, newValue) + } + } + } + if !matchFound { + bc.logValueDiff(key, oldValue, nil) + } + } + + for _, newValue := range newValues { + matchFound := false + for _, oldValue := range oldValues { + if reflect.DeepEqual(oldValue.Constraints, newValue.Constraints) { + matchFound = true + } + } + if !matchFound { + bc.logValueDiff(key, nil, newValue) + } + } +} + +func (bc *basicClient) logValueDiff(key string, oldValue *constrainedValue, newValue *constrainedValue) { + logLine := &strings.Builder{} + logLine.Grow(128) + logLine.WriteString("dynamic config changed for the key: ") + logLine.WriteString(key) + logLine.WriteString(" oldValue: ") + bc.appendConstrainedValue(logLine, oldValue) + logLine.WriteString(" newValue: ") + bc.appendConstrainedValue(logLine, newValue) + bc.logger.Info(logLine.String()) +} + +func (bc *basicClient) appendConstrainedValue(logLine *strings.Builder, value *constrainedValue) { + if value == nil { + logLine.WriteString("nil") + } else { + logLine.WriteString("{ constraints: {") + for constraintKey, constraintValue := range value.Constraints { + logLine.WriteString("{") + logLine.WriteString(constraintKey) + logLine.WriteString(":") + logLine.WriteString(fmt.Sprintf("%v", constraintValue)) + logLine.WriteString("}") + } + logLine.WriteString(fmt.Sprint("} value: ", value.Value, " }")) + } +} diff --git a/common/dynamicconfig/config.go b/common/dynamicconfig/config.go index 2415326068d..a8956ca0317 100644 --- a/common/dynamicconfig/config.go +++ b/common/dynamicconfig/config.go @@ -25,8 +25,6 @@ package dynamicconfig import ( - "reflect" - "sync" "sync/atomic" "time" @@ -36,6 +34,15 @@ import ( "go.temporal.io/server/common/log/tag" ) +// Collection wraps dynamic config client with a closure so that across the code, the config values +// can be directly accessed by calling the function without propagating the client everywhere in +// code +type Collection struct { + client Client + logger log.Logger + errCount int64 +} + const ( errCountLogThreshold = 1000 ) @@ -45,21 +52,10 @@ func NewCollection(client Client, logger log.Logger) *Collection { return &Collection{ client: client, logger: logger, - keys: &sync.Map{}, errCount: -1, } } -// Collection wraps dynamic config client with a closure so that across the code, the config values -// can be directly accessed by calling the function without propagating the client everywhere in -// code -type Collection struct { - client Client - logger log.Logger - keys *sync.Map // map of config Key to strongly typed value - errCount int64 -} - func (c *Collection) logError(key Key, err error) { errCount := atomic.AddInt64(&c.errCount, 1) if errCount%errCountLogThreshold == 0 { @@ -68,18 +64,6 @@ func (c *Collection) logError(key Key, err error) { } } -func (c *Collection) logValue( - key Key, - value, defaultValue interface{}, - cmpValueEquals func(interface{}, interface{}) bool, -) { - cachedValue, isInCache := c.keys.Load(key) - if !isInCache || !cmpValueEquals(cachedValue, value) { - c.keys.Store(key, value) - c.logger.Info("Get dynamic config", tag.Name(key.String()), tag.Value(value), tag.DefaultValue(defaultValue)) - } -} - // PropertyFn is a wrapper to get property from dynamic config type PropertyFn func() interface{} @@ -153,7 +137,6 @@ func (c *Collection) GetProperty(key Key, defaultValue interface{}) PropertyFn { if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, reflect.DeepEqual) return val } } @@ -174,7 +157,7 @@ func (c *Collection) GetIntProperty(key Key, defaultValue int) IntPropertyFn { if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, intCompareEquals) + return val } } @@ -186,7 +169,7 @@ func (c *Collection) GetIntPropertyFilteredByNamespace(key Key, defaultValue int if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, intCompareEquals) + return val } } @@ -217,7 +200,6 @@ func (c *Collection) GetIntPropertyFilteredByTaskQueueInfo(key Key, defaultValue } } - c.logValue(key, val, defaultValue, intCompareEquals) return val } } @@ -233,7 +215,7 @@ func (c *Collection) GetIntPropertyFilteredByShardID(key Key, defaultValue int) if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, intCompareEquals) + return val } } @@ -245,7 +227,7 @@ func (c *Collection) GetFloat64Property(key Key, defaultValue float64) FloatProp if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, float64CompareEquals) + return val } } @@ -261,7 +243,7 @@ func (c *Collection) GetFloat64PropertyFilteredByShardID(key Key, defaultValue f if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, float64CompareEquals) + return val } } @@ -273,7 +255,7 @@ func (c *Collection) GetFloatPropertyFilteredByNamespace(key Key, defaultValue f if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, float64CompareEquals) + return val } } @@ -304,7 +286,6 @@ func (c *Collection) GetFloatPropertyFilteredByTaskQueueInfo(key Key, defaultVal } } - c.logValue(key, val, defaultValue, float64CompareEquals) return val } } @@ -316,7 +297,7 @@ func (c *Collection) GetDurationProperty(key Key, defaultValue time.Duration) Du if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, durationCompareEquals) + return val } } @@ -328,7 +309,7 @@ func (c *Collection) GetDurationPropertyFilteredByNamespace(key Key, defaultValu if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, durationCompareEquals) + return val } } @@ -340,7 +321,7 @@ func (c *Collection) GetDurationPropertyFilteredByNamespaceID(key Key, defaultVa if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, durationCompareEquals) + return val } } @@ -371,7 +352,6 @@ func (c *Collection) GetDurationPropertyFilteredByTaskQueueInfo(key Key, default } } - c.logValue(key, val, defaultValue, durationCompareEquals) return val } } @@ -387,7 +367,7 @@ func (c *Collection) GetDurationPropertyFilteredByShardID(key Key, defaultValue if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, durationCompareEquals) + return val } } @@ -399,7 +379,7 @@ func (c *Collection) GetBoolProperty(key Key, defaultValue bool) BoolPropertyFn if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, boolCompareEquals) + return val } } @@ -411,7 +391,7 @@ func (c *Collection) GetStringProperty(key Key, defaultValue string) StringPrope if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, stringCompareEquals) + return val } } @@ -423,7 +403,7 @@ func (c *Collection) GetMapProperty(key Key, defaultValue map[string]interface{} if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, reflect.DeepEqual) + return val } } @@ -435,7 +415,7 @@ func (c *Collection) GetStringPropertyFnWithNamespaceFilter(key Key, defaultValu if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, stringCompareEquals) + return val } } @@ -447,7 +427,7 @@ func (c *Collection) GetMapPropertyFnWithNamespaceFilter(key Key, defaultValue m if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, reflect.DeepEqual) + return val } } @@ -459,7 +439,7 @@ func (c *Collection) GetBoolPropertyFnWithNamespaceFilter(key Key, defaultValue if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, boolCompareEquals) + return val } } @@ -471,7 +451,7 @@ func (c *Collection) GetBoolPropertyFnWithNamespaceIDFilter(key Key, defaultValu if err != nil { c.logError(key, err) } - c.logValue(key, val, defaultValue, boolCompareEquals) + return val } } @@ -502,7 +482,6 @@ func (c *Collection) GetBoolPropertyFilteredByTaskQueueInfo(key Key, defaultValu } } - c.logValue(key, val, defaultValue, boolCompareEquals) return val } } diff --git a/common/dynamicconfig/config_test.go b/common/dynamicconfig/config_test.go index ea5d85ad698..4840dd41f6c 100644 --- a/common/dynamicconfig/config_test.go +++ b/common/dynamicconfig/config_test.go @@ -247,33 +247,3 @@ func TestDynamicConfigFilterTypeIsMapped(t *testing.T) { require.NotEmpty(t, filters[i]) } } - -func BenchmarkLogValue(b *testing.B) { - keys := []Key{ - HistorySizeLimitError, - MatchingThrottledLogRPS, - MatchingIdleTaskqueueCheckInterval, - } - values := []interface{}{ - 1024 * 1024, - 0.1, - 30 * time.Second, - } - cmpFuncs := []func(interface{}, interface{}) bool{ - intCompareEquals, - float64CompareEquals, - durationCompareEquals, - } - - collection := NewCollection(newInMemoryClient(), log.NewNoopLogger()) - // pre-warm the collection logValue map - for i := range keys { - collection.logValue(keys[i], values[i], values[i], cmpFuncs[i]) - } - - for i := 0; i < b.N; i++ { - for i := range keys { - collection.logValue(keys[i], values[i], values[i], cmpFuncs[i]) - } - } -} diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 54c43663a0e..79349b84fd9 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -578,7 +578,7 @@ const ( type Filter int func (f Filter) String() string { - if f <= unknownFilter || f > TaskType { + if f <= unknownFilter || f >= lastFilterTypeForTest { return filters[unknownFilter] } return filters[f] @@ -610,8 +610,6 @@ const ( lastFilterTypeForTest ) -const DefaultNumTaskQueuePartitions = 4 - // FilterOption is used to provide filters for dynamic config keys type FilterOption func(filterMap map[Filter]interface{}) diff --git a/common/dynamicconfig/file_based_client.go b/common/dynamicconfig/file_based_client.go index bcf1d931f54..4684f3904c6 100644 --- a/common/dynamicconfig/file_based_client.go +++ b/common/dynamicconfig/file_based_client.go @@ -22,6 +22,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination file_based_client_mock.go + package dynamicconfig import ( @@ -44,53 +46,91 @@ const ( fileMode = 0644 // used for update config file ) -// FileBasedClientConfig is the config for the file based dynamic config client. -// It specifies where the config file is stored and how often the config should be -// updated by checking the config file again. -type FileBasedClientConfig struct { - Filepath string `yaml:"filepath"` - PollInterval time.Duration `yaml:"pollInterval"` -} +type ( + fileReader interface { + Stat(src string) (os.FileInfo, error) + ReadFile(src string) ([]byte, error) + } -type fileBasedClient struct { - *basicClient - lastUpdatedTime time.Time - config *FileBasedClientConfig - doneCh <-chan interface{} - logger log.Logger -} + // FileBasedClientConfig is the config for the file based dynamic config client. + // It specifies where the config file is stored and how often the config should be + // updated by checking the config file again. + FileBasedClientConfig struct { + Filepath string `yaml:"filepath"` + PollInterval time.Duration `yaml:"pollInterval"` + } + + fileBasedClient struct { + *basicClient + reader fileReader + lastUpdatedTime time.Time + config *FileBasedClientConfig + doneCh <-chan interface{} + } + + osReader struct { + } +) // NewFileBasedClient creates a file based client. -func NewFileBasedClient(config *FileBasedClientConfig, logger log.Logger, doneCh <-chan interface{}) (Client, error) { - if err := validateConfig(config); err != nil { - return nil, fmt.Errorf("unable to validate dynamic config: %w", err) +func NewFileBasedClient(config *FileBasedClientConfig, logger log.Logger, doneCh <-chan interface{}) (*fileBasedClient, error) { + client := &fileBasedClient{ + basicClient: newBasicClient(logger), + reader: &osReader{}, + config: config, + doneCh: doneCh, } + err := client.init() + if err != nil { + return nil, err + } + + return client, nil +} + +func NewFileBasedClientWithReader(reader fileReader, config *FileBasedClientConfig, logger log.Logger, doneCh <-chan interface{}) (*fileBasedClient, error) { client := &fileBasedClient{ - basicClient: newBasicClient(), + basicClient: newBasicClient(logger), + reader: reader, config: config, doneCh: doneCh, - logger: logger, } - if err := client.update(); err != nil { - return nil, fmt.Errorf("unable to read dynamic config: %w", err) + + err := client.init() + if err != nil { + return nil, err + } + + return client, nil +} + +func (fc *fileBasedClient) init() error { + if err := fc.validateConfig(fc.config); err != nil { + return fmt.Errorf("unable to validate dynamic config: %w", err) } + + if err := fc.update(); err != nil { + return fmt.Errorf("unable to read dynamic config: %w", err) + } + go func() { - ticker := time.NewTicker(client.config.PollInterval) + ticker := time.NewTicker(fc.config.PollInterval) for { select { case <-ticker.C: - err := client.update() + err := fc.update() if err != nil { - client.logger.Error("Unable to update dynamic config.", tag.Error(err)) + fc.logger.Error("Unable to update dynamic config.", tag.Error(err)) } - case <-client.doneCh: + case <-fc.doneCh: ticker.Stop() return } } }() - return client, nil + + return nil } func (fc *fileBasedClient) update() error { @@ -100,7 +140,7 @@ func (fc *fileBasedClient) update() error { newValues := make(configValueMap) - info, err := os.Stat(fc.config.Filepath) + info, err := fc.reader.Stat(fc.config.Filepath) if err != nil { return fmt.Errorf("dynamic config file: %s: %w", fc.config.Filepath, err) } @@ -108,7 +148,7 @@ func (fc *fileBasedClient) update() error { return nil } - confContent, err := os.ReadFile(fc.config.Filepath) + confContent, err := fc.reader.ReadFile(fc.config.Filepath) if err != nil { return fmt.Errorf("dynamic config file: %s: %w", fc.config.Filepath, err) } @@ -117,7 +157,9 @@ func (fc *fileBasedClient) update() error { return fmt.Errorf("unable to decode dynamic config: %w", err) } - return fc.storeValues(newValues) + err = fc.storeValues(newValues) + + return err } func (fc *fileBasedClient) storeValues(newValues map[string][]*constrainedValue) error { @@ -138,16 +180,17 @@ func (fc *fileBasedClient) storeValues(newValues map[string][]*constrainedValue) formattedNewValues[strings.ToLower(key)] = valuesSlice } - fc.values.Store(formattedNewValues) + fc.basicClient.updateValues(formattedNewValues) fc.logger.Info("Updated dynamic config") + return nil } -func validateConfig(config *FileBasedClientConfig) error { +func (fc *fileBasedClient) validateConfig(config *FileBasedClientConfig) error { if config == nil { return errors.New("configuration for dynamic config client is nil") } - if _, err := os.Stat(config.Filepath); err != nil { + if _, err := fc.reader.Stat(config.Filepath); err != nil { return fmt.Errorf("dynamic config: %s: %w", config.Filepath, err) } if config.PollInterval < minPollInterval { @@ -194,3 +237,11 @@ func convertKeyTypeToStringSlice(s []interface{}) ([]interface{}, error) { } return stringKeySlice, nil } + +func (r *osReader) ReadFile(src string) ([]byte, error) { + return os.ReadFile(src) +} + +func (r *osReader) Stat(src string) (os.FileInfo, error) { + return os.Stat(src) +} diff --git a/common/dynamicconfig/file_based_client_mock.go b/common/dynamicconfig/file_based_client_mock.go new file mode 100644 index 00000000000..23c222faabf --- /dev/null +++ b/common/dynamicconfig/file_based_client_mock.go @@ -0,0 +1,89 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: file_based_client.go + +// Package dynamicconfig is a generated GoMock package. +package dynamicconfig + +import ( + os "os" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockfileReader is a mock of fileReader interface. +type MockfileReader struct { + ctrl *gomock.Controller + recorder *MockfileReaderMockRecorder +} + +// MockfileReaderMockRecorder is the mock recorder for MockfileReader. +type MockfileReaderMockRecorder struct { + mock *MockfileReader +} + +// NewMockfileReader creates a new mock instance. +func NewMockfileReader(ctrl *gomock.Controller) *MockfileReader { + mock := &MockfileReader{ctrl: ctrl} + mock.recorder = &MockfileReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockfileReader) EXPECT() *MockfileReaderMockRecorder { + return m.recorder +} + +// ReadFile mocks base method. +func (m *MockfileReader) ReadFile(src string) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadFile", src) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadFile indicates an expected call of ReadFile. +func (mr *MockfileReaderMockRecorder) ReadFile(src interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadFile", reflect.TypeOf((*MockfileReader)(nil).ReadFile), src) +} + +// Stat mocks base method. +func (m *MockfileReader) Stat(src string) (os.FileInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stat", src) + ret0, _ := ret[0].(os.FileInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Stat indicates an expected call of Stat. +func (mr *MockfileReaderMockRecorder) Stat(src interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stat", reflect.TypeOf((*MockfileReader)(nil).Stat), src) +} diff --git a/common/dynamicconfig/file_based_client_test.go b/common/dynamicconfig/file_based_client_test.go index 459c1434350..f92a7648584 100644 --- a/common/dynamicconfig/file_based_client_test.go +++ b/common/dynamicconfig/file_based_client_test.go @@ -25,9 +25,11 @@ package dynamicconfig import ( + "os" "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -357,3 +359,253 @@ func (s *fileBasedClientSuite) TestMatch() { s.Equal(tc.matched, matched) } } + +type MockFileInfo struct { + FileName string + IsDirectory bool + ModTimeValue time.Time +} + +func (mfi MockFileInfo) Name() string { return mfi.FileName } +func (mfi MockFileInfo) Size() int64 { return int64(8) } +func (mfi MockFileInfo) Mode() os.FileMode { return os.ModePerm } +func (mfi MockFileInfo) ModTime() time.Time { return mfi.ModTimeValue } +func (mfi MockFileInfo) IsDir() bool { return mfi.IsDirectory } +func (mfi MockFileInfo) Sys() interface{} { return nil } + +func (s *fileBasedClientSuite) TestUpdate_ChangedValue() { + ctrl := gomock.NewController(s.T()) + defer ctrl.Finish() + + doneCh := make(chan interface{}) + reader := NewMockfileReader(ctrl) + mockLogger := log.NewMockLogger(ctrl) + + updateInterval := time.Minute * 5 + originFileInfo := &MockFileInfo{ModTimeValue: time.Now()} + updatedFileInfo := &MockFileInfo{ModTimeValue: originFileInfo.ModTimeValue.Add(updateInterval + time.Second)} + + originFileData := []byte(` +testGetFloat64PropertyKey: +- value: 12 + constraints: {} + +testGetIntPropertyKey: +- value: 1000 + constraints: {} + +testGetBoolPropertyKey: +- value: false + constraints: {} +- value: true + constraints: + namespace: global-samples-namespace +- value: true + constraints: + namespace: samples-namespace +`) + updatedFileData := []byte(` +testGetFloat64PropertyKey: +- value: 13 + constraints: {} + +testGetIntPropertyKey: +- value: 2000 + constraints: {} + +testGetBoolPropertyKey: +- value: true + constraints: {} +- value: false + constraints: + namespace: global-samples-namespace +- value: true + constraints: + namespace: samples-namespace +`) + + reader.EXPECT().Stat(gomock.Any()).Return(originFileInfo, nil).Times(2) + reader.EXPECT().ReadFile(gomock.Any()).Return(originFileData, nil) + + mockLogger.EXPECT().Info(gomock.Any()).Times(6) + client, err := NewFileBasedClientWithReader(reader, + &FileBasedClientConfig{ + Filepath: "anyValue", + PollInterval: updateInterval, + }, mockLogger, s.doneCh) + s.NoError(err) + + reader.EXPECT().Stat(gomock.Any()).Return(updatedFileInfo, nil) + reader.EXPECT().ReadFile(gomock.Any()).Return(updatedFileData, nil) + + mockLogger.EXPECT().Info("dynamic config changed for the key: testgetfloat64propertykey oldValue: { constraints: {} value: 12 } newValue: { constraints: {} value: 13 }", gomock.Any()) + mockLogger.EXPECT().Info("dynamic config changed for the key: testgetintpropertykey oldValue: { constraints: {} value: 1000 } newValue: { constraints: {} value: 2000 }", gomock.Any()) + mockLogger.EXPECT().Info("dynamic config changed for the key: testgetboolpropertykey oldValue: { constraints: {} value: false } newValue: { constraints: {} value: true }", gomock.Any()) + mockLogger.EXPECT().Info("dynamic config changed for the key: testgetboolpropertykey oldValue: { constraints: {{namespace:global-samples-namespace}} value: true } newValue: { constraints: {{namespace:global-samples-namespace}} value: false }", gomock.Any()) + mockLogger.EXPECT().Info(gomock.Any()) + client.update() + s.NoError(err) + close(doneCh) +} + +func (s *fileBasedClientSuite) TestUpdate_ChangedMapValue() { + ctrl := gomock.NewController(s.T()) + defer ctrl.Finish() + + doneCh := make(chan interface{}) + reader := NewMockfileReader(ctrl) + mockLogger := log.NewMockLogger(ctrl) + + updateInterval := time.Minute * 5 + originFileInfo := &MockFileInfo{ModTimeValue: time.Now()} + updatedFileInfo := &MockFileInfo{ModTimeValue: originFileInfo.ModTimeValue.Add(updateInterval + time.Second)} + + originFileData := []byte(` +history.defaultActivityRetryPolicy: +- value: + InitialIntervalInSeconds: 1 + MaximumIntervalCoefficient: 100.0 + BackoffCoefficient: 3.0 + MaximumAttempts: 0 +`) + updatedFileData := []byte(` +history.defaultActivityRetryPolicy: +- value: + InitialIntervalInSeconds: 3 + MaximumIntervalCoefficient: 100.0 + BackoffCoefficient: 2.0 + MaximumAttempts: 0 +`) + + reader.EXPECT().Stat(gomock.Any()).Return(originFileInfo, nil).Times(2) + reader.EXPECT().ReadFile(gomock.Any()).Return(originFileData, nil) + + mockLogger.EXPECT().Info(gomock.Any()).Times(2) + client, err := NewFileBasedClientWithReader(reader, + &FileBasedClientConfig{ + Filepath: "anyValue", + PollInterval: updateInterval, + }, mockLogger, s.doneCh) + s.NoError(err) + + reader.EXPECT().Stat(gomock.Any()).Return(updatedFileInfo, nil) + reader.EXPECT().ReadFile(gomock.Any()).Return(updatedFileData, nil) + + mockLogger.EXPECT().Info("dynamic config changed for the key: history.defaultactivityretrypolicy oldValue: { constraints: {} value: map[BackoffCoefficient:3 InitialIntervalInSeconds:1 MaximumAttempts:0 MaximumIntervalCoefficient:100] } newValue: { constraints: {} value: map[BackoffCoefficient:2 InitialIntervalInSeconds:3 MaximumAttempts:0 MaximumIntervalCoefficient:100] }", gomock.Any()) + mockLogger.EXPECT().Info(gomock.Any()) + client.update() + s.NoError(err) + close(doneCh) +} + +func (s *fileBasedClientSuite) TestUpdate_NewEntry() { + ctrl := gomock.NewController(s.T()) + defer ctrl.Finish() + + doneCh := make(chan interface{}) + reader := NewMockfileReader(ctrl) + mockLogger := log.NewMockLogger(ctrl) + + updateInterval := time.Minute * 5 + originFileInfo := &MockFileInfo{ModTimeValue: time.Now()} + updatedFileInfo := &MockFileInfo{ModTimeValue: originFileInfo.ModTimeValue.Add(updateInterval + time.Second)} + + originFileData := []byte(` +testGetFloat64PropertyKey: +- value: 12 + constraints: {} +`) + updatedFileData := []byte(` +testGetFloat64PropertyKey: +- value: 12 + constraints: {} +- value: 22 + constraints: + namespace: samples-namespace + +testGetIntPropertyKey: +- value: 2000 + constraints: {} +`) + + reader.EXPECT().Stat(gomock.Any()).Return(originFileInfo, nil).Times(2) + reader.EXPECT().ReadFile(gomock.Any()).Return(originFileData, nil) + + mockLogger.EXPECT().Info("dynamic config changed for the key: testgetfloat64propertykey oldValue: nil newValue: { constraints: {} value: 12 }", gomock.Any()) + mockLogger.EXPECT().Info(gomock.Any()) + client, err := NewFileBasedClientWithReader(reader, + &FileBasedClientConfig{ + Filepath: "anyValue", + PollInterval: updateInterval, + }, mockLogger, s.doneCh) + s.NoError(err) + + reader.EXPECT().Stat(gomock.Any()).Return(updatedFileInfo, nil) + reader.EXPECT().ReadFile(gomock.Any()).Return(updatedFileData, nil) + + mockLogger.EXPECT().Info("dynamic config changed for the key: testgetfloat64propertykey oldValue: nil newValue: { constraints: {{namespace:samples-namespace}} value: 22 }", gomock.Any()) + mockLogger.EXPECT().Info("dynamic config changed for the key: testgetintpropertykey oldValue: nil newValue: { constraints: {} value: 2000 }", gomock.Any()) + mockLogger.EXPECT().Info(gomock.Any()) + client.update() + s.NoError(err) + close(doneCh) +} + +func (s *fileBasedClientSuite) TestUpdate_ChangeOrder_ShouldNotWriteLog() { + ctrl := gomock.NewController(s.T()) + defer ctrl.Finish() + + doneCh := make(chan interface{}) + reader := NewMockfileReader(ctrl) + mockLogger := log.NewMockLogger(ctrl) + + updateInterval := time.Minute * 5 + originFileInfo := &MockFileInfo{ModTimeValue: time.Now()} + updatedFileInfo := &MockFileInfo{ModTimeValue: originFileInfo.ModTimeValue.Add(updateInterval + time.Second)} + + originFileData := []byte(` +testGetFloat64PropertyKey: +- value: 12 + constraints: {} +- value: 22 + constraints: + namespace: samples-namespace + testConstraint: testConstraintValue + +testGetIntPropertyKey: +- value: 2000 + constraints: {} +`) + updatedFileData := []byte(` +testGetIntPropertyKey: +- value: 2000 + constraints: {} + +testGetFloat64PropertyKey: +- value: 22 + constraints: + testConstraint: testConstraintValue + namespace: samples-namespace +- value: 12 + constraints: {} +`) + + reader.EXPECT().Stat(gomock.Any()).Return(originFileInfo, nil).Times(2) + reader.EXPECT().ReadFile(gomock.Any()).Return(originFileData, nil) + + mockLogger.EXPECT().Info(gomock.Any()).Times(4) + client, err := NewFileBasedClientWithReader(reader, + &FileBasedClientConfig{ + Filepath: "anyValue", + PollInterval: updateInterval, + }, mockLogger, s.doneCh) + s.NoError(err) + + reader.EXPECT().Stat(gomock.Any()).Return(updatedFileInfo, nil) + reader.EXPECT().ReadFile(gomock.Any()).Return(updatedFileData, nil) + + mockLogger.EXPECT().Info(gomock.Any()) + client.update() + s.NoError(err) + close(doneCh) +} diff --git a/common/dynamicconfig/interface.go b/common/dynamicconfig/interface.go index 708e9b360e1..f74b875fee2 100644 --- a/common/dynamicconfig/interface.go +++ b/common/dynamicconfig/interface.go @@ -32,18 +32,20 @@ import ( // Client allows fetching values from a dynamic configuration system NOTE: This does not have async // options right now. In the interest of keeping it minimal, we can add when requirement arises. -type Client interface { - GetValue(name Key, defaultValue interface{}) (interface{}, error) - GetValueWithFilters(name Key, filters map[Filter]interface{}, defaultValue interface{}) (interface{}, error) +type ( + Client interface { + GetValue(name Key, defaultValue interface{}) (interface{}, error) + GetValueWithFilters(name Key, filters map[Filter]interface{}, defaultValue interface{}) (interface{}, error) - GetIntValue(name Key, filters map[Filter]interface{}, defaultValue int) (int, error) - GetFloatValue(name Key, filters map[Filter]interface{}, defaultValue float64) (float64, error) - GetBoolValue(name Key, filters map[Filter]interface{}, defaultValue bool) (bool, error) - GetStringValue(name Key, filters map[Filter]interface{}, defaultValue string) (string, error) - GetMapValue( - name Key, filters map[Filter]interface{}, defaultValue map[string]interface{}, - ) (map[string]interface{}, error) - GetDurationValue( - name Key, filters map[Filter]interface{}, defaultValue time.Duration, - ) (time.Duration, error) -} + GetIntValue(name Key, filters map[Filter]interface{}, defaultValue int) (int, error) + GetFloatValue(name Key, filters map[Filter]interface{}, defaultValue float64) (float64, error) + GetBoolValue(name Key, filters map[Filter]interface{}, defaultValue bool) (bool, error) + GetStringValue(name Key, filters map[Filter]interface{}, defaultValue string) (string, error) + GetMapValue( + name Key, filters map[Filter]interface{}, defaultValue map[string]interface{}, + ) (map[string]interface{}, error) + GetDurationValue( + name Key, filters map[Filter]interface{}, defaultValue time.Duration, + ) (time.Duration, error) + } +) diff --git a/common/dynamicconfig/mutable_ephemeral_client.go b/common/dynamicconfig/mutable_ephemeral_client.go index e9e7f8c11b4..7cbe404147f 100644 --- a/common/dynamicconfig/mutable_ephemeral_client.go +++ b/common/dynamicconfig/mutable_ephemeral_client.go @@ -27,6 +27,8 @@ package dynamicconfig import ( "strings" "sync" + + "go.temporal.io/server/common/log" ) // MutableEphemeralClient is a dynamicconfig.Client implementation that is @@ -42,7 +44,7 @@ type MutableEphemeralClient struct { // NewMutableEphemeralClient constructs a new MutableEphemeralClient with an // empty internal store. func NewMutableEphemeralClient(mutations ...Mutation) *MutableEphemeralClient { - c := &MutableEphemeralClient{basicClient: newBasicClient()} + c := &MutableEphemeralClient{basicClient: newBasicClient(log.NewNoopLogger())} c.Update(mutations...) return c } diff --git a/common/dynamicconfig/shared_constants.go b/common/dynamicconfig/shared_constants.go new file mode 100644 index 00000000000..5608e0aa55c --- /dev/null +++ b/common/dynamicconfig/shared_constants.go @@ -0,0 +1,27 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dynamicconfig + +const DefaultNumTaskQueuePartitions = 4 diff --git a/service/worker/service.go b/service/worker/service.go index 9c892042126..04cbe271f11 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -167,11 +167,7 @@ func NewService( } // NewConfig builds the new Config for worker service -func NewConfig(logger log.Logger, dcClient dynamicconfig.Client, params *resource.BootstrapParams) *Config { - dc := dynamicconfig.NewCollection( - dcClient, - logger, - ) +func NewConfig(dc *dynamicconfig.Collection, params *resource.BootstrapParams) *Config { config := &Config{ ArchiverConfig: &archiver.Config{ MaxConcurrentActivityExecutionSize: dc.GetIntProperty( diff --git a/temporal/server_impl.go b/temporal/server_impl.go index 4d5d3118c82..fd9c10e5e03 100644 --- a/temporal/server_impl.go +++ b/temporal/server_impl.go @@ -64,8 +64,7 @@ type ( namespaceLogger NamespaceLogger serverReporter metrics.Reporter - dynamicConfigClient dynamicconfig.Client - dcCollection *dynamicconfig.Collection + dcCollection *dynamicconfig.Collection persistenceConfig config.Persistence clusterMetadata *cluster.Config @@ -83,7 +82,6 @@ func NewServerFxImpl( logger log.Logger, namespaceLogger NamespaceLogger, stoppedCh chan interface{}, - dynamicConfigClient dynamicconfig.Client, dcCollection *dynamicconfig.Collection, serverReporter ServerReporter, servicesGroup ServicesGroupIn, @@ -91,16 +89,15 @@ func NewServerFxImpl( clusterMetadata *cluster.Config, ) *ServerImpl { s := &ServerImpl{ - so: opts, - servicesMetadata: servicesGroup.Services, - stoppedCh: stoppedCh, - logger: logger, - namespaceLogger: namespaceLogger, - serverReporter: serverReporter, - dynamicConfigClient: dynamicConfigClient, - dcCollection: dcCollection, - persistenceConfig: persistenceConfig, - clusterMetadata: clusterMetadata, + so: opts, + servicesMetadata: servicesGroup.Services, + stoppedCh: stoppedCh, + logger: logger, + namespaceLogger: namespaceLogger, + serverReporter: serverReporter, + dcCollection: dcCollection, + persistenceConfig: persistenceConfig, + clusterMetadata: clusterMetadata, } return s }