storagenode: decline uploads when there are too many live requests #2397

Merged — 8 commits, merged Jul 3, 2019
Changes from 3 commits
2 changes: 2 additions & 0 deletions internal/testplanet/storagenode.go
@@ -109,6 +109,8 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatelliteIDs []strin
StaticDir: filepath.Join(developmentRoot, "web/operator/"),
},
Storage2: piecestore.Config{
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
Member Author:
Leaving testplanet at 100, because having this lower would probably break some tests.

Contributor:
Worth testing where the limit is, in another PR of course.

Sender: orders.SenderConfig{
Interval: time.Hour,
Timeout: time.Hour,
22 changes: 22 additions & 0 deletions storagenode/piecestore/endpoint.go
@@ -7,6 +7,7 @@ import (
"context"
"io"
"os"
"sync/atomic"
"time"

"github.com/golang/protobuf/ptypes"
@@ -55,6 +56,7 @@ type OldConfig struct {
// Config defines parameters for piecestore endpoint.
type Config struct {
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." default:"30"`
Member:
How would SNOs know what value to configure here?

Is there a way for the storage node to self-diagnose and determine if it is overloaded or not?

Member Author:
Probably with some kind of benchmarking by some other party. The issue is that it's not just about the storage node itself, but also about the network bandwidth it's able to deliver.

I guess we could try monitoring bandwidth usage and reacting when it stops increasing, or watching load or memory usage, etc. But these are all much more complicated solutions than a plain "this is how much this storage node can serve".
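
A rough sketch of what such a throughput-based adjustment could look like — purely illustrative and not part of this PR; the AdaptiveLimit type and its sampling interface are invented here:

package adaptive

import "sync/atomic"

// AdaptiveLimit is a hypothetical self-tuning concurrency limit: it raises the
// allowed number of requests while throughput keeps improving and backs off
// once throughput stalls or drops. Adjust is expected to be called
// periodically from a single monitoring goroutine.
type AdaptiveLimit struct {
	max            int32   // current limit, read atomically by request handlers
	lastThroughput float64 // previous throughput sample, bytes per second
}

// Adjust compares the latest throughput sample against the previous one and
// nudges the limit up or down by one step.
func (a *AdaptiveLimit) Adjust(throughputPerSec float64) {
	switch {
	case throughputPerSec > a.lastThroughput*1.05:
		atomic.AddInt32(&a.max, 1) // still scaling up, allow one more request
	case throughputPerSec < a.lastThroughput*0.95 && atomic.LoadInt32(&a.max) > 1:
		atomic.AddInt32(&a.max, -1) // throughput dropped, back off
	}
	a.lastThroughput = throughputPerSec
}

// Max returns the current concurrency limit for use in request handlers.
func (a *AdaptiveLimit) Max() int { return int(atomic.LoadInt32(&a.max)) }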

Contributor:
I think it's totally fine to have a default; if someone notices a lot of issues, they can just tune it down. We had the same mechanism in V2 and that worked fine.
As for the default itself, I would recommend something in the neighborhood of 5-10 max.

Member Author:
Yeah, I'm not sure what to put as the limit for now. Is there an easy way to test how much we can handle?

Contributor:
My server node was handling 15-20 requests per second at most, when fully cached.
The Pi 3B+ is already overwhelmed with 3 requests.

Contributor:
I could run a few test uploads on my local network to see what I can come up with.
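
For reference, a quick local probe could look something like the sketch below — it assumes an uploadOnce helper that performs a single piece upload against the node under test; the helper and the concurrency steps are made up for illustration:

package loadprobe

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// probeConcurrency runs batches of concurrent uploads at increasing
// concurrency levels and prints how long each batch takes, giving a rough
// idea of where throughput stops improving.
func probeConcurrency(ctx context.Context, uploadOnce func(context.Context) error) {
	for _, n := range []int{1, 2, 4, 8, 16, 32} {
		start := time.Now()
		group, groupCtx := errgroup.WithContext(ctx)
		for i := 0; i < n; i++ {
			group.Go(func() error { return uploadOnce(groupCtx) })
		}
		err := group.Wait()
		fmt.Printf("concurrency=%d elapsed=%s err=%v\n", n, time.Since(start), err)
	}
}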

Contributor:
I wonder if the storage nodes could potentially obtain data from one or more satellites to help them decide how many requests they can handle and adjust the max.

Contributor:
Dynamic scaling would just add another layer of potential issues, @phutchins.
Let's settle on a good default/average and update the docs accordingly.

Member Author:
Let me know when you get a number from testing, @stefanbenten.

Contributor:
I will test that after lunch 👍


Monitor monitor.Config
Sender orders.SenderConfig
@@ -74,6 +76,8 @@ type Endpoint struct {
orders orders.DB
usage bandwidth.DB
usedSerials UsedSerials

liveRequests int32
Member:
We should make this one of the first struct fields, so ARM alignment is better guaranteed (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).

Member:
Wasn't this an issue for int64 only?

Member:
Oh yeah, good point. Maybe worth testing on an ARM device.
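
For what it's worth, the alignment note in the sync/atomic docs is about 64-bit atomics on 32-bit platforms (ARM, x86-32); the int32 counter here carries no such placement requirement. A minimal illustration of the layout rule, with invented field names:

package example

import "sync/atomic"

// counters illustrates the layout rule from the sync/atomic bug note: the
// first word of an allocated struct is guaranteed to be 64-bit aligned, so a
// 64-bit counter that is updated atomically should be the first field when
// the code has to run on 32-bit ARM or x86.
type counters struct {
	liveRequests64 int64 // must stay first if AddInt64 is used on 32-bit platforms
	liveRequests32 int32 // 32-bit atomics have no such placement requirement
}

func (c *counters) inc() {
	atomic.AddInt64(&c.liveRequests64, 1)
	atomic.AddInt32(&c.liveRequests32, 1)
}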

}

// NewEndpoint creates a new piecestore endpoint.
@@ -91,13 +95,18 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
orders: orders,
usage: usage,
usedSerials: usedSerials,

liveRequests: 0,
}, nil
}

// Delete handles deleting a piece on piece store.
func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)

atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)

if delete.Limit.Action != pb.PieceAction_DELETE {
return nil, Error.New("expected delete action got %v", delete.Limit.Action) // TODO: report grpc status unauthorized or bad request
}
@@ -128,6 +137,15 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)

liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)

if int(liveRequests) > endpoint.config.MaxConcurrentRequests {
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
return status.Error(codes.Unavailable, "storage node overloaded")
}

startTime := time.Now().UTC()

// TODO: set connection timeouts
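
To make the pattern added above easier to see in isolation, here is a minimal sketch of the same accept/reject guard as a standalone helper — the Guard type and ErrOverloaded are invented names, and unlike the hunk above it folds the decrement for rejected requests into Enter instead of relying on a deferred decrement:

package limiter

import (
	"errors"
	"sync/atomic"
)

// ErrOverloaded is returned when the node is already serving max requests.
var ErrOverloaded = errors.New("storage node overloaded")

// Guard counts live requests and rejects new ones once the limit is reached.
type Guard struct {
	live int32 // updated atomically
	max  int32
}

// Enter reserves a slot for one request. On success it returns a release
// function that the caller must defer; on rejection the counter is already
// restored and no release is needed.
func (g *Guard) Enter() (release func(), err error) {
	if atomic.AddInt32(&g.live, 1) > g.max {
		atomic.AddInt32(&g.live, -1)
		return nil, ErrOverloaded
	}
	return func() { atomic.AddInt32(&g.live, -1) }, nil
}
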
@@ -321,6 +339,10 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)

atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)

startTime := time.Now().UTC()

// TODO: set connection timeouts
119 changes: 117 additions & 2 deletions storagenode/piecestore/endpoint_test.go
@@ -6,13 +6,20 @@ package piecestore_test
import (
"io"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"

"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
@@ -21,6 +28,7 @@ import (
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/uplink/piecestore"
)
@@ -360,14 +368,122 @@ func TestDelete(t *testing.T) {
}
}

func TestTooManyRequests(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

const uplinkCount = 6
const maxConcurrent = 3
const expectedFailures = uplinkCount - maxConcurrent

log := zaptest.NewLogger(t)

planet, err := testplanet.NewCustom(log, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: uplinkCount,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Storage2.MaxConcurrentRequests = maxConcurrent
},
},
})
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)

planet.Start(ctx)

doneWaiting := make(chan struct{})
failedCount := int64(expectedFailures)

uploads, _ := errgroup.WithContext(ctx)
defer ctx.Check(uploads.Wait)

for i, uplink := range planet.Uplinks {
i, uplink := i, uplink
uploads.Go(func() (err error) {
storageNode := planet.StorageNodes[0].Local()
signer := signing.SignerFromFullIdentity(uplink.Transport.Identity())
config := piecestore.DefaultConfig
config.UploadBufferSize = 0 // disable buffering so we can detect write error early

client, err := piecestore.Dial(ctx, uplink.Transport, &storageNode.Node, uplink.Log, signer, config)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, client.Close()) }()

pieceID := storj.PieceID{byte(i + 1)}
serialNumber := testrand.SerialNumber()

orderLimit := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
uplink.ID(),
planet.StorageNodes[0].ID(),
pieceID,
pb.PieceAction_PUT,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(10000),
)

satelliteSigner := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
orderLimit, err = signing.SignOrderLimit(ctx, satelliteSigner, orderLimit)
if err != nil {
return err
}

upload, err := client.Upload(ctx, orderLimit)
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("upload failed", zap.Stringer("Piece ID", pieceID))
return err
}

_, err = upload.Write(make([]byte, orderLimit.Limit))
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("write failed", zap.Stringer("Piece ID", pieceID))
return err
}

_, err = upload.Commit(ctx)
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("commit failed", zap.Stringer("Piece ID", pieceID))
return err
}

return nil
})
}
}

func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, uplink storj.NodeID, storageNode storj.NodeID, pieceID storj.PieceID,
action pb.PieceAction, serialNumber storj.SerialNumber, pieceExpiration, orderExpiration time.Duration, limit int64) *pb.OrderLimit2 {

pe, err := ptypes.TimestampProto(time.Now().Add(pieceExpiration))
require.NoError(t, err)

oe, err := ptypes.TimestampProto(time.Now().Add(orderExpiration))
require.NoError(t, err)
orderLimit := &pb.OrderLimit2{

return &pb.OrderLimit2{
SatelliteId: satellite,
UplinkId: uplink,
StorageNodeId: storageNode,
Expand All @@ -378,5 +494,4 @@ func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, uplink storj.NodeI
PieceExpiration: pe,
Limit: limit,
}
return orderLimit
}