Fix replication task execution latency (#3671)
* Fix replication task execution latency

* Fix emit latency
yux0 committed Nov 28, 2022
1 parent 82cbe64 commit 2105645
Showing 15 changed files with 73 additions and 43 deletions.
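
Reviewer context (not part of the commit): the pattern changed throughout this diff works around a Go evaluation rule — the arguments of a deferred call are evaluated when the defer statement executes, not when the surrounding function returns. The old form, defer handler.Timer(...).Record(time.Since(startTime)), therefore computed time.Since(startTime) immediately and always recorded a near-zero latency; wrapping the call in a closure postpones the time.Since evaluation until function exit. Below is a minimal, self-contained sketch of the difference; record is a hypothetical stand-in for the metrics handler's Timer(...).Record call, and the sleep just simulates work.

package main

import (
	"fmt"
	"time"
)

// record is a hypothetical stand-in for metricsHandler.Timer(...).Record.
func record(label string, d time.Duration) {
	fmt.Printf("%s: %v\n", label, d)
}

func brokenTimer() {
	startTime := time.Now().UTC()
	// time.Since(startTime) is evaluated right here, when the defer statement
	// runs, so a near-zero duration is recorded no matter how long the work takes.
	defer record("broken", time.Since(startTime))
	time.Sleep(50 * time.Millisecond) // simulated work
}

func fixedTimer() {
	startTime := time.Now().UTC()
	// Wrapping the call in a closure defers the time.Since(startTime)
	// evaluation until the surrounding function returns.
	defer func() {
		record("fixed", time.Since(startTime))
	}()
	time.Sleep(50 * time.Millisecond) // simulated work
}

func main() {
	brokenTimer() // prints roughly 0s
	fixedTimer()  // prints roughly 50ms
}

Running the sketch prints roughly 0s for the broken variant and about 50ms for the fixed one, which appears to be the discrepancy in the replication task latency metrics that this commit addresses.
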
4 changes: 3 additions & 1 deletion common/authorization/interceptor.go
@@ -159,7 +159,9 @@ func (a *interceptor) authorize(
callTarget *CallTarget,
metricsHandler metrics.MetricsHandler) (Result, error) {
startTime := time.Now().UTC()
defer metricsHandler.Timer(metrics.ServiceAuthorizationLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
metricsHandler.Timer(metrics.ServiceAuthorizationLatency.GetMetricName()).Record(time.Since(startTime))
}()
return a.authorizer.Authorize(ctx, claims, callTarget)
}

8 changes: 6 additions & 2 deletions common/namespace/registry.go
@@ -617,7 +617,9 @@ func (r *registry) triggerNamespaceChangePrepareCallback(
prepareCallbacks []PrepareCallbackFn,
) {
startTime := time.Now().UTC()
defer r.metricsHandler.Timer(metrics.NamespaceCachePrepareCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
r.metricsHandler.Timer(metrics.NamespaceCachePrepareCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
}()

for _, prepareCallback := range prepareCallbacks {
prepareCallback()
@@ -630,7 +632,9 @@ func (r *registry) triggerNamespaceChangeCallback(
newNamespaces []*Namespace,
) {
startTime := time.Now().UTC()
defer r.metricsHandler.Timer(metrics.NamespaceCacheCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
r.metricsHandler.Timer(metrics.NamespaceCacheCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
}()

for _, callback := range callbacks {
callback(oldNamespaces, newNamespaces)
4 changes: 3 additions & 1 deletion service/frontend/adminHandler.go
@@ -653,7 +653,9 @@ func (adh *AdminHandler) GetWorkflowExecutionRawHistoryV2(ctx context.Context, r
defer log.CapturePanic(adh.logger, &retError)

taggedMetricsHandler, startTime := adh.startRequestProfile(metrics.AdminGetWorkflowExecutionRawHistoryV2Scope)
defer taggedMetricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
taggedMetricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime))
}()

if err := adh.validateGetWorkflowExecutionRawHistoryV2Request(
request,
10 changes: 5 additions & 5 deletions service/frontend/operator_handler.go
@@ -154,7 +154,7 @@ func (h *OperatorHandlerImpl) AddSearchAttributes(ctx context.Context, request *
defer log.CapturePanic(h.logger, &retError)

scope, startTime := h.startRequestProfile(metrics.OperatorAddSearchAttributesScope)
defer scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime)) }()

// validate request
if request == nil {
@@ -294,7 +294,7 @@ func (h *OperatorHandlerImpl) DeleteNamespace(ctx context.Context, request *oper
defer log.CapturePanic(h.logger, &retError)

scope, startTime := h.startRequestProfile(metrics.OperatorDeleteNamespaceScope)
defer scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime)) }()

// validate request
if request == nil {
@@ -353,7 +353,7 @@ func (h *OperatorHandlerImpl) AddOrUpdateRemoteCluster(
) (_ *operatorservice.AddOrUpdateRemoteClusterResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
scope, startTime := h.startRequestProfile(metrics.OperatorAddOrUpdateRemoteClusterScope)
defer scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime)) }()

adminClient := h.clientFactory.NewRemoteAdminClientWithTimeout(
request.GetFrontendAddress(),
@@ -423,7 +423,7 @@ func (h *OperatorHandlerImpl) RemoveRemoteCluster(
) (_ *operatorservice.RemoveRemoteClusterResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
scope, startTime := h.startRequestProfile(metrics.OperatorRemoveRemoteClusterScope)
defer scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime)) }()

if err := h.clusterMetadataManager.DeleteClusterMetadata(
ctx,
@@ -441,7 +441,7 @@ func (h *OperatorHandlerImpl) ListClusters(
) (_ *operatorservice.ListClustersResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
scope, startTime := h.startRequestProfile(metrics.OperatorListClustersScope)
defer scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.ServiceLatency.GetMetricName()).Record(time.Since(startTime)) }()

if request == nil {
return nil, errRequestNotSet
4 changes: 3 additions & 1 deletion service/frontend/versionChecker.go
@@ -111,7 +111,9 @@ func (vc *VersionChecker) performVersionCheck(
ctx context.Context,
) {
startTime := time.Now().UTC()
defer vc.metricsHandler.Timer(metrics.VersionCheckLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
vc.metricsHandler.Timer(metrics.VersionCheckLatency.GetMetricName()).Record(time.Since(startTime))
}()
metadata, err := vc.clusterMetadataManager.GetCurrentClusterMetadata(ctx)
if err != nil {
vc.metricsHandler.Counter(metrics.VersionCheckFailedCount.GetMetricName()).Record(1)
4 changes: 2 additions & 2 deletions service/history/api/queryworkflow/api.go
@@ -154,7 +154,7 @@ func Invoke(
// If we get here it means query could not be dispatched through matching directly, so it must block
// until either a result has been obtained on a workflow task response or until it is safe to dispatch directly through matching.
startTime := time.Now().UTC()
defer scope.Timer(metrics.WorkflowTaskQueryLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.WorkflowTaskQueryLatency.GetMetricName()).Record(time.Since(startTime)) }()

queryReg := mutableState.GetQueryRegistry()
if len(queryReg.GetBufferedIDs()) >= shard.GetConfig().MaxBufferedQueryCount() {
@@ -229,7 +229,7 @@ func queryDirectlyThroughMatching(
) (*historyservice.QueryWorkflowResponse, error) {

startTime := time.Now().UTC()
defer scope.Timer(metrics.DirectQueryDispatchLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.DirectQueryDispatchLatency.GetMetricName()).Record(time.Since(startTime)) }()

if msResp.GetIsStickyTaskQueueEnabled() &&
len(msResp.GetStickyTaskQueue().GetName()) != 0 &&
8 changes: 4 additions & 4 deletions service/history/events/cache.go
@@ -114,7 +114,7 @@ func (e *CacheImpl) GetEvent(ctx context.Context, key EventKey, firstEventID int
handler := e.metricsHandler.WithTags(metrics.OperationTag(metrics.EventsCacheGetEventScope))
handler.Counter(metrics.CacheRequests.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime)) }()

validKey := e.validateKey(key)

@@ -150,7 +150,7 @@ func (e *CacheImpl) PutEvent(key EventKey, event *historypb.HistoryEvent) {
handler := e.metricsHandler.WithTags(metrics.OperationTag(metrics.EventsCachePutEventScope))
handler.Counter(metrics.CacheRequests.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime)) }()

if !e.validateKey(key) {
return
@@ -162,7 +162,7 @@ func (e *CacheImpl) DeleteEvent(key EventKey) {
handler := e.metricsHandler.WithTags(metrics.OperationTag(metrics.EventsCacheDeleteEventScope))
handler.Counter(metrics.CacheRequests.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime)) }()

e.validateKey(key) // just for log message, delete anyway
e.Delete(key)
@@ -178,7 +178,7 @@ func (e *CacheImpl) getHistoryEventFromStore(
handler := e.metricsHandler.WithTags(metrics.OperationTag(metrics.EventsCacheGetFromStoreScope))
handler.Counter(metrics.CacheRequests.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(startTime)) }()

response, err := e.eventsMgr.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
4 changes: 3 additions & 1 deletion service/history/events/notifier.go
@@ -198,7 +198,9 @@ func (notifier *NotifierImpl) dispatchHistoryEventNotification(event *Notificati
identifier := event.ID

startTime := time.Now().UTC()
defer notifier.metricsHandler.Timer(metrics.HistoryEventNotificationFanoutLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
notifier.metricsHandler.Timer(metrics.HistoryEventNotificationFanoutLatency.GetMetricName()).Record(time.Since(startTime))
}()
_, _, _ = notifier.eventsPubsubs.GetAndDo(identifier, func(key interface{}, value interface{}) error {
subscribers := value.(map[string]chan *Notification)

40 changes: 24 additions & 16 deletions service/history/replication/task_executor.go
@@ -146,10 +146,12 @@ func (e *taskExecutorImpl) handleActivityTask(
}

startTime := time.Now().UTC()
defer e.metricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.SyncActivityTaskScope),
)
defer func() {
e.metricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.SyncActivityTaskScope),
)
}()

request := &historyservice.SyncActivityRequest{
NamespaceId: attr.NamespaceId,
@@ -181,10 +183,12 @@ func (e *taskExecutorImpl) handleActivityTask(
metrics.OperationTag(metrics.HistoryRereplicationByActivityReplicationScope),
)
startTime := time.Now().UTC()
defer e.metricsHandler.Timer(metrics.ClientLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.HistoryRereplicationByActivityReplicationScope),
)
defer func() {
e.metricsHandler.Timer(metrics.ClientLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.HistoryRereplicationByActivityReplicationScope),
)
}()

resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
ctx,
@@ -227,10 +231,12 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
}

startTime := time.Now().UTC()
defer e.metricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.HistoryReplicationTaskScope),
)
defer func() {
e.metricsHandler.Timer(metrics.ServiceLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.HistoryReplicationTaskScope),
)
}()

request := &historyservice.ReplicateEventsV2Request{
NamespaceId: attr.NamespaceId,
@@ -257,10 +263,12 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
metrics.OperationTag(metrics.HistoryRereplicationByHistoryReplicationScope),
)
startTime := time.Now().UTC()
defer e.metricsHandler.Timer(metrics.ClientLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.HistoryRereplicationByHistoryReplicationScope),
)
defer func() {
e.metricsHandler.Timer(metrics.ClientLatency.GetMetricName()).Record(
time.Since(startTime),
metrics.OperationTag(metrics.HistoryRereplicationByHistoryReplicationScope),
)
}()

resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
ctx,
4 changes: 2 additions & 2 deletions service/history/shard/context_impl.go
@@ -1538,7 +1538,7 @@ func (s *ContextImpl) wLock() {
handler := s.metricsHandler.WithTags(metrics.OperationTag(metrics.ShardInfoScope))
handler.Counter(metrics.LockRequests.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer handler.Timer(metrics.LockLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { handler.Timer(metrics.LockLatency.GetMetricName()).Record(time.Since(startTime)) }()

s.rwLock.Lock()
}
@@ -1547,7 +1547,7 @@ func (s *ContextImpl) rLock() {
handler := s.metricsHandler.WithTags(metrics.OperationTag(metrics.ShardInfoScope))
handler.Counter(metrics.LockRequests.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer handler.Timer(metrics.LockLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { handler.Timer(metrics.LockLatency.GetMetricName()).Record(time.Since(startTime)) }()

s.rwLock.RLock()
}
16 changes: 12 additions & 4 deletions service/history/shard/controller_impl.go
@@ -188,14 +188,18 @@ func (c *ControllerImpl) GetShardByID(
shardID int32,
) (Context, error) {
startTime := time.Now().UTC()
defer c.taggedMetricsHandler.Timer(metrics.GetEngineForShardLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
c.taggedMetricsHandler.Timer(metrics.GetEngineForShardLatency.GetMetricName()).Record(time.Since(startTime))
}()

return c.getOrCreateShardContext(shardID)
}

func (c *ControllerImpl) CloseShardByID(shardID int32) {
startTime := time.Now().UTC()
defer c.taggedMetricsHandler.Timer(metrics.RemoveEngineForShardLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
c.taggedMetricsHandler.Timer(metrics.RemoveEngineForShardLatency.GetMetricName()).Record(time.Since(startTime))
}()

shard, newNumShards := c.removeShard(shardID, nil)
// Stop the current shard, if it exists.
@@ -209,7 +213,9 @@ func (c *ControllerImpl) CloseShardByID(shardID int32) {

func (c *ControllerImpl) shardClosedCallback(shard *ContextImpl) {
startTime := time.Now().UTC()
defer c.taggedMetricsHandler.Timer(metrics.RemoveEngineForShardLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
c.taggedMetricsHandler.Timer(metrics.RemoveEngineForShardLatency.GetMetricName()).Record(time.Since(startTime))
}()

c.taggedMetricsHandler.Counter(metrics.ShardContextClosedCounter.GetMetricName()).Record(1)
_, newNumShards := c.removeShard(shard.shardID, shard)
@@ -350,7 +356,9 @@ func (c *ControllerImpl) shardManagementPump() {
func (c *ControllerImpl) acquireShards() {
c.taggedMetricsHandler.Counter(metrics.AcquireShardsCounter.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer c.taggedMetricsHandler.Timer(metrics.AcquireShardsLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
c.taggedMetricsHandler.Timer(metrics.AcquireShardsLatency.GetMetricName()).Record(time.Since(startTime))
}()

tryAcquire := func(shardID int32) {
info, err := c.historyServiceResolver.Lookup(convert.Int32ToString(shardID))
2 changes: 1 addition & 1 deletion service/history/timerQueueStandbyTaskExecutor.go
@@ -510,7 +510,7 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
scope := t.metricHandler.WithTags(metrics.OperationTag(metrics.HistoryRereplicationByTimerTaskScope))
scope.Counter(metrics.ClientRequests.GetMetricName()).Record(1)
startTime := time.Now()
defer scope.Timer(metrics.ClientLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.ClientLatency.GetMetricName()).Record(time.Since(startTime)) }()

adminClient, err := t.shard.GetRemoteAdminClient(remoteClusterName)
if err != nil {
2 changes: 1 addition & 1 deletion service/history/transferQueueStandbyTaskExecutor.go
@@ -625,7 +625,7 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
scope := t.metricHandler.WithTags(metrics.OperationTag(metrics.HistoryRereplicationByTransferTaskScope))
scope.Counter(metrics.ClientRequests.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer scope.Timer(metrics.ClientLatency.GetMetricName()).Record(time.Since(startTime))
defer func() { scope.Timer(metrics.ClientLatency.GetMetricName()).Record(time.Since(startTime)) }()

adminClient, err := t.shard.GetRemoteAdminClient(remoteClusterName)
if err != nil {
2 changes: 1 addition & 1 deletion service/history/workflow/cache.go
@@ -107,7 +107,7 @@ func (c *CacheImpl) GetOrCreateWorkflowExecution(
handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryCacheGetOrCreateScope))
handler.Counter(metrics.CacheRequests.GetMetricName()).Record(1)
start := time.Now()
defer handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(start))
defer func() { handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(start)) }()

weCtx, weReleaseFunc, err := c.getOrCreateWorkflowExecutionInternal(
ctx,
@@ -213,7 +213,9 @@ func (p *namespaceReplicationMessageProcessor) handleNamespaceReplicationTask(
) error {
p.metricsHandler.Counter(metrics.ReplicatorMessages.GetMetricName()).Record(1)
startTime := time.Now().UTC()
defer p.metricsHandler.Timer(metrics.ReplicatorLatency.GetMetricName()).Record(time.Since(startTime))
defer func() {
p.metricsHandler.Timer(metrics.ReplicatorLatency.GetMetricName()).Record(time.Since(startTime))
}()

return p.taskExecutor.Execute(ctx, task.GetNamespaceTaskAttributes())
}
