From b507274ed446847a59b950ba774411806adf0ccb Mon Sep 17 00:00:00 2001 From: Moby von Briesen Date: Wed, 20 Nov 2019 15:10:47 -0500 Subject: [PATCH] fix compilation issues --- satellite/gracefulexit/endpoint.go | 70 +++++++++++++------------- satellite/gracefulexit/pending.go | 25 +++++---- satellite/gracefulexit/pending_test.go | 33 ++++++------ satellite/gracefulexit/validation.go | 20 ++++---- 4 files changed, 79 insertions(+), 69 deletions(-) diff --git a/satellite/gracefulexit/endpoint.go b/satellite/gracefulexit/endpoint.go index d1bcf64d996c..fb6c0beb1905 100644 --- a/satellite/gracefulexit/endpoint.go +++ b/satellite/gracefulexit/endpoint.go @@ -183,7 +183,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { } // maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests - pending := newPendingMap() + pending := NewPendingMap() var group errgroup.Group group.Go(func() error { incompleteLoop := sync2.NewCycle(endpoint.interval) @@ -196,7 +196,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { ctx, cancel := context.WithCancel(ctx) return incompleteLoop.Run(ctx, func(ctx context.Context) error { - if pending.length() == 0 { + if pending.Length() == 0 { incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0) if err != nil { return handleError(err) @@ -224,7 +224,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { return handleError(err) } } - if pending.length() > 0 { + if pending.Length() > 0 { processCond.Broadcast() } } @@ -239,7 +239,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { default: } - pendingCount := pending.length() + pendingCount := pending.Length() processMu.Lock() // if there are no more transfers and the pending queue is empty, send complete @@ -276,7 +276,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { } // if pending count is still 0 and the loop has exited, return - if pending.length() == 0 && !loopRunningFlag { + if pending.Length() == 0 && !loopRunningFlag { processMu.Unlock() continue } @@ -361,7 +361,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { return nil } -func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processStream, pending *pendingMap, incomplete *TransferQueueItem) error { +func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processStream, pending *PendingMap, incomplete *TransferQueueItem) error { nodeID := incomplete.NodeID if incomplete.OrderLimitSendCount >= endpoint.config.MaxOrderLimitSendCount { @@ -476,23 +476,23 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS } // update pending queue with the transfer item - pending.put(pieceID, &pendingTransfer{ - path: incomplete.Path, - pieceSize: pieceSize, - satelliteMessage: transferMsg, - originalPointer: pointer, - pieceNum: incomplete.PieceNum, + pending.Put(pieceID, &PendingTransfer{ + Path: incomplete.Path, + PieceSize: pieceSize, + SatelliteMessage: transferMsg, + OriginalPointer: pointer, + PieceNum: incomplete.PieceNum, }) return nil } -func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStream, pending *pendingMap, exitingNodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) { +func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStream, pending *PendingMap, exitingNodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) { defer mon.Task()(&ctx)(&err) originalPieceID := message.Succeeded.OriginalPieceId - transfer, ok := pending.get(originalPieceID) + transfer, ok := pending.Get(originalPieceID) if !ok { endpoint.log.Error("Could not find transfer item in pending queue", zap.Stringer("Piece ID", originalPieceID)) return Error.New("Could not find transfer item in pending queue") @@ -503,7 +503,7 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr return Error.Wrap(err) } - receivingNodeID := transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId + receivingNodeID := transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId // get peerID and signee for new storage node peerID, err := endpoint.peerIdentities.Get(ctx, receivingNodeID) if err != nil { @@ -514,15 +514,15 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr if err != nil { return Error.Wrap(err) } - transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.path, transfer.pieceNum) + transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Path, transfer.PieceNum) if err != nil { return Error.Wrap(err) } - err = endpoint.updatePointer(ctx, transfer.originalPointer, exitingNodeID, receivingNodeID, string(transfer.path), transfer.pieceNum, transferQueueItem.RootPieceID) + err = endpoint.updatePointer(ctx, transfer.OriginalPointer, exitingNodeID, receivingNodeID, string(transfer.Path), transfer.PieceNum, transferQueueItem.RootPieceID) if err != nil { // remove the piece from the pending queue so it gets retried - pending.delete(originalPieceID) + pending.Delete(originalPieceID) return Error.Wrap(err) } @@ -532,17 +532,17 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr failed = -1 } - err = endpoint.db.IncrementProgress(ctx, exitingNodeID, transfer.pieceSize, 1, failed) + err = endpoint.db.IncrementProgress(ctx, exitingNodeID, transfer.PieceSize, 1, failed) if err != nil { return Error.Wrap(err) } - err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.path, transfer.pieceNum) + err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Path, transfer.PieceNum) if err != nil { return Error.Wrap(err) } - pending.delete(originalPieceID) + pending.Delete(originalPieceID) deleteMsg := &pb.SatelliteMessage{ Message: &pb.SatelliteMessage_DeletePiece{ @@ -561,19 +561,19 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr return nil } -func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) { +func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) { defer mon.Task()(&ctx)(&err) endpoint.log.Warn("transfer failed", zap.Stringer("Piece ID", message.Failed.OriginalPieceId), zap.Stringer("transfer error", message.Failed.GetError())) mon.Meter("graceful_exit_transfer_piece_fail").Mark(1) //locked pieceID := message.Failed.OriginalPieceId - transfer, ok := pending.get(pieceID) + transfer, ok := pending.Get(pieceID) if !ok { endpoint.log.Debug("could not find transfer message in pending queue. skipping.", zap.Stringer("Piece ID", pieceID)) // TODO we should probably error out here so we don't get stuck in a loop with a SN that is not behaving properl } - transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum) + transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum) if err != nil { return Error.Wrap(err) } @@ -589,38 +589,38 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, // Remove the queue item and remove the node from the pointer. // If the pointer is not piece hash verified, do not count this as a failure. if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND { - endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("path", transfer.path), zap.Int32("piece num", transfer.pieceNum)) - pointer, err := endpoint.metainfo.Get(ctx, string(transfer.path)) + endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("path", transfer.Path), zap.Int32("piece num", transfer.PieceNum)) + pointer, err := endpoint.metainfo.Get(ctx, string(transfer.Path)) if err != nil { return Error.Wrap(err) } remote := pointer.GetRemote() if remote == nil { - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum) if err != nil { return Error.Wrap(err) } - pending.delete(pieceID) + pending.Delete(pieceID) return nil } pieces := remote.GetRemotePieces() var nodePiece *pb.RemotePiece for _, piece := range pieces { - if piece.NodeId == nodeID && piece.PieceNum == transfer.pieceNum { + if piece.NodeId == nodeID && piece.PieceNum == transfer.PieceNum { nodePiece = piece } } if nodePiece == nil { - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum) if err != nil { return Error.Wrap(err) } - pending.delete(pieceID) + pending.Delete(pieceID) return nil } - _, err = endpoint.metainfo.UpdatePieces(ctx, string(transfer.path), pointer, nil, []*pb.RemotePiece{nodePiece}) + _, err = endpoint.metainfo.UpdatePieces(ctx, string(transfer.Path), pointer, nil, []*pb.RemotePiece{nodePiece}) if err != nil { return Error.Wrap(err) } @@ -634,11 +634,11 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, } } - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum) if err != nil { return Error.Wrap(err) } - pending.delete(pieceID) + pending.Delete(pieceID) return nil } @@ -659,7 +659,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, } } - pending.delete(pieceID) + pending.Delete(pieceID) return nil } diff --git a/satellite/gracefulexit/pending.go b/satellite/gracefulexit/pending.go index 540381e397ba..ce2220178592 100644 --- a/satellite/gracefulexit/pending.go +++ b/satellite/gracefulexit/pending.go @@ -12,11 +12,11 @@ import ( ) type PendingTransfer struct { - path []byte - pieceSize int64 - satelliteMessage *pb.SatelliteMessage - originalPointer *pb.Pointer - pieceNum int32 + Path []byte + PieceSize int64 + SatelliteMessage *pb.SatelliteMessage + OriginalPointer *pb.Pointer + PieceNum int32 } // PendingMap for managing concurrent access to the pending transfer map. @@ -34,11 +34,16 @@ func NewPendingMap() *PendingMap { } // put adds to the map. -func (pm *PendingMap) Put(pieceID storj.PieceID, PendingTransfer *PendingTransfer) { +func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error { pm.mu.Lock() defer pm.mu.Unlock() - pm.data[pieceID] = PendingTransfer + if _, ok := pm.data[pieceID]; ok { + return Error.New("piece ID already exists in pending map") + } + + pm.data[pieceID] = pendingTransfer + return nil } // get returns the pending transfer item from the map, if it exists. @@ -71,17 +76,17 @@ func (pm *PendingMap) Delete(pieceID storj.PieceID) error { } // IsFinished determines whether the work is finished, and blocks if needed. -func (pm *PendingMap) IsFinished(ctx context.Context) (bool, error) { +func (pm *PendingMap) IsFinished(ctx context.Context) bool { pm.mu.Lock() defer pm.mu.Unlock() if len(pm.data) > 0 { - return false, nil + return false } // concurrently wait for finish or more work // if finish happens first, return true. Otherwise return false. - return false, err + return false } // Finish is called when no more work will be added to the map. diff --git a/satellite/gracefulexit/pending_test.go b/satellite/gracefulexit/pending_test.go index b7636e0b283c..7138d5f051a4 100644 --- a/satellite/gracefulexit/pending_test.go +++ b/satellite/gracefulexit/pending_test.go @@ -4,6 +4,7 @@ package gracefulexit_test import ( + "bytes" "context" "testing" "time" @@ -22,12 +23,12 @@ func TestPendingBasic(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - newWork := gracefulexit.PendingTransfer{ - path: []byte("testbucket/testfile"), - pieceSize: 10, - satelliteMessage: &pb.SatelliteMessage{}, - originalPointer: &pb.Pointer{}, - pieceNum: 1, + newWork := &gracefulexit.PendingTransfer{ + Path: []byte("testbucket/testfile"), + PieceSize: 10, + SatelliteMessage: &pb.SatelliteMessage{}, + OriginalPointer: &pb.Pointer{}, + PieceNum: 1, } pieceID := testrand.PieceID() @@ -39,13 +40,13 @@ func TestPendingBasic(t *testing.T) { require.NoError(t, err) // put should return an error if the item already exists - err := pending.Put(pieceID, newWork) + err = pending.Put(pieceID, newWork) require.Error(t, err) // get should work w, ok := pending.Get(pieceID) require.True(t, ok) - require.Equal(newWork.path, w.path) + require.True(t, bytes.Equal(newWork.Path, w.Path)) invalidPieceID := testrand.PieceID() _, ok = pending.Get(invalidPieceID) @@ -91,12 +92,12 @@ func TestPendingIsFinishedWorkAdded(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - newWork := gracefulexit.PendingTransfer{ - path: []byte("testbucket/testfile"), - pieceSize: 10, - satelliteMessage: &pb.SatelliteMessage{}, - originalPointer: &pb.Pointer{}, - pieceNum: 1, + newWork := &gracefulexit.PendingTransfer{ + Path: []byte("testbucket/testfile"), + PieceSize: 10, + SatelliteMessage: &pb.SatelliteMessage{}, + OriginalPointer: &pb.Pointer{}, + PieceNum: 1, } pieceID := testrand.PieceID() pending := gracefulexit.NewPendingMap() @@ -135,6 +136,8 @@ func TestPendingIsFinishedFinishedCalled(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() + pending := gracefulexit.NewPendingMap() + fence := sync2.Fence{} var group errgroup.Group group.Go(func() error { @@ -161,6 +164,8 @@ func TestPendingIsFinishedCtxCanceled(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() + pending := gracefulexit.NewPendingMap() + ctx2, cancel := context.WithCancel(ctx) fence := sync2.Fence{} var group errgroup.Group diff --git a/satellite/gracefulexit/validation.go b/satellite/gracefulexit/validation.go index 217408e7e130..e2583d0ff0cf 100644 --- a/satellite/gracefulexit/validation.go +++ b/satellite/gracefulexit/validation.go @@ -12,30 +12,30 @@ import ( "storj.io/storj/pkg/signing" ) -func (endpoint *Endpoint) validatePendingTransfer(ctx context.Context, transfer *pendingTransfer) error { - if transfer.satelliteMessage == nil { +func (endpoint *Endpoint) validatePendingTransfer(ctx context.Context, transfer *PendingTransfer) error { + if transfer.SatelliteMessage == nil { return Error.New("Satellite message cannot be nil") } - if transfer.satelliteMessage.GetTransferPiece() == nil { + if transfer.SatelliteMessage.GetTransferPiece() == nil { return Error.New("Satellite message transfer piece cannot be nil") } - if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit() == nil { + if transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit() == nil { return Error.New("Addressed order limit on transfer piece cannot be nil") } - if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil { + if transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil { return Error.New("Addressed order limit on transfer piece cannot be nil") } - if transfer.path == nil { + if transfer.Path == nil { return Error.New("Transfer path cannot be nil") } - if transfer.originalPointer == nil || transfer.originalPointer.GetRemote() == nil { + if transfer.OriginalPointer == nil || transfer.OriginalPointer.GetRemote() == nil { return Error.New("could not get remote pointer from transfer item") } return nil } -func (endpoint *Endpoint) verifyPieceTransferred(ctx context.Context, message *pb.StorageNodeMessage_Succeeded, transfer *pendingTransfer, receivingNodePeerID *identity.PeerIdentity) error { +func (endpoint *Endpoint) verifyPieceTransferred(ctx context.Context, message *pb.StorageNodeMessage_Succeeded, transfer *PendingTransfer, receivingNodePeerID *identity.PeerIdentity) error { originalOrderLimit := message.Succeeded.GetOriginalOrderLimit() if originalOrderLimit == nil { return ErrInvalidArgument.New("Original order limit cannot be nil") @@ -70,8 +70,8 @@ func (endpoint *Endpoint) verifyPieceTransferred(ctx context.Context, message *p return ErrInvalidArgument.New("Invalid original piece ID") } - receivingNodeID := transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId - calculatedNewPieceID := transfer.originalPointer.GetRemote().RootPieceId.Derive(receivingNodeID, transfer.pieceNum) + receivingNodeID := transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId + calculatedNewPieceID := transfer.OriginalPointer.GetRemote().RootPieceId.Derive(receivingNodeID, transfer.PieceNum) if calculatedNewPieceID != replacementPieceHash.PieceId { return ErrInvalidArgument.New("Invalid replacement piece ID") }