Skip to content

Commit

Permalink
cmd/storagenode: add check if satellites available to gracefulexit
Browse files Browse the repository at this point in the history
Change-Id: I8747507593d810bbdec0d140de0600ee147011c3
  • Loading branch information
Qweder93 committed Jun 10, 2020
1 parent 0b109c3 commit e52809d
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 36 deletions.
102 changes: 68 additions & 34 deletions cmd/storagenode/gracefulexit.go
Expand Up @@ -21,13 +21,19 @@ import (
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/private/process"
"storj.io/storj/private/date"
"storj.io/storj/private/prompt"
)

type gracefulExitClient struct {
conn *rpc.Conn
}

type unavailableSatellite struct {
id storj.NodeID
monthsLeft int
}

func dialGracefulExitClient(ctx context.Context, address string) (*gracefulExitClient, error) {
conn, err := rpc.NewDefaultDialer(nil).DialAddressUnencrypted(ctx, address)
if err != nil {
Expand All @@ -48,6 +54,10 @@ func (client *gracefulExitClient) getExitProgress(ctx context.Context) (*pb.GetE
return pb.NewDRPCNodeGracefulExitClient(client.conn).GetExitProgress(ctx, &pb.GetExitProgressRequest{})
}

func (client *gracefulExitClient) gracefulExitFeasibility(ctx context.Context, id storj.NodeID) (*pb.GracefulExitFeasibilityResponse, error) {
return pb.NewDRPCNodeGracefulExitClient(client.conn).GracefulExitFeasibility(ctx, &pb.GracefulExitFeasibilityNodeRequest{NodeId: id})
}

func (client *gracefulExitClient) close() error {
return client.conn.Close()
}
Expand Down Expand Up @@ -126,40 +136,7 @@ func cmdGracefulExitInit(cmd *cobra.Command, args []string) error {
}
}

if len(satelliteIDs) < 1 {
fmt.Println("Invalid input. Please use valid satellite domian names.")
return errs.New("Invalid satellite domain names")
}

// save satellites for graceful exit into the db
progresses := make([]*pb.ExitProgress, 0, len(satelliteIDs))
var errgroup errs.Group
for _, id := range satelliteIDs {
req := &pb.InitiateGracefulExitRequest{
NodeId: id,
}
resp, err := client.initGracefulExit(ctx, req)
if err != nil {
zap.L().Debug("Initializing graceful exit failed.", zap.Stringer("Satellite ID", id), zap.Error(err))
errgroup.Add(err)
continue
}
progresses = append(progresses, resp)
}

if len(progresses) < 1 {
fmt.Println("Failed to initialize graceful exit. Please try again later.")
return errgroup.Err()
}

displayExitProgress(w, progresses)

err = w.Flush()
if err != nil {
return errs.Wrap(err)
}

return nil
return gracefulExitInit(ctx, satelliteIDs, w, client)
}

func cmdGracefulExitStatus(cmd *cobra.Command, args []string) (err error) {
Expand Down Expand Up @@ -224,3 +201,60 @@ func displayExitProgress(w io.Writer, progresses []*pb.ExitProgress) {
fmt.Fprintf(w, "%s\t%s\t%.2f%%\t%s\t%s\t\n", progress.GetDomainName(), progress.NodeId.String(), progress.GetPercentComplete(), isSuccessful, receipt)
}
}

