Skip to content

Commit

Permalink
cmd/tools/segment-verify: don't cache offline status forever
Browse files Browse the repository at this point in the history
Because it was originally intended to work on only a few pieces from
each segment at a time, and would frequently have reset its list of
online nodes, segment-verify has been taking nodes out of its
onlineNodes set and never putting them back. This means that over a long
run in Check=0 mode, we end up treating more and more nodes as offline,
permanently. This trend obscures the number of missing pieces that
each segment really has, because we don't check pieces on offline nodes.

This commit replaces the onlineNodes set with an "offlineNodes" set whose
entries expire after a fixed time. Nodes are added to
the offlineNodes set when we see they are offline, and we then only
treat them as offline for the next 30 minutes (configurable). After that
point, we will try connecting to them again.

Change-Id: I14f0332de25cdc6ef655f923739bcb4df71e079e
  • Loading branch information
thepaul authored and ethanadams committed Jan 3, 2023
1 parent 5362dff commit 2feb49a
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 12 deletions.
4 changes: 2 additions & 2 deletions cmd/tools/segment-verify/batch.go
Expand Up @@ -118,14 +118,14 @@ func (service *Service) CreateBatches(ctx context.Context, segments []*Segment)
// selectOnlinePieces modifies slice such that it only contains online pieces.
func (service *Service) selectOnlinePieces(segment *Segment) {
for i, x := range segment.AliasPieces {
if service.onlineNodes.Contains(x.Alias) {
if !service.offlineNodes.Contains(x.Alias) && !service.ignoreNodes.Contains(x.Alias) {
continue
}

// found an offline node, start removing
rs := segment.AliasPieces[:i]
for _, x := range segment.AliasPieces[i+1:] {
if service.onlineNodes.Contains(x.Alias) {
if !service.offlineNodes.Contains(x.Alias) && !service.ignoreNodes.Contains(x.Alias) {
rs = append(rs, x)
}
}
Expand Down
49 changes: 48 additions & 1 deletion cmd/tools/segment-verify/nodealias.go
Expand Up @@ -3,7 +3,11 @@

package main

import "storj.io/storj/satellite/metabase"
import (
"time"

"storj.io/storj/satellite/metabase"
)

// NodeAliasSet is a set containing node aliases. It is represented as a map
// keyed by alias with empty-struct values.
type NodeAliasSet map[metabase.NodeAlias]struct{}
Expand All @@ -30,3 +34,46 @@ func (set NodeAliasSet) RemoveAll(xs NodeAliasSet) {
delete(set, x)
}
}

// nodeAliasExpiringSet is a set of node aliases in which every entry carries
// an expiry time. Contains stops reporting an alias once its expiry time has
// been reached.
type nodeAliasExpiringSet struct {
// nowFunc supplies the current time; newNodeAliasExpiringSet sets it to time.Now.
nowFunc func() time.Time
// aliasesAndExpiryTimes maps each stored alias to the time at which it expires.
aliasesAndExpiryTimes map[metabase.NodeAlias]time.Time
// timeToExpire is added to the current time by Add to compute an entry's expiry.
timeToExpire time.Duration
}

// newNodeAliasExpiringSet returns an empty expiring set. Entries added to it
// expire timeToExpire after they are added; the current time is read from
// time.Now.
func newNodeAliasExpiringSet(timeToExpire time.Duration) *nodeAliasExpiringSet {
	set := new(nodeAliasExpiringSet)
	set.nowFunc = time.Now
	set.aliasesAndExpiryTimes = map[metabase.NodeAlias]time.Time{}
	set.timeToExpire = timeToExpire
	return set
}

// Contains reports whether v is in the set and has not yet expired, i.e. it
// was added less than timeToExpire ago. An entry found to be expired is
// deleted from the underlying map as a side effect.
func (expiringSet nodeAliasExpiringSet) Contains(v metabase.NodeAlias) bool {
	entries := expiringSet.aliasesAndExpiryTimes

	expiry, found := entries[v]
	if !found {
		return false
	}

	if !expiringSet.nowFunc().Before(expiry) {
		// The expiry time has been reached: forget the entry.
		delete(entries, v)
		return false
	}
	return true
}

// Add puts v into the set, overwriting any existing entry for v. The entry
// expires timeToExpire after the time at which Add is called.
func (expiringSet nodeAliasExpiringSet) Add(v metabase.NodeAlias) {
	expiresAt := expiringSet.nowFunc().Add(expiringSet.timeToExpire)
	expiringSet.aliasesAndExpiryTimes[v] = expiresAt
}

// Remove deletes v from the set. It does nothing if v is not present.
func (expiringSet nodeAliasExpiringSet) Remove(v metabase.NodeAlias) {
	entries := expiringSet.aliasesAndExpiryTimes
	delete(entries, v)
}

// AddAll adds every alias in xs to the set by calling Add on each of them,
// so the expiry of each entry is computed at the time it is added.
func (expiringSet nodeAliasExpiringSet) AddAll(xs NodeAliasSet) {
	for alias := range xs {
		expiringSet.Add(alias)
	}
}
119 changes: 119 additions & 0 deletions cmd/tools/segment-verify/nodealias_test.go
@@ -0,0 +1,119 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package main

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"storj.io/common/testrand"
"storj.io/storj/satellite/metabase"
)

// randomNodeAlias returns a node alias built from a random integer in [0, 65536).
func randomNodeAlias() metabase.NodeAlias {
	n := testrand.Intn(65536)
	return metabase.NodeAlias(n)
}

// TestNodeAliasExpiringSetAddAndRemove checks that Add and Remove change the
// membership reported by Contains for exactly the alias they are given.
func TestNodeAliasExpiringSetAddAndRemove(t *testing.T) {
	// Draw three random aliases, redrawing until they are pairwise distinct.
	alias1 := randomNodeAlias()
	alias2 := randomNodeAlias()
	for alias2 == alias1 {
		alias2 = randomNodeAlias()
	}
	alias3 := randomNodeAlias()
	for alias3 == alias1 || alias3 == alias2 {
		alias3 = randomNodeAlias()
	}

	// The expiry is long enough that nothing expires during the test.
	set := newNodeAliasExpiringSet(24 * time.Hour)

	// expectMembership asserts what Contains reports for alias1, alias2 and
	// alias3, in that order.
	expectMembership := func(want1, want2, want3 bool) {
		t.Helper()
		assert.Equal(t, want1, set.Contains(alias1))
		assert.Equal(t, want2, set.Contains(alias2))
		assert.Equal(t, want3, set.Contains(alias3))
	}

	// A fresh set contains nothing.
	expectMembership(false, false, false)

	// Add the aliases one at a time.
	set.Add(alias1)
	expectMembership(true, false, false)

	set.Add(alias2)
	expectMembership(true, true, false)

	set.Add(alias3)
	expectMembership(true, true, true)

	// Then remove them one at a time.
	set.Remove(alias2)
	expectMembership(true, false, true)

	// Removing an alias that is already gone should have no effect.
	set.Remove(alias2)
	expectMembership(true, false, true)

	set.Remove(alias1)
	expectMembership(false, false, true)

	set.Remove(alias3)
	expectMembership(false, false, false)
}

// dummyTime is a manually advanced clock for tests. It embeds the time.Time
// value that Get returns.
type dummyTime struct {
	time.Time
}

// Elapse moves the clock forward by d.
func (dt *dummyTime) Elapse(d time.Duration) {
	advanced := dt.Time.Add(d)
	dt.Time = advanced
}

// Get returns the clock's current time. Its signature matches func() time.Time.
func (dt *dummyTime) Get() time.Time {
	current := dt.Time
	return current
}

// TestNodeAliasExpiringSetExpiration checks that entries stop being reported
// by Contains once more than the configured expiry has passed since they were
// added, using a manually advanced clock.
func TestNodeAliasExpiringSetExpiration(t *testing.T) {
	// A set with a one-minute expiry whose clock the test controls.
	clock := dummyTime{time.Now()}
	set := newNodeAliasExpiringSet(time.Minute)
	set.nowFunc = clock.Get

	// Draw two distinct random aliases.
	alias1 := randomNodeAlias()
	alias2 := randomNodeAlias()
	for alias2 == alias1 {
		alias2 = randomNodeAlias()
	}

	// expectMembership asserts what Contains reports for alias1 and alias2.
	expectMembership := func(want1, want2 bool) {
		t.Helper()
		assert.Equal(t, want1, set.Contains(alias1))
		assert.Equal(t, want2, set.Contains(alias2))
	}

	// t = 0s: alias1 is added.
	set.Add(alias1)
	expectMembership(true, false)

	// t = 30s: alias1 is still within its minute.
	clock.Elapse(30 * time.Second)
	expectMembership(true, false)

	// t = 30s: alias2 is added.
	set.Add(alias2)
	expectMembership(true, true)

	// t = 61s: alias1 has expired; alias2 is 31s old.
	clock.Elapse(31 * time.Second)
	expectMembership(false, true)

	// t = 91s: alias2 is 61s old and has expired too.
	clock.Elapse(30 * time.Second)
	expectMembership(false, false)
}
4 changes: 2 additions & 2 deletions cmd/tools/segment-verify/process.go
Expand Up @@ -96,11 +96,11 @@ func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) err
if ErrNodeOffline.Has(err) {
mu.Lock()
if verifiedCount == 0 {
service.onlineNodes.Remove(batch.Alias)
service.offlineNodes.Add(batch.Alias)
} else {
service.offlineCount[batch.Alias]++
if service.config.MaxOffline > 0 && service.offlineCount[batch.Alias] >= service.config.MaxOffline {
service.onlineNodes.Remove(batch.Alias)
service.offlineNodes.Add(batch.Alias)
}
}
mu.Unlock()
Expand Down
16 changes: 9 additions & 7 deletions cmd/tools/segment-verify/service.go
Expand Up @@ -66,6 +66,8 @@ type ServiceConfig struct {
Concurrency int `help:"number of concurrent verifiers" default:"1000"`
MaxOffline int `help:"maximum number of offline in a sequence (if 0, no limit)" default:"2"`

OfflineStatusCacheTime time.Duration `help:"how long to cache a \"node offline\" status" default:"30m"`

AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
}

Expand All @@ -91,7 +93,8 @@ type Service struct {
aliasMap *metabase.NodeAliasMap
aliasToNodeURL map[metabase.NodeAlias]storj.NodeURL
priorityNodes NodeAliasSet
onlineNodes NodeAliasSet
ignoreNodes NodeAliasSet
offlineNodes *nodeAliasExpiringSet
offlineCount map[metabase.NodeAlias]int
bucketList BucketList
nodesVersionMap map[metabase.NodeAlias]string
Expand Down Expand Up @@ -132,7 +135,8 @@ func NewService(log *zap.Logger, metabaseDB Metabase, verifier Verifier, overlay

aliasToNodeURL: map[metabase.NodeAlias]storj.NodeURL{},
priorityNodes: NodeAliasSet{},
onlineNodes: NodeAliasSet{},
ignoreNodes: NodeAliasSet{},
offlineNodes: newNodeAliasExpiringSet(config.OfflineStatusCacheTime),
offlineCount: map[metabase.NodeAlias]int{},
nodesVersionMap: map[metabase.NodeAlias]string{},

Expand Down Expand Up @@ -177,7 +181,6 @@ func (service *Service) loadOnlineNodes(ctx context.Context) (err error) {
ID: node.ID,
Address: addr,
}
service.onlineNodes.Add(alias)
}

return nil
Expand All @@ -193,19 +196,18 @@ func (service *Service) loadPriorityNodes(ctx context.Context) (err error) {
return Error.Wrap(err)
}

// applyIgnoreNodes loads the list of nodes to ignore completely and modifies priority and online nodes.
// applyIgnoreNodes loads the list of nodes to ignore completely and modifies priority nodes.
func (service *Service) applyIgnoreNodes(ctx context.Context) (err error) {
if service.config.IgnoreNodesPath == "" {
return nil
}

ignoreNodes, err := service.parseNodeFile(service.config.IgnoreNodesPath)
service.ignoreNodes, err = service.parseNodeFile(service.config.IgnoreNodesPath)
if err != nil {
return Error.Wrap(err)
}

service.onlineNodes.RemoveAll(ignoreNodes)
service.priorityNodes.RemoveAll(ignoreNodes)
service.priorityNodes.RemoveAll(service.ignoreNodes)

return nil
}
Expand Down

0 comments on commit 2feb49a

Please sign in to comment.