storagenode: decline uploads when there are too many live requests #2397

Merged: 8 commits, Jul 3, 2019
2 changes: 2 additions & 0 deletions internal/testplanet/storagenode.go
@@ -109,6 +109,8 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatelliteIDs []strin
StaticDir: filepath.Join(developmentRoot, "web/operator/"),
},
Storage2: piecestore.Config{
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
Member Author:

Leaving testplanet at 100, because having this lower will probably break some tests.

Contributor:

Worth testing where the limit is, in another PR of course.
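
Until then, an individual test can already lower the limit through testplanet's Reconfigure hook; the new TestTooManyRequests below does exactly this (excerpt, with the constants inlined):

planet, err := testplanet.NewCustom(log, testplanet.Config{
	SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 6,
	Reconfigure: testplanet.Reconfigure{
		// Override the storage node default so the limit is hit quickly.
		StorageNode: func(index int, config *storagenode.Config) {
			config.Storage2.MaxConcurrentRequests = 3
		},
	},
})
require.NoError(t, err)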

OrderLimitGracePeriod: time.Hour * 24,
Sender: orders.SenderConfig{
Interval: time.Hour,
22 changes: 22 additions & 0 deletions storagenode/piecestore/endpoint.go
@@ -7,6 +7,7 @@ import (
"context"
"io"
"os"
"sync/atomic"
"time"

"github.com/golang/protobuf/ptypes"
@@ -55,6 +56,7 @@ type OldConfig struct {
// Config defines parameters for piecestore endpoint.
type Config struct {
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." default:"6"`
Contributor:

Maybe we want to define a devDefault and a releaseDefault, to prevent storj-sim slowdowns?
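
A hedged sketch of that suggestion, assuming the devDefault/releaseDefault struct tags understood by the project's cfgstruct config loader (not part of this diff): a generous dev default keeps storj-sim responsive while release builds stay conservative.

// Hypothetical variant of the field above; the tag values are illustrative only.
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." devDefault:"100" releaseDefault:"6"`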

OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`

Monitor monitor.Config
@@ -75,6 +77,8 @@ type Endpoint struct {
orders orders.DB
usage bandwidth.DB
usedSerials UsedSerials

liveRequests int32
Member:

We should make this one of the first struct fields, so ARM alignment is better guaranteed (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).

Member:

Wasn't this an issue for int64 only?

Member:

Oh yeah, good point; maybe worth testing on an ARM device.

}
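
For reference, a minimal sketch of the ordering being suggested (hypothetical, not this PR's code). The sync/atomic bug note only concerns 64-bit atomics on 32-bit platforms, so the current int32 is safe either way, but placing the counter first keeps the struct future-proof if it ever becomes an int64:

type Endpoint struct {
	// liveRequests is only touched via sync/atomic; keeping it first in the
	// struct guarantees 64-bit alignment on 32-bit ARM should it grow to int64.
	liveRequests int32

	log    *zap.Logger
	config Config
	// ... remaining fields unchanged from the diff above
}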

// NewEndpoint creates a new piecestore endpoint.
@@ -92,13 +96,18 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
orders: orders,
usage: usage,
usedSerials: usedSerials,

liveRequests: 0,
}, nil
}

// Delete handles deleting a piece on piece store.
func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)

atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)

if delete.Limit.Action != pb.PieceAction_DELETE {
return nil, Error.New("expected delete action got %v", delete.Limit.Action) // TODO: report grpc status unauthorized or bad request
}
@@ -129,6 +138,15 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)

liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)

if int(liveRequests) > endpoint.config.MaxConcurrentRequests {
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
return status.Error(codes.Unavailable, "storage node overloaded")
}

startTime := time.Now().UTC()

// TODO: set connection timeouts
@@ -322,6 +340,10 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)

atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)

startTime := time.Now().UTC()

// TODO: set connection timeouts
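Taken together, the endpoint changes form a small admission gate: every handler counts itself in and defers counting itself out, and Upload additionally rejects with codes.Unavailable once the count passes the configured maximum. A condensed sketch of that pattern (hypothetical helper, not code from this PR); note the new request is counted before the check, so a rejected upload occupies a slot until its deferred decrement runs:

// tryAcquire sketches the gating logic used inline in Delete, Upload, and
// Download above; only Upload acts on the ok result.
func (endpoint *Endpoint) tryAcquire() (release func(), ok bool) {
	live := atomic.AddInt32(&endpoint.liveRequests, 1)
	release = func() { atomic.AddInt32(&endpoint.liveRequests, -1) }
	return release, int(live) <= endpoint.config.MaxConcurrentRequests
}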
125 changes: 123 additions & 2 deletions storagenode/piecestore/endpoint_test.go
@@ -6,13 +6,20 @@ package piecestore_test
import (
"io"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"

"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
@@ -21,6 +28,7 @@ import (
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/uplink/piecestore"
)
@@ -361,15 +369,129 @@ func TestDelete(t *testing.T) {
}
}

func TestTooManyRequests(t *testing.T) {
t.Skip("flaky, because of EOF issues")

ctx := testcontext.New(t)
defer ctx.Cleanup()

const uplinkCount = 6
const maxConcurrent = 3
const expectedFailures = uplinkCount - maxConcurrent

log := zaptest.NewLogger(t)

planet, err := testplanet.NewCustom(log, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: uplinkCount,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Storage2.MaxConcurrentRequests = maxConcurrent
},
},
})
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)

planet.Start(ctx)

doneWaiting := make(chan struct{})
failedCount := int64(expectedFailures)

uploads, _ := errgroup.WithContext(ctx)
defer ctx.Check(uploads.Wait)

for i, uplink := range planet.Uplinks {
i, uplink := i, uplink
uploads.Go(func() (err error) {
storageNode := planet.StorageNodes[0].Local()
signer := signing.SignerFromFullIdentity(uplink.Transport.Identity())
config := piecestore.DefaultConfig
config.UploadBufferSize = 0 // disable buffering so we can detect write error early

client, err := piecestore.Dial(ctx, uplink.Transport, &storageNode.Node, uplink.Log, signer, config)
if err != nil {
return err
}
defer func() {
if cerr := client.Close(); cerr != nil {
uplink.Log.Error("close failed", zap.Error(cerr))
err = errs.Combine(err, cerr)
}
}()

pieceID := storj.PieceID{byte(i + 1)}
serialNumber := testrand.SerialNumber()

orderLimit := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
uplink.ID(),
planet.StorageNodes[0].ID(),
pieceID,
pb.PieceAction_PUT,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(10000),
)

satelliteSigner := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
orderLimit, err = signing.SignOrderLimit(ctx, satelliteSigner, orderLimit)
if err != nil {
return err
}

upload, err := client.Upload(ctx, orderLimit)
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("upload failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

_, err = upload.Write(make([]byte, orderLimit.Limit))
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("write failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

_, err = upload.Commit(ctx)
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("commit failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

return nil
})
}
}

func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, uplink storj.NodeID, storageNode storj.NodeID, pieceID storj.PieceID,
action pb.PieceAction, serialNumber storj.SerialNumber, pieceExpiration, orderExpiration time.Duration, limit int64) *pb.OrderLimit {

pe, err := ptypes.TimestampProto(time.Now().Add(pieceExpiration))
require.NoError(t, err)

oe, err := ptypes.TimestampProto(time.Now().Add(orderExpiration))
require.NoError(t, err)

-	orderLimit := &pb.OrderLimit{
+	return &pb.OrderLimit{
SatelliteId: satellite,
UplinkId: uplink,
StorageNodeId: storageNode,
Expand All @@ -381,5 +503,4 @@ func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, uplink storj.NodeI
PieceExpiration: pe,
Limit: limit,
}
-	return orderLimit
}