func gracefulExitInit(ctx context.Context, satelliteIDs []storj.NodeID, w *tabwriter.Writer, client *gracefulExitClient) (err error) {
if len(satelliteIDs) < 1 {
fmt.Println("Invalid input. Please use valid satellite domian names.")
return errs.New("Invalid satellite domain names")
}

var satellites []unavailableSatellite
for i := 0; i < len(satelliteIDs); i++ {
response, err := client.gracefulExitFeasibility(ctx, satelliteIDs[i])
if err != nil {
return err
}
if !response.IsAllowed {
left := int(response.MonthsRequired) - date.MonthsCountSince(response.JoinedAt)
satellites = append(satellites, unavailableSatellite{id: satelliteIDs[i], monthsLeft: left})
}
}

if satellites != nil {
fmt.Println("You are not allowed to initiate graceful exit on satellite for next amount of months:")
for _, satellite := range satellites {
fmt.Fprintf(w, "%s\t%d\n", satellite.id.String(), satellite.monthsLeft)
}
return errs.New("You are not allowed to graceful exit on some of provided satellites")
}

// save satellites for graceful exit into the db
progresses := make([]*pb.ExitProgress, 0, len(satelliteIDs))
var errgroup errs.Group
for _, id := range satelliteIDs {
req := &pb.InitiateGracefulExitRequest{
NodeId: id,
}
resp, err := client.initGracefulExit(ctx, req)
if err != nil {
zap.L().Debug("Initializing graceful exit failed.", zap.Stringer("Satellite ID", id), zap.Error(err))
errgroup.Add(err)
continue
}
progresses = append(progresses, resp)
}

if len(progresses) < 1 {
fmt.Println("Failed to initialize graceful exit. Please try again later.")
return errgroup.Err()
}

displayExitProgress(w, progresses)

err = w.Flush()
if err != nil {
return errs.Wrap(err)
}

return nil
}
73 changes: 73 additions & 0 deletions cmd/storagenode/gracefulexit_test.go
@@ -0,0 +1,73 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information

package main

import (
"os"
"testing"
"text/tabwriter"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
)

func TestGracefulExitTooEarly(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
if index == 1 {
config.GracefulExit.NodeMinAgeInMonths = 0
} else {
config.GracefulExit.NodeMinAgeInMonths = 6
}
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
address := planet.StorageNodes[0].Server.PrivateAddr().String()

client, err := dialGracefulExitClient(ctx, address)
require.NoError(t, err)

response, err := client.gracefulExitFeasibility(ctx, planet.Satellites[0].ID())
require.NoError(t, err)
require.Equal(t, response.IsAllowed, false)

response2, err := client.gracefulExitFeasibility(ctx, planet.Satellites[1].ID())
require.NoError(t, err)
require.Equal(t, response2.IsAllowed, true)
})
}

func TestGracefulExitInit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
if index == 1 {
config.GracefulExit.NodeMinAgeInMonths = 0
} else {
config.GracefulExit.NodeMinAgeInMonths = 6
}
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
var satelliteIDs []storj.NodeID
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)

address := planet.StorageNodes[0].Server.PrivateAddr().String()
satelliteIDs = append(satelliteIDs, planet.Satellites[0].ID(), planet.Satellites[1].ID())

client, err := dialGracefulExitClient(ctx, address)
require.NoError(t, err)

