Skip to content

Commit

Permalink
wait for drain duration before stopping shard controller
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred-landrum committed Jul 6, 2023
1 parent 5dc16b7 commit e9eef52
Showing 1 changed file with 7 additions and 36 deletions.
43 changes: 7 additions & 36 deletions service/history/service.go
Expand Up @@ -42,7 +42,6 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence/client"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/configs"
)

Expand Down Expand Up @@ -138,37 +137,19 @@ func (s *Service) Stop() {
return
}

// initiate graceful shutdown :
// 1. remove self from the membership ring
// 2. wait for other members to discover we are going down
// 3. stop acquiring new shards (periodically or based on other membership changes)
// 4. wait for shard ownership to transfer (and inflight requests to drain) while still accepting new requests
// 5. Reject all requests arriving at rpc handler to avoid taking on more work except for RespondXXXCompleted and
// RecordXXStarted APIs - for these APIs, most of the work is already done and rejecting at last stage is
// probably not that desirable. If the shard is closed, these requests will fail anyways.
// 6. wait for grace period
// 7. force stop the whole world and return

const gossipPropagationDelay = 400 * time.Millisecond
const shardOwnershipTransferDelay = 5 * time.Second
const gracePeriod = 2 * time.Second

remainingTime := s.config.ShutdownDrainDuration()

logger.Info("ShutdownHandler: Evicting self from membership ring")
_ = s.membershipMonitor.EvictSelf()
s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING)

logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
remainingTime = s.sleep(gossipPropagationDelay, remainingTime)
if delay := s.config.ShutdownDrainDuration(); delay > 0 {
s.logger.Info("ShutdownHandler: delaying for shutdown drain",
tag.NewDurationTag("shutdownDrainDuration", delay))
time.Sleep(s.config.ShutdownDrainDuration())
}

s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING)

logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()
logger.Info("ShutdownHandler: Waiting for traffic to drain")
remainingTime = s.sleep(shardOwnershipTransferDelay, remainingTime)

logger.Info("ShutdownHandler: No longer taking rpc requests")
_ = s.sleep(gracePeriod, remainingTime)

// TODO: Change this to GracefulStop when integration tests are refactored.
s.server.Stop()
Expand All @@ -179,16 +160,6 @@ func (s *Service) Stop() {
logger.Info("history stopped")
}

// sleep blocks for the shorter of the desired duration and the available
// budget; when that shorter value is zero or negative it does not block.
// The return value is the available budget minus that shorter value.
func (s *Service) sleep(desired time.Duration, available time.Duration) time.Duration {
	pause := util.Min(desired, available)
	leftover := available - pause
	if pause <= 0 {
		// Nothing to wait for: skip the call to time.Sleep entirely.
		return leftover
	}
	time.Sleep(pause)
	return leftover
}

// GetFaultInjection returns the fault-injection data store factory stored on
// the Service (the faultInjectionDataStoreFactory field), exactly as held.
func (s *Service) GetFaultInjection() *client.FaultInjectionDataStoreFactory {
return s.faultInjectionDataStoreFactory
}

0 comments on commit e9eef52

Please sign in to comment.