Skip to content

Commit

Permalink
storagenode/trust: ensure trust pool updates satellite status on Refresh
Browse files Browse the repository at this point in the history
Fixes #6261

Change-Id: Ic01ce423156058dd4676fb073c0de3d768991d0e
  • Loading branch information
profclems authored and Storj Robot committed Oct 3, 2023
1 parent 8a1bedd commit a2c162d
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 151 deletions.
2 changes: 1 addition & 1 deletion cmd/storagenode/cmd_forget_satellite_test.go
Expand Up @@ -118,7 +118,7 @@ func Test_cmdForgetSatellite(t *testing.T) {
// test that the satellite was inserted correctly
satellite, err := db.Satellites().GetSatellite(ctx, satelliteID)
require.NoError(t, err)
require.Equal(t, satellites.Untrusted, satellites.Status(satellite.Status))
require.Equal(t, satellites.Untrusted, satellite.Status)

// set up the identity
ident := planet.StorageNodes[0].Identity
Expand Down
247 changes: 124 additions & 123 deletions storagenode/preflight/localtime_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"storj.io/storj/private/testplanet"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/preflight"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
"storj.io/storj/storagenode/trust"
)

Expand All @@ -48,138 +49,138 @@ func TestLocalTime_InSync(t *testing.T) {
}

func TestLocalTime_OutOfSync(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

log := zaptest.NewLogger(t)

// set up mock satellite server configuration
mockSatID, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
config := server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",

Config: tlsopts.Config{
PeerIDVersions: "*",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
}
mockSatTLSOptions, err := tlsopts.NewOptions(mockSatID, config.Config, nil)
require.NoError(t, err)
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {

t.Run("Less than 30m", func(t *testing.T) {
// register mock GetTime endpoint to mock server
var group errgroup.Group
defer ctx.Check(group.Wait)
log := zaptest.NewLogger(t)

contactServer, err := server.New(log, mockSatTLSOptions, config)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)

err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
localTime: time.Now().Add(-25 * time.Minute),
})
// set up mock satellite server configuration
mockSatID, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
config := server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",

group.Go(func() error {
return contactServer.Run(ctx)
})

// get mock server address
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
require.NoError(t, err)
port, err := strconv.Atoi(portStr)
require.NoError(t, err)
url := trust.SatelliteURL{
ID: mockSatID.ID,
Host: "127.0.0.1",
Port: port,
Config: tlsopts.Config{
PeerIDVersions: "*",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
}
require.NoError(t, err)

// set up storagenode client
source, err := trust.NewStaticURLSource(url.String())
require.NoError(t, err)

identity, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
require.NoError(t, err)
dialer := rpc.NewDefaultDialer(tlsOptions)
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
}, nil)
require.NoError(t, err)
err = pool.Refresh(ctx)
require.NoError(t, err)

// should not return any error when node's clock is off by no more than 30m
localtime := preflight.NewLocalTime(log, preflight.Config{
LocalTimeCheck: true,
}, pool, dialer)
err = localtime.Check(ctx)
require.NoError(t, err)
mockSatTLSOptions, err := tlsopts.NewOptions(mockSatID, config.Config, nil)
require.NoError(t, err)

t.Run("Less than 30m", func(t *testing.T) {
// register mock GetTime endpoint to mock server
var group errgroup.Group
defer ctx.Check(group.Wait)

contactServer, err := server.New(log, mockSatTLSOptions, config)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)

err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
localTime: time.Now().Add(-25 * time.Minute),
})
require.NoError(t, err)

group.Go(func() error {
return contactServer.Run(ctx)
})

// get mock server address
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
require.NoError(t, err)
port, err := strconv.Atoi(portStr)
require.NoError(t, err)
url := trust.SatelliteURL{
ID: mockSatID.ID,
Host: "127.0.0.1",
Port: port,
}
require.NoError(t, err)

// set up storagenode client
source, err := trust.NewStaticURLSource(url.String())
require.NoError(t, err)

identity, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
require.NoError(t, err)
dialer := rpc.NewDefaultDialer(tlsOptions)
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
}, db.Satellites())
require.NoError(t, err)
err = pool.Refresh(ctx)
require.NoError(t, err)

// should not return any error when node's clock is off by no more than 30m
localtime := preflight.NewLocalTime(log, preflight.Config{
LocalTimeCheck: true,
}, pool, dialer)
err = localtime.Check(ctx)
require.NoError(t, err)

})

t.Run("More than 30m", func(t *testing.T) {
// register mock GetTime endpoint to mock server
var group errgroup.Group
defer ctx.Check(group.Wait)

contactServer, err := server.New(log, mockSatTLSOptions, config)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)

err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
localTime: time.Now().Add(-31 * time.Minute),
})
require.NoError(t, err)

