
Commit

Merge pull request #824 from DRK3/OnlyRunOneExpiryServicePerClusterAtATime

feat: Only run one DB expiry check per cluster at a time
sandrask authored Oct 21, 2021
2 parents 2addba7 + 50b3fbf commit 4aedfef
Showing 8 changed files with 533 additions and 141 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -137,6 +137,7 @@ Flags:
-O, --mq-op-pool string The size of the operation queue subscriber pool. If 0 then a pool will not be created. Alternatively, this can be set with the following environment variable: MQ_OP_POOL
-q, --mq-url string The URL of the message broker. Alternatively, this can be set with the following environment variable: MQ_URL
-R, --nodeinfo-refresh-interval string The interval for refreshing NodeInfo data. For example, '30s' for a 30 second interval. Alternatively, this can be set with the following environment variable: NODEINFO_REFRESH_INTERVAL
--orb-id string The ID that uniquely identifies an Orb instance within a cluster. If not set, then it will default to a random UUID. It's recommended to set this to ensure best performance for the automatic expired data cleanup service. Alternatively, this can be set with the following environment variable: ORB_ID
--private-key string Private Key base64 (ED25519Type). Alternatively, this can be set with the following environment variable: ORB_PRIVATE_KEY
--replicate-local-cas-writes-in-ipfs string If enabled, writes to the local CAS will also be replicated in IPFS. This setting only takes effect if this server has both a local CAS and IPFS enabled. If the IPFS node is set to ipfs.io, then this setting will be disabled since ipfs.io does not support writes. Supported options: false, true. Defaults to false if not set. Alternatively, this can be set with the following environment variable: REPLICATE_LOCAL_CAS_WRITES_IN_IPFS (default "false")
--secret-lock-key-path string The path to the file with key to be used by local secret lock. If missing noop service lock is used. Alternatively, this can be set with the following environment variable: ORB_SECRET_LOCK_KEY_PATH
24 changes: 24 additions & 0 deletions cmd/orb-server/startcmd/params.go
@@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/google/uuid"
"github.com/spf13/cobra"
cmdutils "github.com/trustbloc/edge-core/pkg/utils/cmd"
"github.com/trustbloc/sidetree-core-go/pkg/api/operation"
@@ -368,6 +369,12 @@ const (
"For example, a setting of '1m' will cause the expiry service to run a check every 1 minute. " +
"Defaults to 1 minute if not set. " + commonEnvVarUsageText + dataExpiryCheckIntervalEnvKey

instanceIDFlagName = "orb-id"
instanceIDFlagUsage = "An ID that uniquely identifies an Orb instance within a cluster. " +
"If not set, then it will default to a random UUID. It's recommended to set this to ensure best performance " +
"for the automatic expired data cleanup service." + commonEnvVarUsageText + instanceIDEnvKey
instanceIDEnvKey = "ORB_ID"

// TODO: Add verification method

)
@@ -433,6 +440,7 @@ type orbParameters struct {
contextProviderURLs []string
unpublishedOperationLifespan time.Duration
dataExpiryCheckInterval time.Duration
instanceID string
}

type anchorCredentialParams struct {
@@ -829,6 +837,20 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
return nil, fmt.Errorf("%s: %w", dataExpiryCheckIntervalFlagName, err)
}

instanceID, err := cmdutils.GetUserSetVarFromString(cmd, instanceIDFlagName, instanceIDEnvKey, true)
if err != nil {
return nil, err
}

if instanceID == "" {
instanceID := uuid.New().String()

logger.Warnf("Orb ID not set. A randomly generated UUID (%s) was generated and will be used. "+
"It's recommended to set this value yourself. If this Orb instance restarts, a new UUID will be "+
"generated which won't break anything but is less efficient for the automatic expired data cleanup "+
"service. A custom ID will also make for better logging.", instanceID)
}

return &orbParameters{
hostURL: hostURL,
hostMetricsURL: hostMetricsURL,
Expand Down Expand Up @@ -880,6 +902,7 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
contextProviderURLs: contextProviderURLs,
unpublishedOperationLifespan: unpublishedOperationLifespan,
dataExpiryCheckInterval: dataExpiryCheckInterval,
instanceID: instanceID,
}, nil
}

@@ -1221,4 +1244,5 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(databaseTimeoutFlagName, "", "", databaseTimeoutFlagUsage)
startCmd.Flags().StringP(unpublishedOperationLifespanFlagName, "", "", unpublishedOperationLifespanFlagUsage)
startCmd.Flags().StringP(dataExpiryCheckIntervalFlagName, "", "", dataExpiryCheckIntervalFlagUsage)
startCmd.Flags().StringP(instanceIDFlagName, "", "", instanceIDFlagUsage)
}
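
For illustration, the defaulting behaviour added above can be shown outside of the cobra/env plumbing. The following is a minimal, self-contained Go sketch (not part of this commit); it reads the ORB_ID environment variable directly and falls back to a random UUID from github.com/google/uuid, mirroring the warning message above:

package main

import (
	"fmt"
	"os"

	"github.com/google/uuid"
)

// resolveInstanceID mirrors the defaulting logic in getOrbParameters: use the
// configured value if present, otherwise fall back to a random UUID. The
// fallback is regenerated on every restart, which is why setting ORB_ID
// explicitly is recommended.
func resolveInstanceID() string {
	if id := os.Getenv("ORB_ID"); id != "" {
		return id
	}

	return uuid.New().String()
}

func main() {
	fmt.Println("instance ID:", resolveInstanceID())
}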
2 changes: 1 addition & 1 deletion cmd/orb-server/startcmd/start.go
@@ -465,7 +465,7 @@ func startOrbServices(parameters *orbParameters) error {
if parameters.updateDocumentStoreEnabled {
// TODO (#810): Make it possible to run the expiry service from only one instance within a cluster (or as
// a separate server)
expiryService = expiry.NewService(parameters.dataExpiryCheckInterval)
expiryService = expiry.NewService(parameters.dataExpiryCheckInterval, configStore, parameters.instanceID)

updateDocumentStore, err = unpublishedopstore.New(storeProviders.provider,
parameters.unpublishedOperationLifespan, expiryService)
5 changes: 4 additions & 1 deletion pkg/protocolversion/versions/v1_0/factory/factory_test.go
@@ -45,8 +45,11 @@ func TestFactory_Create(t *testing.T) {
})

t.Run("success - with update store config", func(t *testing.T) {
coordinationStore, err := mem.NewProvider().OpenStore("coordination")
require.NoError(t, err)

updateDocumentStore, err := unpublishedopstore.New(storeProvider, time.Minute,
expiry.NewService(time.Millisecond))
expiry.NewService(time.Millisecond, coordinationStore, "InstanceID"))
require.NoError(t, err)

cfg := &config.Sidetree{
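
As a usage note (not part of this commit), the test above suggests how the updated constructor is wired. Below is a minimal sketch; the in-memory provider import path, the tag name "expiryTime", and the store names are assumptions for illustration, and Start comes from the service's embedded lifecycle. In a real cluster the coordination store must be backed by a database shared by every Orb instance, and each instance must pass a unique instance ID:

package main

import (
	"time"

	"github.com/hyperledger/aries-framework-go/component/storageutil/mem"

	"github.com/trustbloc/orb/pkg/store/expiry"
)

func main() {
	provider := mem.NewProvider()

	// Shared store that holds the "expired_data_cleanup_permit" entry.
	coordinationStore, err := provider.OpenStore("coordination")
	if err != nil {
		panic(err)
	}

	// One-second interval and a unique instance ID (normally --orb-id / ORB_ID).
	service := expiry.NewService(time.Second, coordinationStore, "orb-instance-1")

	// Register a store to clean up; expired entries are tagged with a Unix
	// timestamp under the given tag name.
	dataStore, err := provider.OpenStore("unpublished-operations")
	if err != nil {
		panic(err)
	}

	service.Register(dataStore, "expiryTime", "unpublished-operations")

	service.Start()

	// Let the service run a few checks before the sketch exits.
	time.Sleep(5 * time.Second)
}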
171 changes: 150 additions & 21 deletions pkg/store/expiry/expiry.go
@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package expiry

import (
"encoding/json"
"errors"
"fmt"
"time"

@@ -16,11 +18,19 @@ import (
"github.com/trustbloc/orb/pkg/lifecycle"
)

const loggerModule = "expiry-service"
const (
loggerModule = "expiry-service"
coordinationPermitKey = "expired_data_cleanup_permit"
// When the Orb server with the expired data cleanup duty (permit holder) has not done it for an unusually
// long time (indicating it's down), another Orb server will take over the duty. This value multiplied by the
// configured interval time defines what an "unusually long time" is.
permitTimeLimitIntervalMultiplier = 3
)

type logger interface {
Debugf(msg string, args ...interface{})
Infof(msg string, args ...interface{})
Warnf(msg string, args ...interface{})
Errorf(msg string, args ...interface{})
}

@@ -30,28 +40,51 @@ type registeredStore struct {
name string
}

// expiredDataCleanupPermit is used as an entry within the coordination store to ensure that only one Orb instance
// within a cluster has the duty of performing periodic expired data clean up.
type expiredDataCleanupPermit struct {
// CurrentHolder indicates which Orb server currently has the responsibility.
CurrentHolder string `json:"currentHolder,omitempty"`
// TimeLastCleanupPerformed indicates when the last cleanup was successfully performed.
TimeLastCleanupPerformed int64 `json:"timeCleanupLastPerformed,omitempty"` // This is a Unix timestamp.
}

// Service is an expiry service that periodically polls registered stores and removes data past a specified
// expiration time.
// expiration time.
type Service struct {
*lifecycle.Lifecycle

done chan struct{}
logger logger
registeredStores []registeredStore
interval time.Duration
done chan struct{}
logger logger
registeredStores []registeredStore
interval time.Duration
coordinationStore storage.Store
instanceID string
}

// NewService returns a new expiry Service.
// interval is how frequently this service will check for (and delete as needed) expired data. Shorter intervals will
// remove expired data sooner at the expense of increased resource usage.
// remove expired data sooner at the expense of increased resource usage. Each Orb instance within a cluster should
// have the same interval configured in order for this service to work efficiently.
// coordinationStore is used for ensuring that only one Orb instance within a cluster has the duty of performing
// expired data cleanup (in order to avoid every instance doing the same work, which is wasteful). Every Orb instance
// within the cluster needs to be connected to the same database for it to work correctly. Note that when initializing
// Orb servers (or if the Orb server with the duty goes down) it is possible for multiple Orb instances to briefly
// assign themselves the duty, but only for one round. This will automatically be resolved on
// the next check and only one will end up with the duty from that point on. This situation is not of concern since
// it's safe for two instances to perform the check at the same time.
// instanceID is used in the coordinationStore for determining who currently has the duty of doing the expired data
// cleanup. It must be unique for every Orb instance within the cluster in order for this service to work efficiently.
// You must register each store you want this service to run on using the Register method. Once all your stores are
// registered, call the Start method to start the service.
func NewService(interval time.Duration) *Service {
func NewService(interval time.Duration, coordinationStore storage.Store, instanceID string) *Service {
s := &Service{
done: make(chan struct{}),
logger: log.New(loggerModule),
registeredStores: make([]registeredStore, 0),
interval: interval,
done: make(chan struct{}),
logger: log.New(loggerModule),
registeredStores: make([]registeredStore, 0),
interval: interval,
coordinationStore: coordinationStore,
instanceID: instanceID,
}

s.Lifecycle = lifecycle.New("expiry",
@@ -62,7 +95,7 @@ func NewService(interval time.Duration) *Service {
}

// Register adds a store to this expiry service.
// store is the store on which to check for expired data.
// store is the store on which to periodically cleanup expired data.
// name is used to identify the purpose of this expiry service for logging purposes.
// expiryTagName is the tag name used to store expiry values under. The expiry values must be standard Unix timestamps.
func (s *Service) Register(store storage.Store, expiryTagName, storeName string) {
Expand Down Expand Up @@ -91,8 +124,7 @@ func (s *Service) refresh() {
for {
select {
case <-time.After(s.interval):
s.logger.Debugf("Checking for expired data...")
s.deleteExpiredData()
s.runExpiryCheck()
case <-s.done:
s.logger.Debugf("Stopping expiry service.")

@@ -101,20 +133,117 @@ func (s *Service) refresh() {
}
}

func (s *Service) runExpiryCheck() {
s.logger.Debugf("Checking to see if it's my duty to clean up expired data.")

if s.isMyDutyToCheckForExpiredData() {
s.deleteExpiredData()

err := s.updatePermit()
if err != nil {
s.logger.Errorf("Failed to update permit: %s", err.Error())
}
}
}

func (s *Service) isMyDutyToCheckForExpiredData() bool {
currentExpiryCheckPermitBytes, err := s.coordinationStore.Get(coordinationPermitKey)
if err != nil {
if errors.Is(err, storage.ErrDataNotFound) {
s.logger.Infof("No existing permit found. " +
"I will take on the duty of periodically deleting expired data.")

return true
}

s.logger.Errorf("Unexpected failure while getting the permit: %s", err.Error())

return false
}

var currentPermit expiredDataCleanupPermit

err = json.Unmarshal(currentExpiryCheckPermitBytes, &currentPermit)
if err != nil {
s.logger.Errorf("Failed to unmarshal the current permit: %s", err.Error())

return false
}

timeOfLastCleanup := time.Unix(currentPermit.TimeLastCleanupPerformed, 0)

// Time.Since uses Time.Now() to determine the current time to a fine degree of precision. Here we are checking the
// time since a specific Unix timestamp, which is a value that is effectively truncated to the nearest second.
// Thus, the result of this calculation should also be truncated down to the nearest second since that's all the
// precision we have. This also makes the log statements look cleaner since it won't display an excessive amount
// of (meaningless) precision.
timeSinceLastCleanup := time.Since(timeOfLastCleanup).Truncate(time.Second)

if currentPermit.CurrentHolder == s.instanceID {
s.logger.Debugf("It's currently my duty to clean up expired data. I last did this %s ago. I will "+
"perform another cleanup and then update the permit timestamp.", timeSinceLastCleanup.String())

return true
}

// The idea here is to only take away the data cleanup responsibilities from the current permit holder if it's
// been an unusually long time since the current permit holder has performed a successful cleanup. If that happens
// then it indicates that the other Orb server with the permit is down, so someone else needs to grab the permit
// and take over the duty of doing expired data checks. Note that the assumption here is that all Orb servers
// within the cluster have the same interval setting (which they should).
timeLimit := s.interval * permitTimeLimitIntervalMultiplier

if timeSinceLastCleanup > timeLimit {
s.logger.Warnf("The current permit holder (%s) has not performed an expired data cleanup in an "+
"unusually long time (%s ago, over %d times longer than the configured interval of %s). This indicates a "+
"problem with %s - it may be down or not responding. I will take over the expired data "+
"cleanup duty and grab the permit.", currentPermit.CurrentHolder, timeSinceLastCleanup.String(),
permitTimeLimitIntervalMultiplier, s.interval.String(), currentPermit.CurrentHolder)

return true
}

s.logger.Debugf("I will not do an expired data cleanup since %s currently has the duty and did it recently "+
"(%s ago).", currentPermit.CurrentHolder, timeSinceLastCleanup.String())

return false
}

func (s *Service) deleteExpiredData() {
for _, registeredStore := range s.registeredStores {
registeredStore.deleteExpiredData(s.logger)
}
}

func (r *registeredStore) deleteExpiredData(logger logger) {
queryExpression := fmt.Sprintf("%s<=%d", r.expiryTagName, time.Now().Unix())
func (s *Service) updatePermit() error {
s.logger.Debugf("Updating the permit with the current time.")

permit := expiredDataCleanupPermit{
CurrentHolder: s.instanceID,
TimeLastCleanupPerformed: time.Now().Unix(),
}

permitBytes, err := json.Marshal(permit)
if err != nil {
return fmt.Errorf("failed to marshal permit: %w", err)
}

logger.Debugf("About to run the following query in %s: %s", r.name, queryExpression)
err = s.coordinationStore.Put(coordinationPermitKey, permitBytes)
if err != nil {
return fmt.Errorf("failed to store permit: %w", err)
}

s.logger.Debugf("Permit successfully updated with the current time.")

return nil
}

func (r *registeredStore) deleteExpiredData(logger logger) {
logger.Debugf("Checking for expired data in %s.", r.name)

iterator, err := r.store.Query(queryExpression)
iterator, err := r.store.Query(fmt.Sprintf("%s<=%d", r.expiryTagName, time.Now().Unix()))
if err != nil {
logger.Errorf("failed to query store: %s", err.Error())
logger.Errorf("failed to query store for expired data: %s", err.Error())

return
}
@@ -164,6 +293,6 @@ func (r *registeredStore) deleteExpiredData(logger logger) {
return
}

logger.Debugf("Successfully deleted %d pieces of expired data", len(operations))
logger.Debugf("Successfully deleted %d pieces of expired data.", len(operations))
}
}
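
To summarize the coordination logic added above: an instance runs the cleanup when there is no permit yet, when it already holds the permit, or when the current holder has not updated the permit for more than permitTimeLimitIntervalMultiplier (3) times the check interval. A self-contained sketch of that decision rule (illustrative only, not the code in this commit):

package permitsketch

import "time"

// permit mirrors the expiredDataCleanupPermit entry stored under the
// coordination key.
type permit struct {
	CurrentHolder            string
	TimeLastCleanupPerformed int64 // Unix timestamp of the last successful cleanup.
}

// shouldClean reports whether the instance identified by instanceID should run
// the expired-data cleanup now. A nil permit means no instance has claimed the
// duty yet.
func shouldClean(p *permit, instanceID string, interval time.Duration, now time.Time) bool {
	if p == nil {
		// No permit in the coordination store yet: claim the duty.
		return true
	}

	if p.CurrentHolder == instanceID {
		// This instance already holds the permit, so it keeps the duty.
		return true
	}

	// Another instance holds the permit. Take over only if it has been silent
	// for an unusually long time (3x the configured interval), which suggests
	// that instance is down.
	sinceLastCleanup := now.Sub(time.Unix(p.TimeLastCleanupPerformed, 0)).Truncate(time.Second)

	return sinceLastCleanup > 3*interval
}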