
Commit

fix compilation issues
mobyvb committed Nov 20, 2019
1 parent 1f97e86 commit b507274
Showing 4 changed files with 79 additions and 69 deletions.
70 changes: 35 additions & 35 deletions satellite/gracefulexit/endpoint.go
@@ -183,7 +183,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}

// maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests
pending := newPendingMap()
pending := NewPendingMap()
var group errgroup.Group
group.Go(func() error {
incompleteLoop := sync2.NewCycle(endpoint.interval)
@@ -196,7 +196,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {

ctx, cancel := context.WithCancel(ctx)
return incompleteLoop.Run(ctx, func(ctx context.Context) error {
if pending.length() == 0 {
if pending.Length() == 0 {
incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0)
if err != nil {
return handleError(err)
@@ -224,7 +224,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return handleError(err)
}
}
if pending.length() > 0 {
if pending.Length() > 0 {
processCond.Broadcast()
}
}
@@ -239,7 +239,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
default:
}

pendingCount := pending.length()
pendingCount := pending.Length()

processMu.Lock()
// if there are no more transfers and the pending queue is empty, send complete
@@ -276,7 +276,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}

// if pending count is still 0 and the loop has exited, return
if pending.length() == 0 && !loopRunningFlag {
if pending.Length() == 0 && !loopRunningFlag {
processMu.Unlock()
continue
}
@@ -361,7 +361,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return nil
}

func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processStream, pending *pendingMap, incomplete *TransferQueueItem) error {
func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processStream, pending *PendingMap, incomplete *TransferQueueItem) error {
nodeID := incomplete.NodeID

if incomplete.OrderLimitSendCount >= endpoint.config.MaxOrderLimitSendCount {
@@ -476,23 +476,23 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
}

// update pending queue with the transfer item
pending.put(pieceID, &pendingTransfer{
path: incomplete.Path,
pieceSize: pieceSize,
satelliteMessage: transferMsg,
originalPointer: pointer,
pieceNum: incomplete.PieceNum,
pending.Put(pieceID, &PendingTransfer{
Path: incomplete.Path,
PieceSize: pieceSize,
SatelliteMessage: transferMsg,
OriginalPointer: pointer,
PieceNum: incomplete.PieceNum,
})

return nil
}

func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStream, pending *pendingMap, exitingNodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) {
func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStream, pending *PendingMap, exitingNodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) {
defer mon.Task()(&ctx)(&err)

originalPieceID := message.Succeeded.OriginalPieceId

transfer, ok := pending.get(originalPieceID)
transfer, ok := pending.Get(originalPieceID)
if !ok {
endpoint.log.Error("Could not find transfer item in pending queue", zap.Stringer("Piece ID", originalPieceID))
return Error.New("Could not find transfer item in pending queue")
@@ -503,7 +503,7 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
return Error.Wrap(err)
}

receivingNodeID := transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId
receivingNodeID := transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId
// get peerID and signee for new storage node
peerID, err := endpoint.peerIdentities.Get(ctx, receivingNodeID)
if err != nil {
@@ -514,15 +514,15 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
if err != nil {
return Error.Wrap(err)
}
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.path, transfer.pieceNum)
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Path, transfer.PieceNum)
if err != nil {
return Error.Wrap(err)
}

err = endpoint.updatePointer(ctx, transfer.originalPointer, exitingNodeID, receivingNodeID, string(transfer.path), transfer.pieceNum, transferQueueItem.RootPieceID)
err = endpoint.updatePointer(ctx, transfer.OriginalPointer, exitingNodeID, receivingNodeID, string(transfer.Path), transfer.PieceNum, transferQueueItem.RootPieceID)
if err != nil {
// remove the piece from the pending queue so it gets retried
pending.delete(originalPieceID)
pending.Delete(originalPieceID)

return Error.Wrap(err)
}
@@ -532,17 +532,17 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
failed = -1
}

err = endpoint.db.IncrementProgress(ctx, exitingNodeID, transfer.pieceSize, 1, failed)
err = endpoint.db.IncrementProgress(ctx, exitingNodeID, transfer.PieceSize, 1, failed)
if err != nil {
return Error.Wrap(err)
}

err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.path, transfer.pieceNum)
err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Path, transfer.PieceNum)
if err != nil {
return Error.Wrap(err)
}

pending.delete(originalPieceID)
pending.Delete(originalPieceID)

deleteMsg := &pb.SatelliteMessage{
Message: &pb.SatelliteMessage_DeletePiece{
@@ -561,19 +561,19 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
return nil
}

func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) {
func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) {
defer mon.Task()(&ctx)(&err)
endpoint.log.Warn("transfer failed", zap.Stringer("Piece ID", message.Failed.OriginalPieceId), zap.Stringer("transfer error", message.Failed.GetError()))
mon.Meter("graceful_exit_transfer_piece_fail").Mark(1) //locked

pieceID := message.Failed.OriginalPieceId
transfer, ok := pending.get(pieceID)
transfer, ok := pending.Get(pieceID)
if !ok {
endpoint.log.Debug("could not find transfer message in pending queue. skipping.", zap.Stringer("Piece ID", pieceID))

// TODO we should probably error out here so we don't get stuck in a loop with a SN that is not behaving properly
}
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum)
if err != nil {
return Error.Wrap(err)
}
@@ -589,38 +589,38 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
// Remove the queue item and remove the node from the pointer.
// If the pointer is not piece hash verified, do not count this as a failure.
if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND {
endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("path", transfer.path), zap.Int32("piece num", transfer.pieceNum))
pointer, err := endpoint.metainfo.Get(ctx, string(transfer.path))
endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("path", transfer.Path), zap.Int32("piece num", transfer.PieceNum))
pointer, err := endpoint.metainfo.Get(ctx, string(transfer.Path))
if err != nil {
return Error.Wrap(err)
}
remote := pointer.GetRemote()
if remote == nil {
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum)
if err != nil {
return Error.Wrap(err)
}
pending.delete(pieceID)
pending.Delete(pieceID)
return nil
}
pieces := remote.GetRemotePieces()

var nodePiece *pb.RemotePiece
for _, piece := range pieces {
if piece.NodeId == nodeID && piece.PieceNum == transfer.pieceNum {
if piece.NodeId == nodeID && piece.PieceNum == transfer.PieceNum {
nodePiece = piece
}
}
if nodePiece == nil {
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum)
if err != nil {
return Error.Wrap(err)
}
pending.delete(pieceID)
pending.Delete(pieceID)
return nil
}

_, err = endpoint.metainfo.UpdatePieces(ctx, string(transfer.path), pointer, nil, []*pb.RemotePiece{nodePiece})
_, err = endpoint.metainfo.UpdatePieces(ctx, string(transfer.Path), pointer, nil, []*pb.RemotePiece{nodePiece})
if err != nil {
return Error.Wrap(err)
}
@@ -634,11 +634,11 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
}
}

err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Path, transfer.PieceNum)
if err != nil {
return Error.Wrap(err)
}
pending.delete(pieceID)
pending.Delete(pieceID)

return nil
}
@@ -659,7 +659,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
}
}

pending.delete(pieceID)
pending.Delete(pieceID)

return nil
}
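The endpoint.go changes above are mechanical renames: the unexported pendingMap/pendingTransfer type, fields, and methods are replaced by the exported PendingMap/PendingTransfer API defined in satellite/gracefulexit/pending.go (next file). A minimal caller sketch follows, using only identifiers visible in this diff; the import paths, the zero-value piece ID, and the nil message/pointer fields are assumptions for illustration, not code from this commit.

package main

import (
	"fmt"

	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite/gracefulexit"
)

func main() {
	// Exported constructor, as called in doProcess above.
	pending := gracefulexit.NewPendingMap()

	var pieceID storj.PieceID // zero-value ID, for illustration only

	// Put now returns an error (a duplicate piece ID is rejected) instead of
	// silently overwriting; endpoint.go above still ignores this return value.
	err := pending.Put(pieceID, &gracefulexit.PendingTransfer{
		Path:      []byte("testbucket/testfile"),
		PieceSize: 10,
		PieceNum:  1,
		// SatelliteMessage and OriginalPointer left nil for this sketch.
	})
	if err != nil {
		fmt.Println("put failed:", err)
	}

	// Get, Length, and Delete are the exported counterparts of the old
	// get/length/delete used throughout handleSucceeded and handleFailed.
	if transfer, ok := pending.Get(pieceID); ok {
		fmt.Println("pending:", string(transfer.Path), transfer.PieceSize, transfer.PieceNum)
	}
	fmt.Println("queue length:", pending.Length())

	if err := pending.Delete(pieceID); err != nil {
		fmt.Println("delete failed:", err)
	}
}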
25 changes: 15 additions & 10 deletions satellite/gracefulexit/pending.go
@@ -12,11 +12,11 @@ import (
)

type PendingTransfer struct {
path []byte
pieceSize int64
satelliteMessage *pb.SatelliteMessage
originalPointer *pb.Pointer
pieceNum int32
Path []byte
PieceSize int64
SatelliteMessage *pb.SatelliteMessage
OriginalPointer *pb.Pointer
PieceNum int32
}

// PendingMap for managing concurrent access to the pending transfer map.
@@ -34,11 +34,16 @@ func NewPendingMap() *PendingMap {
}

// put adds to the map.
func (pm *PendingMap) Put(pieceID storj.PieceID, PendingTransfer *PendingTransfer) {
func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error {
pm.mu.Lock()
defer pm.mu.Unlock()

pm.data[pieceID] = PendingTransfer
if _, ok := pm.data[pieceID]; ok {
return Error.New("piece ID already exists in pending map")
}

pm.data[pieceID] = pendingTransfer
return nil
}

// get returns the pending transfer item from the map, if it exists.
@@ -71,17 +76,17 @@ func (pm *PendingMap) Delete(pieceID storj.PieceID) error {
}

// IsFinished determines whether the work is finished, and blocks if needed.
func (pm *PendingMap) IsFinished(ctx context.Context) (bool, error) {
func (pm *PendingMap) IsFinished(ctx context.Context) bool {
pm.mu.Lock()
defer pm.mu.Unlock()

if len(pm.data) > 0 {
return false, nil
return false
}

// concurrently wait for finish or more work
// if finish happens first, return true. Otherwise return false.
return false, err
return false
}

// Finish is called when no more work will be added to the map.
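The IsFinished hunk above removes the dangling "return false, err" that broke compilation and narrows the signature to a plain bool, but the blocking wait described by its comment is not visible in this hunk. Below is a self-contained sketch of those semantics built on sync.Cond; it is one possible shape under stated assumptions, not the actual PendingMap implementation.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pendingSet is a stand-in for PendingMap, illustrating the behavior the
// comments above describe: IsFinished blocks until more work arrives
// (false) or Finish has been called with nothing pending (true).
type pendingSet struct {
	mu       sync.Mutex
	cond     *sync.Cond
	data     map[string]struct{}
	finished bool
}

func newPendingSet() *pendingSet {
	p := &pendingSet{data: map[string]struct{}{}}
	p.cond = sync.NewCond(&p.mu)
	return p
}

func (p *pendingSet) Put(id string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.data[id] = struct{}{}
	p.cond.Broadcast() // more work arrived: wake any IsFinished waiter
}

// Finish declares that no more work will ever be added.
func (p *pendingSet) Finish() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.finished = true
	p.cond.Broadcast()
}

// IsFinished blocks while the set is empty but not yet finished. Context
// cancellation is treated like Finish so callers stop waiting.
func (p *pendingSet) IsFinished(ctx context.Context) bool {
	// Wake the waiter if the context is canceled while it is blocked.
	stop := context.AfterFunc(ctx, func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		p.cond.Broadcast()
	})
	defer stop()

	p.mu.Lock()
	defer p.mu.Unlock()
	for len(p.data) == 0 && !p.finished && ctx.Err() == nil {
		p.cond.Wait()
	}
	return len(p.data) == 0
}

func main() {
	p := newPendingSet()
	go func() {
		time.Sleep(100 * time.Millisecond)
		p.Finish()
	}()
	// Blocks until Finish is called above, then reports true.
	fmt.Println("finished:", p.IsFinished(context.Background()))
}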
33 changes: 19 additions & 14 deletions satellite/gracefulexit/pending_test.go
@@ -4,6 +4,7 @@
package gracefulexit_test

import (
"bytes"
"context"
"testing"
"time"
@@ -22,12 +23,12 @@ func TestPendingBasic(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

newWork := gracefulexit.PendingTransfer{
path: []byte("testbucket/testfile"),
pieceSize: 10,
satelliteMessage: &pb.SatelliteMessage{},
originalPointer: &pb.Pointer{},
pieceNum: 1,
newWork := &gracefulexit.PendingTransfer{
Path: []byte("testbucket/testfile"),
PieceSize: 10,
SatelliteMessage: &pb.SatelliteMessage{},
OriginalPointer: &pb.Pointer{},
PieceNum: 1,
}

pieceID := testrand.PieceID()
@@ -39,13 +40,13 @@ func TestPendingBasic(t *testing.T) {
require.NoError(t, err)

// put should return an error if the item already exists
err := pending.Put(pieceID, newWork)
err = pending.Put(pieceID, newWork)
require.Error(t, err)

// get should work
w, ok := pending.Get(pieceID)
require.True(t, ok)
require.Equal(newWork.path, w.path)
require.True(t, bytes.Equal(newWork.Path, w.Path))

invalidPieceID := testrand.PieceID()
_, ok = pending.Get(invalidPieceID)
@@ -91,12 +92,12 @@ func TestPendingIsFinishedWorkAdded(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

newWork := gracefulexit.PendingTransfer{
path: []byte("testbucket/testfile"),
pieceSize: 10,
satelliteMessage: &pb.SatelliteMessage{},
originalPointer: &pb.Pointer{},
pieceNum: 1,
newWork := &gracefulexit.PendingTransfer{
Path: []byte("testbucket/testfile"),
PieceSize: 10,
SatelliteMessage: &pb.SatelliteMessage{},
OriginalPointer: &pb.Pointer{},
PieceNum: 1,
}
pieceID := testrand.PieceID()
pending := gracefulexit.NewPendingMap()
@@ -135,6 +136,8 @@ func TestPendingIsFinishedFinishedCalled(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

pending := gracefulexit.NewPendingMap()

fence := sync2.Fence{}
var group errgroup.Group
group.Go(func() error {
@@ -161,6 +164,8 @@ func TestPendingIsFinishedCtxCanceled(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

pending := gracefulexit.NewPendingMap()

ctx2, cancel := context.WithCancel(ctx)
fence := sync2.Fence{}
var group errgroup.Group
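The new IsFinished tests above share one pattern: a goroutine blocks in the call under test (coordinated with errgroup and storj's sync2.Fence, per the visible setup), the test triggers the finish or cancel event, and the goroutine verifies it unblocked only afterwards. The sketch below reproduces that pattern with the standard library plus errgroup only; waitUntilClosed is a hypothetical stand-in for PendingMap.IsFinished, and a plain channel replaces sync2.Fence so nothing here depends on APIs not shown in this diff.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

// waitUntilClosed stands in for the blocking call under test: it returns
// true once done is closed, or false if the context is canceled first.
func waitUntilClosed(ctx context.Context, done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	done := make(chan struct{})
	var eventFired atomic.Bool

	var group errgroup.Group
	group.Go(func() error {
		// This call should only return after close(done) below.
		finished := waitUntilClosed(context.Background(), done)
		if !eventFired.Load() {
			return fmt.Errorf("unblocked before the event fired")
		}
		fmt.Println("observed finish:", finished)
		return nil
	})

	time.Sleep(100 * time.Millisecond) // give the goroutine time to block
	eventFired.Store(true)
	close(done)

	if err := group.Wait(); err != nil {
		fmt.Println("pattern violated:", err)
	}
}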