group.Go(func() error {
return contactServer.Run(ctx)
t.Run("More than 30m", func(t *testing.T) {
// register mock GetTime endpoint to mock server
var group errgroup.Group
defer ctx.Check(group.Wait)

contactServer, err := server.New(log, mockSatTLSOptions, config)
require.NoError(t, err)
defer ctx.Check(contactServer.Close)

err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
localTime: time.Now().Add(-31 * time.Minute),
})
require.NoError(t, err)

group.Go(func() error {
return contactServer.Run(ctx)
})

// get mock server address
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
require.NoError(t, err)
port, err := strconv.Atoi(portStr)
require.NoError(t, err)
url := trust.SatelliteURL{
ID: mockSatID.ID,
Host: "127.0.0.1",
Port: port,
}
require.NoError(t, err)

// set up storagenode client
source, err := trust.NewStaticURLSource(url.String())
require.NoError(t, err)

identity, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
require.NoError(t, err)
dialer := rpc.NewDefaultDialer(tlsOptions)
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
}, db.Satellites())
require.NoError(t, err)
err = pool.Refresh(ctx)
require.NoError(t, err)

// should return an error when node's clock is off by more than 30m with all trusted satellites
localtime := preflight.NewLocalTime(log, preflight.Config{
LocalTimeCheck: true,
}, pool, dialer)
err = localtime.Check(ctx)
require.Error(t, err)
})

// get mock server address
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
require.NoError(t, err)
port, err := strconv.Atoi(portStr)
require.NoError(t, err)
url := trust.SatelliteURL{
ID: mockSatID.ID,
Host: "127.0.0.1",
Port: port,
}
require.NoError(t, err)

// set up storagenode client
source, err := trust.NewStaticURLSource(url.String())
require.NoError(t, err)

identity, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
require.NoError(t, err)
dialer := rpc.NewDefaultDialer(tlsOptions)
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
}, nil)
require.NoError(t, err)
err = pool.Refresh(ctx)
require.NoError(t, err)

// should return an error when node's clock is off by more than 30m with all trusted satellites
localtime := preflight.NewLocalTime(log, preflight.Config{
LocalTimeCheck: true,
}, pool, dialer)
err = localtime.Check(ctx)
require.Error(t, err)
})
}

Expand Down
16 changes: 9 additions & 7 deletions storagenode/satellites/satellites.go
Expand Up @@ -13,19 +13,21 @@ import (
// Status refers to the state of the relationship with a satellite.
type Status = int

// It is important that the values/order of these Status constants are not changed
// because they are stored in the database.
const (
// Unexpected status should not be used for sanity checking.
Unexpected Status = 0
// Normal status reflects a lack of graceful exit.
Normal = 1
Normal Status = 1
// Exiting reflects an active graceful exit.
Exiting = 2
Exiting Status = 2
// ExitSucceeded reflects a graceful exit that succeeded.
ExitSucceeded = 3
ExitSucceeded Status = 3
// ExitFailed reflects a graceful exit that failed.
ExitFailed = 4
ExitFailed Status = 4
// Untrusted reflects a satellite that is not trusted.
Untrusted = 5
Untrusted Status = 5
)

// ExitProgress contains the status of a graceful exit.
Expand All @@ -36,14 +38,14 @@ type ExitProgress struct {
StartingDiskUsage int64
BytesDeleted int64
CompletionReceipt []byte
Status int32
Status Status
}

// Satellite contains the satellite and status.
type Satellite struct {
SatelliteID storj.NodeID
AddedAt time.Time
Status int32
Status Status
}

// DB works with satellite database.
Expand Down
32 changes: 16 additions & 16 deletions storagenode/trust/service.go
Expand Up @@ -120,21 +120,6 @@ func (pool *Pool) Run(ctx context.Context) error {
pool.log.Error("Failed to refresh", zap.Error(err))
return err
}

for _, trustedSatellite := range pool.satellites {
status := satellites.Normal
// for cases where a satellite was previously marked as untrusted, but is now trusted
// we reset the status back to normal
satellite, err := pool.satellitesDB.GetSatellite(ctx, trustedSatellite.url.ID)
if err == nil && !satellite.SatelliteID.IsZero() {
if satellite.Status != satellites.Untrusted {
status = satellites.Status(satellite.Status)
}
}
if err := pool.satellitesDB.SetAddressAndStatus(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address, status); err != nil {
return err
}
}
}
}

Expand Down Expand Up @@ -229,14 +214,29 @@ func (pool *Pool) Refresh(ctx context.Context) error {
}

// remove trusted IDs that are no longer in the URL list
for id := range pool.satellites {
for id, info := range pool.satellites {
if _, ok := trustedIDs[id]; !ok {
pool.log.Debug("Satellite is no longer trusted", zap.String("id", id.String()))
delete(pool.satellites, id)
err := pool.satellitesDB.UpdateSatelliteStatus(ctx, id, satellites.Untrusted)
if err != nil {
return err
}

continue
}

// for cases where a satellite was previously marked as untrusted, but is now trusted
// we reset the status back to normal
status := satellites.Normal
dbSatellite, err := pool.satellitesDB.GetSatellite(ctx, info.url.ID)
if err == nil && !dbSatellite.SatelliteID.IsZero() {
if dbSatellite.Status != satellites.Untrusted {
status = dbSatellite.Status
}
}
if err := pool.satellitesDB.SetAddressAndStatus(ctx, info.url.ID, info.url.Address, status); err != nil {
return err
}
}

Expand Down

0 comments on commit a2c162d

Please sign in to comment.