Skip to content

Commit

Permalink
Remove params field from service and handler objects (#1424)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 1, 2021
1 parent a04944a commit a3571d1
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 196 deletions.
4 changes: 2 additions & 2 deletions common/resource/bootstrapParams.go
Expand Up @@ -62,9 +62,9 @@ type (
MetricsClient metrics.Client
ESClient client.Client
ESConfig *config.Elasticsearch
DynamicConfig dynamicconfig.Client
DynamicConfigClient dynamicconfig.Client
DCRedirectionPolicy config.DCRedirectionPolicy
PublicClient sdkclient.Client
SdkClient sdkclient.Client
ArchivalMetadata archiver.ArchivalMetadata
ArchiverProvider provider.ArchiverProvider
Authorizer authorization.Authorizer
Expand Down
4 changes: 2 additions & 2 deletions common/resource/resourceImpl.go
Expand Up @@ -201,7 +201,7 @@ func New(
return nil, err
}

dynamicCollection := dynamicconfig.NewCollection(params.DynamicConfig, logger)
dynamicCollection := dynamicconfig.NewCollection(params.DynamicConfigClient, logger)
clientBean, err := client.NewClientBean(
client.NewRPCClientFactory(
params.RPCFactory,
Expand Down Expand Up @@ -327,7 +327,7 @@ func New(

// internal services clients

sdkClient: params.PublicClient,
sdkClient: params.SdkClient,
frontendRawClient: frontendRawClient,
frontendClient: frontendClient,
matchingRawClient: matchingRawClient,
Expand Down
22 changes: 11 additions & 11 deletions host/onebox.go
Expand Up @@ -364,7 +364,7 @@ func (c *temporalImpl) GetHistoryClient() historyservice.HistoryServiceClient {
}

func (c *temporalImpl) startFrontend(hosts map[string][]string, startWG *sync.WaitGroup) {
params := new(resource.BootstrapParams)
params := &resource.BootstrapParams{}
params.DCRedirectionPolicy = config.DCRedirectionPolicy{}
params.Name = common.FrontendServiceName
params.Logger = c.logger
Expand All @@ -377,7 +377,7 @@ func (c *temporalImpl) startFrontend(hosts map[string][]string, startWG *sync.Wa
}
params.ClusterMetadataConfig = c.clusterMetadataConfig
params.MetricsClient = metrics.NewClient(params.MetricsScope, metrics.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNoopClient())
params.DynamicConfigClient = newIntegrationConfigClient(dynamicconfig.NewNoopClient())
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider
params.ESConfig = c.esConfig
Expand Down Expand Up @@ -430,7 +430,7 @@ func (c *temporalImpl) startHistory(
) {
membershipPorts := c.HistoryServiceAddress(2)
for i, grpcPort := range c.HistoryServiceAddress(3) {
params := new(resource.BootstrapParams)
params := &resource.BootstrapParams{}
params.Name = common.HistoryServiceName
params.Logger = c.logger
params.ThrottledLogger = c.logger
Expand All @@ -443,10 +443,10 @@ func (c *temporalImpl) startHistory(
params.MetricsClient = metrics.NewClient(params.MetricsScope, metrics.GetMetricsServiceIdx(params.Name, c.logger))
integrationClient := newIntegrationConfigClient(dynamicconfig.NewNoopClient())
c.overrideHistoryDynamicConfig(integrationClient)
params.DynamicConfig = integrationClient
params.DynamicConfigClient = integrationClient

var err error
params.PublicClient, err = sdkclient.NewClient(sdkclient.Options{
params.SdkClient, err = sdkclient.NewClient(sdkclient.Options{
HostPort: c.FrontendGRPCAddress(),
Namespace: common.SystemLocalNamespace,
MetricsScope: params.MetricsScope,
Expand Down Expand Up @@ -513,7 +513,7 @@ func (c *temporalImpl) startHistory(

func (c *temporalImpl) startMatching(hosts map[string][]string, startWG *sync.WaitGroup) {

params := new(resource.BootstrapParams)
params := &resource.BootstrapParams{}
params.Name = common.MatchingServiceName
params.Logger = c.logger
params.ThrottledLogger = c.logger
Expand All @@ -524,7 +524,7 @@ func (c *temporalImpl) startMatching(hosts map[string][]string, startWG *sync.Wa
}
params.ClusterMetadataConfig = c.clusterMetadataConfig
params.MetricsClient = metrics.NewClient(params.MetricsScope, metrics.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNoopClient())
params.DynamicConfigClient = newIntegrationConfigClient(dynamicconfig.NewNoopClient())
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider

Expand Down Expand Up @@ -556,7 +556,7 @@ func (c *temporalImpl) startMatching(hosts map[string][]string, startWG *sync.Wa
}

func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.WaitGroup) {
params := new(resource.BootstrapParams)
params := &resource.BootstrapParams{}
params.Name = common.WorkerServiceName
params.Logger = c.logger
params.ThrottledLogger = c.logger
Expand All @@ -567,7 +567,7 @@ func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.Wait
}
params.ClusterMetadataConfig = c.clusterMetadataConfig
params.MetricsClient = metrics.NewClient(params.MetricsScope, metrics.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNoopClient())
params.DynamicConfigClient = newIntegrationConfigClient(dynamicconfig.NewNoopClient())
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider

Expand All @@ -578,7 +578,7 @@ func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.Wait
}
params.PersistenceServiceResolver = resolver.NewNoopResolver()

params.PublicClient, err = sdkclient.NewClient(sdkclient.Options{
params.SdkClient, err = sdkclient.NewClient(sdkclient.Options{
HostPort: c.FrontendGRPCAddress(),
Namespace: common.SystemLocalNamespace,
MetricsScope: params.MetricsScope,
Expand Down Expand Up @@ -663,7 +663,7 @@ func (c *temporalImpl) startWorkerClientWorker(params *resource.BootstrapParams,
workerConfig.ArchiverConfig.ArchiverConcurrency = dynamicconfig.GetIntPropertyFn(10)

bc := &archiver.BootstrapContainer{
PublicClient: params.PublicClient,
SdkClient: params.SdkClient,
MetricsClient: service.GetMetricsClient(),
Logger: c.logger,
HistoryV2Manager: c.historyV2Mgr,
Expand Down
31 changes: 18 additions & 13 deletions service/frontend/adminHandler.go
Expand Up @@ -30,31 +30,32 @@ import (
"fmt"
"sync/atomic"

historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/searchattribute"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/adminservice/v1"
clusterspb "go.temporal.io/server/api/cluster/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
esclient "go.temporal.io/server/common/persistence/elasticsearch/client"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/xdc"
)

Expand All @@ -70,7 +71,9 @@ type (
status int32

numberOfHistoryShards int32
params *resource.BootstrapParams
ESConfig *config.Elasticsearch
ESClient esclient.Client
DynamicConfigClient dynamicconfig.Client
config *Config
namespaceDLQHandler namespace.DLQMessageHandler
eventSerializder persistence.PayloadSerializer
Expand Down Expand Up @@ -99,14 +102,16 @@ func NewAdminHandler(
Resource: resource,
status: common.DaemonStatusInitialized,
numberOfHistoryShards: params.PersistenceConfig.NumHistoryShards,
params: params,
config: config,
namespaceDLQHandler: namespace.NewDLQMessageHandler(
namespaceReplicationTaskExecutor,
resource.GetNamespaceReplicationQueue(),
resource.GetLogger(),
),
eventSerializder: persistence.NewPayloadSerializer(),
eventSerializder: persistence.NewPayloadSerializer(),
DynamicConfigClient: params.DynamicConfigClient,
ESConfig: params.ESConfig,
ESClient: params.ESClient,
}
}

Expand Down Expand Up @@ -159,7 +164,7 @@ func (adh *AdminHandler) AddSearchAttribute(ctx context.Context, request *admins
}

searchAttr := request.GetSearchAttribute()
currentValidAttr, _ := adh.params.DynamicConfig.GetMapValue(
currentValidAttr, _ := adh.DynamicConfigClient.GetMapValue(
dynamicconfig.ValidSearchAttributes, nil, searchattribute.GetDefaultTypeMap())
for k, v := range searchAttr {
if searchattribute.IsReservedField(k) || searchattribute.IsBuiltIn(k) {
Expand All @@ -173,19 +178,19 @@ func (adh *AdminHandler) AddSearchAttribute(ctx context.Context, request *admins
}

// update dynamic config
err := adh.params.DynamicConfig.UpdateValue(dynamicconfig.ValidSearchAttributes, currentValidAttr)
err := adh.DynamicConfigClient.UpdateValue(dynamicconfig.ValidSearchAttributes, currentValidAttr)
if err != nil {
return nil, adh.error(serviceerror.NewInternal(fmt.Sprintf(errFailedUpdateDynamicConfigMessage, err)), scope)
}

// update elasticsearch mapping, new added field will not be able to remove or update
index := adh.params.ESConfig.GetVisibilityIndex()
index := adh.ESConfig.GetVisibilityIndex()
for k, v := range searchAttr {
esType := searchattribute.GetESType(v)
if len(esType) == 0 {
return nil, adh.error(serviceerror.NewInvalidArgument(fmt.Sprintf(errUnknownValueTypeMessage, v)), scope)
}
err := adh.params.ESClient.PutMapping(ctx, index, searchattribute.Attr, k, esType)
err := adh.ESClient.PutMapping(ctx, index, searchattribute.Attr, k, esType)
if err != nil {
return nil, adh.error(serviceerror.NewInternal(fmt.Sprintf(errFailedToUpdateESMappingMessage, err)), scope)
}
Expand Down Expand Up @@ -938,7 +943,7 @@ func (adh *AdminHandler) validateGetWorkflowExecutionRawHistoryV2Request(
}

func (adh *AdminHandler) validateConfigForAdvanceVisibility() error {
if adh.params.ESConfig == nil || adh.params.ESClient == nil {
if adh.ESConfig == nil || adh.ESClient == nil {
return errors.New("ES related config not found")
}
return nil
Expand Down
15 changes: 7 additions & 8 deletions service/frontend/adminHandler_test.go
Expand Up @@ -395,7 +395,6 @@ func (s *adminHandlerSuite) Test_SetRequestDefaultValueAndGetTargetVersionHistor

func (s *adminHandlerSuite) Test_AddSearchAttribute_Validate() {
handler := s.handler
handler.params = &resource.BootstrapParams{}
ctx := context.Background()

type test struct {
Expand Down Expand Up @@ -431,17 +430,17 @@ func (s *adminHandlerSuite) Test_AddSearchAttribute_Validate() {
s.Nil(resp)
}

dynamicConfig := dynamicconfig.NewMockClient(s.controller)
handler.params.DynamicConfig = dynamicConfig
dynamicConfigClient := dynamicconfig.NewMockClient(s.controller)
handler.DynamicConfigClient = dynamicConfigClient
// add advanced visibility store related config
handler.params.ESConfig = &config.Elasticsearch{}
handler.ESConfig = &config.Elasticsearch{}
esClient := client.NewMockClient(s.controller)
handler.params.ESClient = esClient
handler.ESClient = esClient

mockValidAttr := map[string]interface{}{
"testkey": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}
dynamicConfig.EXPECT().GetMapValue(dynamicconfig.ValidSearchAttributes, nil, searchattribute.GetDefaultTypeMap()).
dynamicConfigClient.EXPECT().GetMapValue(dynamicconfig.ValidSearchAttributes, nil, searchattribute.GetDefaultTypeMap()).
Return(mockValidAttr, nil).AnyTimes()

testCases2 := []test{
Expand Down Expand Up @@ -479,7 +478,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttribute_Validate() {
},
Expected: &serviceerror.Internal{Message: "Failed to update dynamic config, err: error."},
}
dynamicConfig.EXPECT().UpdateValue(dynamicconfig.ValidSearchAttributes, map[string]interface{}{
dynamicConfigClient.EXPECT().UpdateValue(dynamicconfig.ValidSearchAttributes, map[string]interface{}{
"testkey": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
"testkey2": 1,
}).Return(errors.New("error"))
Expand All @@ -489,7 +488,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttribute_Validate() {
s.Nil(resp)

// ES operations tests
dynamicConfig.EXPECT().UpdateValue(gomock.Any(), gomock.Any()).Return(nil).Times(2)
dynamicConfigClient.EXPECT().UpdateValue(gomock.Any(), gomock.Any()).Return(nil).Times(2)

convertFailedTest := test{
Name: "unknown value type",
Expand Down

0 comments on commit a3571d1

Please sign in to comment.