Skip to content

Commit

Permalink
private,satellite: add chore to dq stray nodes
Browse files Browse the repository at this point in the history
Full scope:
private/testplanet,satellite/{overlay,satellitedb}

Description:
In most cases, downtime tracking with audits will eventually lead
to DQ for nodes that are unresponsive. However, if a stray node has no
pieces, it will not be audited and will thus never be disqualified.
This chore checks for nodes that have not been successfully contacted
within a configured period of time and disqualifies them.

New flags toggle DQ of stray nodes and configure how often the chore
runs and how long a node can go without contact before being disqualified.

Change-Id: Ic9d41fdbf214736798925e728245180fb3c55615
  • Loading branch information
cam-a committed Jan 19, 2021
1 parent 2e34b63 commit 75d8282
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 7 deletions.
15 changes: 12 additions & 3 deletions private/testplanet/satellite.go
Expand Up @@ -56,6 +56,7 @@ import (
"storj.io/storj/satellite/nodestats"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/overlay/straynodes"
"storj.io/storj/satellite/payments/paymentsconfig"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/repair/checker"
Expand Down Expand Up @@ -92,9 +93,10 @@ type Satellite struct {
}

Overlay struct {
DB overlay.DB
Service *overlay.Service
Inspector *overlay.Inspector
DB overlay.DB
Service *overlay.Service
Inspector *overlay.Inspector
DQStrayNodes *straynodes.Chore
}

Metainfo struct {
Expand All @@ -120,6 +122,7 @@ type Satellite struct {
Repairer *repairer.Service
Inspector *irreparable.Inspector
}

Audit struct {
Queues *audit.Queues
Worker *audit.Worker
Expand Down Expand Up @@ -439,6 +442,11 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
OfflineThreshold: 0.6,
},
},
StrayNodes: straynodes.Config{
EnableDQ: true,
Interval: time.Minute,
MaxDurationWithoutContact: 30 * time.Second,
},
Metainfo: metainfo.Config{
DatabaseURL: "", // not used
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024
Expand Down Expand Up @@ -696,6 +704,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Overlay.DB = api.Overlay.DB
system.Overlay.Service = api.Overlay.Service
system.Overlay.Inspector = api.Overlay.Inspector
system.Overlay.DQStrayNodes = peer.Overlay.DQStrayNodes

system.Metainfo.Database = api.Metainfo.Database
system.Metainfo.Service = peer.Metainfo.Service
Expand Down
18 changes: 16 additions & 2 deletions satellite/core.go
Expand Up @@ -38,6 +38,7 @@ import (
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/overlay/straynodes"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/repair/checker"
Expand Down Expand Up @@ -73,8 +74,9 @@ type Core struct {
}

Overlay struct {
DB overlay.DB
Service *overlay.Service
DB overlay.DB
Service *overlay.Service
DQStrayNodes *straynodes.Chore
}

Metainfo struct {
Expand All @@ -92,6 +94,7 @@ type Core struct {
Repair struct {
Checker *checker.Checker
}

Audit struct {
Queues *audit.Queues
Worker *audit.Worker
Expand Down Expand Up @@ -229,6 +232,17 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
Name: "overlay",
Close: peer.Overlay.Service.Close,
})

if config.StrayNodes.EnableDQ {
peer.Overlay.DQStrayNodes = straynodes.NewChore(peer.Log.Named("overlay:dq-stray-nodes"), peer.Overlay.DB, config.StrayNodes)
peer.Services.Add(lifecycle.Item{
Name: "overlay:dq-stray-nodes",
Run: peer.Overlay.DQStrayNodes.Run,
Close: peer.Overlay.DQStrayNodes.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Overlay DQ Stray Nodes", peer.Overlay.DQStrayNodes.Loop))
}
}

{ // setup live accounting
Expand Down
2 changes: 2 additions & 0 deletions satellite/overlay/service.go
Expand Up @@ -88,6 +88,8 @@ type DB interface {

// DisqualifyNode disqualifies a storage node.
DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error)
// DQNodesLastSeenBefore disqualifies all nodes where last_contact_success < cutoff.
DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time) (err error)

// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
Expand Down
61 changes: 61 additions & 0 deletions satellite/overlay/straynodes/chore.go
@@ -0,0 +1,61 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package straynodes

import (
"context"
"time"

"github.com/spacemonkeygo/monkit/v3"
"go.uber.org/zap"

"storj.io/common/sync2"
"storj.io/storj/satellite/overlay"
)

// mon is the package-level monkit scope; Run uses it to instrument its task.
var mon = monkit.Package()

// Config contains configurable values for stray nodes chore.
type Config struct {
	// EnableDQ toggles whether stray nodes are disqualified.
	EnableDQ bool `help:"whether nodes will be disqualified if they have not been contacted in some time" releaseDefault:"false" devDefault:"true"`
	// Interval is the period of the chore's cycle (passed to sync2.NewCycle in NewChore).
	Interval time.Duration `help:"how often to check for and DQ stray nodes" releaseDefault:"168h" devDefault:"5m"`
	// MaxDurationWithoutContact is subtracted from the current time in Run to
	// compute the last-contact cutoff handed to the database.
	MaxDurationWithoutContact time.Duration `help:"length of time a node can go without contacting satellite before being disqualified" releaseDefault:"720h" devDefault:"5m"`
}

// Chore disqualifies stray nodes.
type Chore struct {
	// log receives an error entry whenever a disqualification pass fails.
	log *zap.Logger
	// cache is the overlay database on which DQNodesLastSeenBefore is invoked.
	cache overlay.DB
	// maxDurationWithoutContact is how far before "now" the last-contact cutoff lies.
	maxDurationWithoutContact time.Duration
	// Loop is the cycle that drives Run; it is exported so that callers can
	// pause or trigger it directly.
	Loop *sync2.Cycle
}

// NewChore builds a stray nodes Chore whose cycle period is config.Interval and
// whose allowed time without contact is config.MaxDurationWithoutContact.
func NewChore(log *zap.Logger, cache overlay.DB, config Config) *Chore {
	chore := new(Chore)
	chore.log = log
	chore.cache = cache
	chore.maxDurationWithoutContact = config.MaxDurationWithoutContact
	chore.Loop = sync2.NewCycle(config.Interval)
	return chore
}

// Run runs the chore.
//
// It hands a callback to Loop.Run and returns whatever Loop.Run returns. On each
// invocation the callback asks the database to disqualify every node whose last
// successful contact is older than maxDurationWithoutContact. The callback
// always returns nil: a failed pass is only logged.
func (chore *Chore) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	disqualifyStray := func(ctx context.Context) error {
		cutoff := time.Now().UTC().Add(-chore.maxDurationWithoutContact)
		if dqErr := chore.cache.DQNodesLastSeenBefore(ctx, cutoff); dqErr != nil {
			chore.log.Error("error disqualifying stray nodes", zap.Error(dqErr))
		}
		return nil
	}

	return chore.Loop.Run(ctx, disqualifyStray)
}

// Close closes chore.
// It closes the underlying Loop and always returns nil.
func (chore *Chore) Close() error {
	chore.Loop.Close()
	return nil
}
68 changes: 68 additions & 0 deletions satellite/overlay/straynodes/chore_test.go
@@ -0,0 +1,68 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package straynodes_test

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
)

// TestDQStrayNodes checks that one triggered run of the stray nodes chore
// disqualifies a node whose last successful contact lies further back than
// MaxDurationWithoutContact, and does not disqualify the other node.
func TestDQStrayNodes(t *testing.T) {
	reconfigure := testplanet.Reconfigure{
		Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
			config.StrayNodes.MaxDurationWithoutContact = 24 * time.Hour
		},
	}

	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 2,
		Reconfigure:      reconfigure,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		stray, live := planet.StorageNodes[0], planet.StorageNodes[1]

		// Stop the stray node from checking in by itself and keep the chore
		// from running until it is triggered explicitly below.
		stray.Contact.Chore.Pause(ctx)
		sat.Overlay.DQStrayNodes.Loop.Pause()

		db := sat.Overlay.DB

		// The stray node must start out not disqualified.
		strayBefore, err := db.Get(ctx, stray.ID())
		require.NoError(t, err)
		require.Nil(t, strayBefore.Disqualified)

		// set strayNode last_contact_success to 48 hours ago
		lastContact := time.Now().Add(-48 * time.Hour)
		staleCheckIn := overlay.NodeCheckInInfo{
			NodeID:  stray.ID(),
			IsUp:    true,
			Address: &pb.NodeAddress{Address: "1.2.3.4"},
			Version: &pb.NodeVersion{
				Version:    "v0.0.0",
				CommitHash: "",
				Timestamp:  time.Time{},
				Release:    false,
			},
		}
		require.NoError(t, db.UpdateCheckIn(ctx, staleCheckIn, lastContact, sat.Config.Overlay.Node))

		sat.Overlay.DQStrayNodes.Loop.TriggerWait()

		strayAfter, err := db.Get(ctx, stray.ID())
		require.NoError(t, err)
		require.NotNil(t, strayAfter.Disqualified)

		liveAfter, err := db.Get(ctx, live.ID())
		require.NoError(t, err)
		require.Nil(t, liveAfter.Disqualified)
	})
}
6 changes: 4 additions & 2 deletions satellite/peer.go
Expand Up @@ -36,6 +36,7 @@ import (
"storj.io/storj/satellite/nodeapiversion"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/overlay/straynodes"
"storj.io/storj/satellite/payments/paymentsconfig"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/referrals"
Expand Down Expand Up @@ -114,8 +115,9 @@ type Config struct {

Admin admin.Config

Contact contact.Config
Overlay overlay.Config
Contact contact.Config
Overlay overlay.Config
StrayNodes straynodes.Config

Metainfo metainfo.Config
Orders orders.Config
Expand Down
8 changes: 8 additions & 0 deletions satellite/satellitedb/overlaycache.go
Expand Up @@ -1565,6 +1565,14 @@ func (cache *overlaycache) populateUpdateFields(dbNode *dbx.Node, updateReq *ove
return updateFields
}

// DQNodesLastSeenBefore disqualifies all nodes where last_contact_success < cutoff.
//
// Nodes that are already disqualified are left untouched, so running this
// repeatedly keeps the timestamp of the original disqualification instead of
// overwriting it with the time of the latest run.
func (cache *overlaycache) DQNodesLastSeenBefore(ctx context.Context, cutoff time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	// The "disqualified IS NULL" guard limits the update to nodes that are not
	// yet disqualified.
	const query = `UPDATE nodes SET disqualified = current_timestamp WHERE last_contact_success < $1 AND disqualified IS NULL;`
	_, err = cache.db.ExecContext(ctx, query, cutoff)
	return err
}

// UpdateCheckIn updates a single storagenode with info from when the node last checked in.
func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeCheckInInfo, timestamp time.Time, config overlay.NodeSelectionConfig) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down
21 changes: 21 additions & 0 deletions satellite/satellitedb/overlaycache_test.go
Expand Up @@ -15,6 +15,27 @@ import (
"storj.io/storj/satellite/overlay"
)

// TestDQNodesLastSeenBefore checks that a node which is not disqualified
// becomes disqualified once DQNodesLastSeenBefore is called with the current
// time as cutoff.
func TestDQNodesLastSeenBefore(t *testing.T) {
	planetConfig := testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 1,
	}

	testplanet.Run(t, planetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		storageNode := planet.StorageNodes[0]
		// Pause the node's contact chore for the duration of the test.
		storageNode.Contact.Chore.Pause(ctx)

		db := planet.Satellites[0].Overlay.DB

		before, err := db.Get(ctx, storageNode.ID())
		require.NoError(t, err)
		require.Nil(t, before.Disqualified)

		err = db.DQNodesLastSeenBefore(ctx, time.Now())
		require.NoError(t, err)

		after, err := db.Get(ctx, storageNode.ID())
		require.NoError(t, err)
		require.NotNil(t, after.Disqualified)
	})
}

func TestUpdateStats(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2,
Expand Down
9 changes: 9 additions & 0 deletions scripts/testdata/satellite-config.yaml.lock
Expand Up @@ -652,6 +652,15 @@ server.private-address: 127.0.0.1:7778
# if true, uses peer ca whitelist checking
# server.use-peer-ca-whitelist: true

# whether nodes will be disqualified if they have not been contacted in some time
# stray-nodes.enable-dq: false

# how often to check for and DQ stray nodes
# stray-nodes.interval: 168h0m0s

# length of time a node can go without contacting satellite before being disqualified
# stray-nodes.max-duration-without-contact: 720h0m0s

# how frequently the tally service should run
# tally.interval: 1h0m0s

Expand Down

0 comments on commit 75d8282

Please sign in to comment.