err = gracefulExitInit(ctx, satelliteIDs, w, client)
require.Error(t, err)
})
}
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -39,7 +39,7 @@ require (
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32 // indirect
storj.io/common v0.0.0-20200601223809-7af8b7ee5d6c
storj.io/common v0.0.0-20200610120518-23c66ad8b617
storj.io/drpc v0.0.12
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b
storj.io/private v0.0.0-20200605221229-3236fe879ab3
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -639,6 +639,10 @@ storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0
storj.io/common v0.0.0-20200529121635-ef4a5bc8ec88/go.mod h1:6S6Ub92/BB+ofU7hbyPcm96b4Q1ayyN0HLog+3u+wGc=
storj.io/common v0.0.0-20200601223809-7af8b7ee5d6c h1:Qq+20xjNcJfGIbErbMV9ndsHH179LtA9m6LA4fFTrvM=
storj.io/common v0.0.0-20200601223809-7af8b7ee5d6c/go.mod h1:6S6Ub92/BB+ofU7hbyPcm96b4Q1ayyN0HLog+3u+wGc=
storj.io/common v0.0.0-20200608231620-70c5a4b319bb h1:J1mRpqv/WuVZKxjIQ1H4pK1bUZxF5KNT3F80evQzpj4=
storj.io/common v0.0.0-20200608231620-70c5a4b319bb/go.mod h1:6S6Ub92/BB+ofU7hbyPcm96b4Q1ayyN0HLog+3u+wGc=
storj.io/common v0.0.0-20200610120518-23c66ad8b617 h1:7Mo4JvcCvT8BQyGWTMmsI+BYRKCKrXEadXSfdVXifE8=
storj.io/common v0.0.0-20200610120518-23c66ad8b617/go.mod h1:6S6Ub92/BB+ofU7hbyPcm96b4Q1ayyN0HLog+3u+wGc=
storj.io/drpc v0.0.11 h1:6vLxfpSbwCLtqzAoXzXx/SxBqBtbzbmquXPqfcWKqfw=
storj.io/drpc v0.0.11 h1:6vLxfpSbwCLtqzAoXzXx/SxBqBtbzbmquXPqfcWKqfw=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
Expand Down
31 changes: 31 additions & 0 deletions satellite/gracefulexit/endpoint.go
Expand Up @@ -908,3 +908,34 @@ func (endpoint *Endpoint) getNodePiece(ctx context.Context, pointer *pb.Pointer,

return nodePiece, nil
}

// GracefulExitFeasibility returns node's joined at date, nodeMinAge and if graceful exit available.
func (endpoint *Endpoint) GracefulExitFeasibility(ctx context.Context, req *pb.GracefulExitFeasibilityRequest) (_ *pb.GracefulExitFeasibilityResponse, err error) {
defer mon.Task()(&ctx)(&err)

peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
}

endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", peer.ID))

var response pb.GracefulExitFeasibilityResponse

nodeDossier, err := endpoint.overlaydb.Get(ctx, peer.ID)
if err != nil {
endpoint.log.Error("unable to retrieve node dossier for attempted exiting node", zap.Stringer("node ID", peer.ID))
return nil, Error.Wrap(err)
}

eligibilityDate := nodeDossier.CreatedAt.AddDate(0, endpoint.config.NodeMinAgeInMonths, 0)
if time.Now().Before(eligibilityDate) {
response.IsAllowed = false
} else {
response.IsAllowed = true
}

response.JoinedAt = nodeDossier.CreatedAt
response.MonthsRequired = int32(endpoint.config.NodeMinAgeInMonths)
return &response, nil
}
30 changes: 29 additions & 1 deletion storagenode/gracefulexit/endpoint.go
Expand Up @@ -7,9 +7,11 @@ import (
"context"
"time"

"github.com/zeebo/errs"
"go.uber.org/zap"

"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/satellites"
Expand All @@ -22,15 +24,17 @@ type Endpoint struct {
usageCache *pieces.BlobsUsageCache
trust *trust.Pool
satellites satellites.DB
dialer rpc.Dialer
}

// NewEndpoint creates a new graceful exit endpoint.
func NewEndpoint(log *zap.Logger, trust *trust.Pool, satellites satellites.DB, usageCache *pieces.BlobsUsageCache) *Endpoint {
func NewEndpoint(log *zap.Logger, trust *trust.Pool, satellites satellites.DB, dialer rpc.Dialer, usageCache *pieces.BlobsUsageCache) *Endpoint {
return &Endpoint{
log: log,
usageCache: usageCache,
trust: trust,
satellites: satellites,
dialer: dialer,
}
}

Expand Down Expand Up @@ -155,3 +159,27 @@ func (e *Endpoint) GetExitProgress(ctx context.Context, req *pb.GetExitProgressR
}
return resp, nil
}

// GracefulExitFeasibility returns graceful exit feasibility by node's age on chosen satellite.
func (e *Endpoint) GracefulExitFeasibility(ctx context.Context, request *pb.GracefulExitFeasibilityNodeRequest) (*pb.GracefulExitFeasibilityResponse, error) {
nodeurl, err := e.trust.GetNodeURL(ctx, request.NodeId)
if err != nil {
return nil, errs.New("unable to find satellite %s: %w", request.NodeId, err)
}

conn, err := e.dialer.DialNodeURL(ctx, nodeurl)
if err != nil {
return nil, errs.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())
}()

client := pb.NewDRPCSatelliteGracefulExitClient(conn)

feasibility, err := client.GracefulExitFeasibility(ctx, &pb.GracefulExitFeasibilityRequest{})
if err != nil {
return nil, errs.Wrap(err)
}
return feasibility, nil
}
1 change: 1 addition & 0 deletions storagenode/peer.go
Expand Up @@ -635,6 +635,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Log.Named("gracefulexit:endpoint"),
peer.Storage2.Trust,
peer.DB.Satellites(),
peer.Dialer,
peer.Storage2.BlobsCache,
)
if err := pb.DRPCRegisterNodeGracefulExit(peer.Server.PrivateDRPC(), peer.GracefulExit.Endpoint); err != nil {
Expand Down

0 comments on commit e52809d

Please sign in to comment.