Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jul 31, 2023
1 parent 358d2bb commit 81dcd5e
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions service/history/shard/context_impl.go
Expand Up @@ -247,6 +247,11 @@ func (s *ContextImpl) GetEngine(
func (s *ContextImpl) AssertOwnership(
ctx context.Context,
) error {
if err := s.ioSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer s.ioSemaphore.Release(1)

s.wLock()

if err := s.errorByState(); err != nil {
Expand Down Expand Up @@ -1150,6 +1155,11 @@ func (s *ContextImpl) updateShardInfo(
updateFnLocked func(),
) error {
s.wLock()
if err := s.errorByState(); err != nil {
s.wUnlock()
return err
}

updateFnLocked()
s.shardInfo.StolenSinceRenew = 0

Expand All @@ -1171,6 +1181,8 @@ func (s *ContextImpl) updateShardInfo(
}
s.wUnlock()

// TODO: check if release shard lock is safe here

if err := s.ioSemaphore.Acquire(s.lifecycleCtx, 1); err != nil {
return err
}
Expand All @@ -1184,7 +1196,7 @@ func (s *ContextImpl) updateShardInfo(
s.wLock()
defer s.wUnlock()
// revert lastUpdated time so that operation can be retried
s.lastUpdated = previousLastUpdate
s.lastUpdated = util.MaxTime(previousLastUpdate, s.lastUpdated)
return s.handleWriteErrorLocked(request.PreviousRangeID, err)
}

Expand All @@ -1194,9 +1206,7 @@ func (s *ContextImpl) updateShardInfo(
func (s *ContextImpl) emitShardInfoMetricsLogsLocked(
queueStates map[int32]*persistencespb.QueueState,
) {
if !s.config.EmitShardLagLog() {
return
}
emitShardLagLog := s.config.EmitShardLagLog()

metricsHandler := s.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.ShardInfoScope))

Expand All @@ -1214,7 +1224,7 @@ Loop:
continue Loop
}
lag := s.taskKeyManager.getExclusiveReaderHighWatermark(category).TaskID - minTaskKey.TaskID
if lag > logWarnImmediateTaskLag {
if emitShardLagLog && lag > logWarnImmediateTaskLag {
s.contextTaggedLogger.Warn(
"Shard queue lag exceeds warn threshold.",
tag.ShardQueueAcks(category.Name(), minTaskKey.TaskID),
Expand All @@ -1231,7 +1241,7 @@ Loop:
continue Loop
}
lag := s.taskKeyManager.getExclusiveReaderHighWatermark(category).FireTime.Sub(minTaskKey.FireTime)
if lag > logWarnScheduledTaskLag {
if emitShardLagLog && lag > logWarnScheduledTaskLag {
s.contextTaggedLogger.Warn(
"Shard queue lag exceeds warn threshold.",
tag.ShardQueueAcks(category.Name(), minTaskKey.FireTime),
Expand Down Expand Up @@ -1938,7 +1948,7 @@ func newContext(

lifecycleCtx, lifecycleCancel := context.WithCancel(context.Background())

ioConcurrency := historyConfig.AcquireShardConcurrency()
ioConcurrency := historyConfig.ShardIOConcurrency()
if persistenceConfig.DataStores[persistenceConfig.DefaultStore].Cassandra != nil {
ioConcurrency = 1
}
Expand Down

0 comments on commit 81dcd5e

Please sign in to comment.