storagenode/gracefulexit: Implement storage node graceful exit worker - part 1 #3322
Conversation
satellite/gracefulexit/endpoint.go
Outdated
@@ -182,6 +182,12 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
	pending := newPendingMap()

	var morePiecesFlag int32 = 1
	var errorFlag int32 = 0
	handleError := func(err error) error {
		atomic.StoreInt32(&errorFlag, 1)

I guess I'm curious what the benefit is of using the atomic store vs. just returning the wrapped error?

I added this so I didn't have to call group.Wait() in the for loop to know if an error occurred. But there's probably a better way that I'm just not familiar with in Go.

Maybe we could have an error channel? I'm not totally sure what would be better either. Maybe this is fine, but I just don't think I'd seen it used before.

I agree with having an error channel.

Updated to use channel.
require.NoError(t, err)

err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite1.ID(), time.Now(), 10000)
require.NoError(t, err)

// check that theh storage node is exiting

nit - "theh" should be "the"

Updated
storagenode/gracefulexit/worker.go
Outdated
if errs.Is(err, os.ErrNotExist) {
	transferErr = pb.TransferFailed_NOT_FOUND
}
worker.log.Error("failed to get piece reader.", zap.String("satellite ID", satelliteID.String()), zap.String("piece ID", pieceID.String()), zap.Error(errs.Wrap(err)))

nit - I think use zap.Stringer instead of zap.String - then you can just pass in satelliteID instead of satelliteID.String()

Updated
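The nit above works because the ID types implement fmt.Stringer, which is all zap.Stringer requires; zap then defers the String() call until the entry is actually encoded. A stdlib-only sketch of the mechanism, with nodeID and logField as hypothetical stand-ins for storj.NodeID and the zap field constructor:

```go
package main

import "fmt"

// nodeID is a stand-in for storj.NodeID; the real type also implements
// fmt.Stringer, which is why zap.Stringer accepts it directly.
type nodeID [4]byte

func (id nodeID) String() string { return fmt.Sprintf("%x", id[:]) }

// logField mimics the contract zap.Stringer relies on: any value with a
// String() method can be passed as-is, no explicit .String() at the call site.
func logField(key string, val fmt.Stringer) string {
	return key + "=" + val.String()
}

func main() {
	id := nodeID{0xde, 0xad, 0xbe, 0xef}
	fmt.Println(logField("satellite ID", id)) // satellite ID=deadbeef
}
```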
storagenode/gracefulexit/worker.go
Outdated
}

// verifyPieceHash verifies whether the piece hash matches the locally computed hash.
func verifyPieceHash(ctx context.Context, limit *pb.OrderLimit, hash *pb.PieceHash, expectedHash []byte) (err error) {

Is there a reason for using this instead of the piecestore.Endpoint's exported VerifyPieceHash method?

piecestore.Endpoint.VerifyPieceHash also checks signatures. As far as I know, we don't have the information to check the signature. I could be wrong.

Added signature verification.
deleteMsg := &pb.SatelliteMessage{
	Message: &pb.SatelliteMessage_DeletePiece{
		DeletePiece: &pb.DeletePiece{

Do we need to send the satellite signature with the delete message?

I don't think so.

Could a malicious node send a delete message, though? Maybe it doesn't work that way, but just checking.

The exiting node connects to the satellite and receives the delete message. There is no way (that I know of) for another storage node to connect to a node and send the delete message.
storagenode/gracefulexit/worker.go
Outdated
pieceID := msg.DeletePiece.OriginalPieceId
err := worker.store.Delete(ctx, satelliteID, pieceID)
if err != nil {
	worker.log.Error("failed to delete piece.", zap.Stringer("satellite ID", satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))

Should we break here to wait for the next worker execution?

I was thinking we'd just move on to the next message, but maybe I'm missing something. In addition, the storage node should purge any remaining pieces at the end of the overall graceful exit process anyway.
@@ -743,7 +743,7 @@ func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, buc
	UplinkPublicKey: piecePublicKey,
	StorageNodeId: nodeID,
	PieceId: rootPieceID.Derive(nodeID, pieceNum),
-	Action: pb.PieceAction_PUT_GRACEFUL_EXIT,
+	Action: pb.PieceAction_PUT,

Why are you removing PUT_GRACEFUL_EXIT? I think it could be useful to keep it down the line so we can distinguish normal uploads from graceful exit uploads.

Will address in another PR.

@mobyvb why would that be useful?

Also, there are multiple places that need updating to make PieceAction_PUT_GRACEFUL_EXIT work.

@egonelbre I was thinking it could help to analyze problems that SNOs may notice, but it's not essential.
}
break
default:
	// TODO handle err

Should we return a custom error in this case? This should never happen.
exitingNode.GracefulExit.Chore.Loop.TriggerWait()

exitProgress, err = exitingNode.DB.Satellites().ListGracefulExits(ctx)
require.NoError(t, err)
for _, progress := range exitProgress {
	if progress.SatelliteID == satellite1.ID() {
		require.NotNil(t, progress.FinishedAt)
	}
}
})

Maybe we should also check to ensure that the piece that needed to be transferred has shown up on one of the remaining 8 nodes.

We can iterate over metainfo before the transfer(s), then keep track of all the paths/piece numbers associated with the exiting node. Then, after the transfers, we can check metainfo for the same segments and expect that the same piece numbers are still in the pointers but are associated with different nodes.

Updated
return errs.Wrap(err)
}

func (worker *Worker) handleFailure(ctx context.Context, transferError pb.TransferFailed_Error, pieceID pb.PieceID, satelliteID storj.NodeID, send func(*pb.StorageNodeMessage) error) {
	failure := &pb.StorageNodeMessage{

Maybe we don't need the satelliteID argument here since it's on the worker struct.

Updated
nodePieceCounts := make(map[storj.NodeID]int)
for _, n := range planet.StorageNodes {
	node := n
	// make sure there are no more pieces on the node.

The comment here is a bit misleading.

Agreed. Copy/paste error. Updated
storagenode/gracefulexit/chore.go
Outdated
@@ -70,16 +79,16 @@ func (chore *Chore) Run(ctx context.Context) (err error) {

for _, satellite := range satellites {
	satelliteID := satellite.SatelliteID
-	worker := NewWorker(chore.log, chore.satelliteDB, satelliteID)
+	worker := NewWorker(chore.log, chore.store, chore.satelliteDB, chore.trust, chore.dialer, satelliteID)

Just curious: what's the reason for passing the trust down to the worker instead of getting the address here and passing the address down to the worker?

Yeah, I agree. Only the address is necessary. Updated
} else {
	worker.log.Error("failed to put piece.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
	// TODO look at error type to decide on the transfer error
	worker.handleFailure(ctx, pb.TransferFailed_STORAGE_NODE_UNAVAILABLE, pieceID, c.Send)

Why do we use TransferFailed_STORAGE_NODE_UNAVAILABLE instead of UNKNOWN?
errChan := make(chan error, 1)
handleError := func(err error) error {
	errChan <- err
	close(errChan)

Something about this immediate closing of the err channel after sending the error to it is not sitting right with me, but I'm not sure why. Just going to bookmark this for now. Would the group.Wait() still be called? If not, would that mean the goroutines could potentially be left running?

There is only 1 sender and 1 receiver for the channel. I think it's safe to close it in the sender code. The group wait gets checked on error and again before the method exits.
func getNodePieceCounts(ctx context.Context, planet *testplanet.Planet) (_ map[storj.NodeID]int, err error) {
	nodePieceCounts := make(map[storj.NodeID]int)
	for _, n := range planet.StorageNodes {
		node := n

Why do you have to do this reassignment?

The linter didn't like the former:

storagenode/gracefulexit/chore_test.go:149:21: Using the variable on range scope `node` in function literal (scopelint)
	nodePieceCounts[node.ID()]++
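The scopelint warning exists because, before Go 1.22, every closure in a loop body captured the same shared range variable; reassigning to a per-iteration local like `node := n` gives each closure its own copy. A standalone illustration (collect and the string IDs are hypothetical, unrelated to the test above):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect launches one goroutine per ID and gathers the ID each goroutine
// observed. Before Go 1.22 the `node := n` reassignment was required so each
// closure captured its own copy rather than the shared range variable.
func collect(ids []string) []string {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		seen []string
	)
	for _, n := range ids {
		node := n // per-iteration copy; this is what silences scopelint
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, node)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Strings(seen)
	return seen
}

func main() {
	fmt.Println(collect([]string{"node-a", "node-b", "node-c"})) // [node-a node-b node-c]
}
```

As of Go 1.22 the range variable is per-iteration anyway, so the reassignment is no longer strictly necessary, but it remains harmless and was required when this PR was written.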
storagenode/gracefulexit/worker.go
Outdated
// https://storjlabs.atlassian.net/browse/V3-2613
addr, err := worker.trust.GetAddress(ctx, worker.satelliteID)
if err != nil {
	return errs.Wrap(err)

nit - wondering if we should have a worker-specific error class and use that to wrap instead of the general errs.Wrap.
continue
}

putCtx, cancel := context.WithCancel(ctx)

^ I think this is a good idea.

Looks good. We can tie up remaining loose ends in part 2 once everything is hooked up.

LGTM
What:
This worker is responsible for initiating graceful exit with the satellite and then processing piece transfer requests.
https://storjlabs.atlassian.net/browse/V3-2613
We are splitting this into 2 parts so that we can begin end-to-end testing of all components.
Part 2 will implement concurrent processing of the transfers and better error handling.
Why:
This is needed to transfer the pieces of an exiting node to a replacement node.
Please describe the tests:
Please describe the performance impact:
Code Review Checklist (to be filled out by reviewer)