Skip to content

Commit

Permalink
Adding reporter to shard context for access to UserScope (#2961)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbreiding committed Jun 7, 2022
1 parent 81e7a13 commit e4918d9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 8 deletions.
1 change: 1 addition & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type (
GetLogger() log.Logger
GetThrottledLogger() log.Logger
GetMetricsClient() metrics.Client
GetMetricsReporter() metrics.Reporter
GetTimeSource() clock.TimeSource

GetEngine() (Engine, error)
Expand Down
19 changes: 13 additions & 6 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ import (
"go.temporal.io/server/service/history/vclock"
)

var (
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
)
var persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()

const (
// See transitionLocked for overview of state transitions.
Expand All @@ -90,6 +88,7 @@ type (
shardID int32
executionManager persistence.ExecutionManager
metricsClient metrics.Client
metricsReporter metrics.Reporter
eventsCache events.Cache
closeCallback func(*ContextImpl)
config *configs.Config
Expand Down Expand Up @@ -439,7 +438,6 @@ func (s *ContextImpl) UpdateReplicatorDLQAckLevel(
sourceCluster string,
ackLevel int64,
) error {

s.wLock()
defer s.wUnlock()

Expand Down Expand Up @@ -1116,7 +1114,8 @@ func (s *ContextImpl) renewRangeLocked(isStealing bool) error {
defer cancel()
err := s.persistenceShardManager.UpdateShard(ctx, &persistence.UpdateShardRequest{
ShardInfo: updatedShardInfo.ShardInfo,
PreviousRangeID: s.shardInfo.GetRangeId()})
PreviousRangeID: s.shardInfo.GetRangeId(),
})
if err != nil {
// Failure in updating shard to grab new RangeID
s.contextTaggedLogger.Error("Persistent store operation failure",
Expand Down Expand Up @@ -1818,6 +1817,7 @@ func newContext(
clientBean client.Bean,
historyClient historyservice.HistoryServiceClient,
metricsClient metrics.Client,
metricsReporter metrics.Reporter,
payloadSerializer serialization.Serializer,
timeSource clock.TimeSource,
namespaceRegistry namespace.Registry,
Expand All @@ -1827,7 +1827,6 @@ func newContext(
archivalMetadata archiver.ArchivalMetadata,
hostInfoProvider membership.HostInfoProvider,
) (*ContextImpl, error) {

hostIdentity := hostInfoProvider.HostInfo().Identity()

lifecycleCtx, lifecycleCancel := context.WithCancel(context.Background())
Expand All @@ -1837,6 +1836,7 @@ func newContext(
shardID: shardID,
executionManager: persistenceExecutionManager,
metricsClient: metricsClient,
metricsReporter: metricsReporter,
closeCallback: closeCallback,
config: config,
contextTaggedLogger: log.With(logger, tag.ShardID(shardID), tag.Address(hostIdentity)),
Expand Down Expand Up @@ -1918,6 +1918,7 @@ func copyShardInfo(shardInfo *persistence.ShardInfoWithFailover) *persistence.Sh
func (s *ContextImpl) GetRemoteAdminClient(cluster string) (adminservice.AdminServiceClient, error) {
return s.clientBean.GetRemoteAdminClient(cluster)
}

func (s *ContextImpl) GetPayloadSerializer() serialization.Serializer {
return s.payloadSerializer
}
Expand All @@ -1930,6 +1931,10 @@ func (s *ContextImpl) GetMetricsClient() metrics.Client {
return s.metricsClient
}

func (s *ContextImpl) GetMetricsReporter() metrics.Reporter {
return s.metricsReporter
}

func (s *ContextImpl) GetTimeSource() clock.TimeSource {
return s.timeSource
}
Expand All @@ -1941,9 +1946,11 @@ func (s *ContextImpl) GetNamespaceRegistry() namespace.Registry {
func (s *ContextImpl) GetSearchAttributesProvider() searchattribute.Provider {
return s.saProvider
}

func (s *ContextImpl) GetSearchAttributesMapper() searchattribute.Mapper {
return s.saMapper
}

func (s *ContextImpl) GetClusterMetadata() cluster.Metadata {
return s.clusterMetadata
}
Expand Down
14 changes: 14 additions & 0 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions service/history/shard/controller_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type (
historyClient historyservice.HistoryServiceClient
historyServiceResolver membership.ServiceResolver
metricsClient metrics.Client
metricsReporter metrics.Reporter
payloadSerializer serialization.Serializer
timeSource clock.TimeSource
namespaceRegistry namespace.Registry
Expand Down Expand Up @@ -242,6 +243,7 @@ func (c *ControllerImpl) getOrCreateShardContext(shardID int32) (*ContextImpl, e
c.clientBean,
c.historyClient,
c.metricsClient,
c.metricsReporter,
c.payloadSerializer,
c.timeSource,
c.namespaceRegistry,
Expand Down Expand Up @@ -290,14 +292,12 @@ func (c *ControllerImpl) removeShard(shardID int32, expected *ContextImpl) (*Con
// b. Periodic ticker
// c. ShardOwnershipLostError and subsequent ShardClosedEvents from engine
func (c *ControllerImpl) shardManagementPump() {

defer c.shutdownWG.Done()

acquireTicker := time.NewTicker(c.config.AcquireShardInterval())
defer acquireTicker.Stop()

for {

select {
case <-c.shutdownCh:
c.doShutdown()
Expand Down
2 changes: 2 additions & 0 deletions service/history/shard/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func ShardControllerProvider(
historyClient historyservice.HistoryServiceClient,
historyServiceResolver membership.ServiceResolver,
metricsClient metrics.Client,
metricsReporter metrics.Reporter,
payloadSerializer serialization.Serializer,
timeSource clock.TimeSource,
namespaceRegistry namespace.Registry,
Expand All @@ -84,6 +85,7 @@ func ShardControllerProvider(
historyClient: historyClient,
historyServiceResolver: historyServiceResolver,
metricsClient: metricsClient,
metricsReporter: metricsReporter,
payloadSerializer: payloadSerializer,
timeSource: timeSource,
namespaceRegistry: namespaceRegistry,
Expand Down

0 comments on commit e4918d9

Please sign in to comment.