storagenode/piecestore: fix ingress graph skewed by larger signed orders
Storagenodes are currently receiving larger signed orders due to a
performance optimization in uplink. This skews the ingress graph,
because the storagenode plots it using the order amount rather than
the bytes actually uploaded. This change plots the ingress graph
using the actually uploaded bytes instead.

The egress graph could be skewed in the same way whenever the order
amount exceeds the bytes actually downloaded, but since we pay for
orders whether they are fulfilled or not, we keep using the order
amount for the egress graph.

Resolves #5853

Change-Id: I2af7ee3ff249801ce07714bba055370ebd597c6e
profclems committed Jun 20, 2023
1 parent bbdeb1e commit b6026b9
Showing 2 changed files with 28 additions and 5 deletions.
23 changes: 18 additions & 5 deletions storagenode/piecestore/endpoint.go
@@ -399,7 +399,9 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
 		return rpcstatus.Wrap(rpcstatus.InvalidArgument, err)
 	}
 	largestOrder := pb.Order{}
-	defer commitOrderToStore(ctx, &largestOrder)
+	defer commitOrderToStore(ctx, &largestOrder, func() int64 {
+		return pieceWriter.Size()
+	})
 
 	// monitor speed of upload client to flag out slow uploads.
 	speedEstimate := speedEstimation{
@@ -730,7 +732,14 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
 	if err != nil {
 		return err
 	}
-	defer commitOrderToStore(ctx, &largestOrder)
+	defer func() {
+		order := &largestOrder
+		commitOrderToStore(ctx, order, func() int64 {
+			// for downloads, we store the order amount for the egress graph instead
+			// of the bytes actually downloaded
+			return order.Amount
+		})
+	}()
 
 	// ensure that we always terminate sending goroutine
 	defer throttle.Fail(io.EOF)
@@ -823,7 +832,7 @@ func (endpoint *Endpoint) sendData(ctx context.Context, stream pb.DRPCPiecestore
 }
 
 // beginSaveOrder saves the order with all necessary information. It assumes it has been already verified.
-func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx context.Context, order *pb.Order), err error) {
+func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx context.Context, order *pb.Order, amountFunc func() int64), err error) {
 	defer mon.Task()(nil)(&err)
 
 	commit, err := endpoint.ordersStore.BeginEnqueue(limit.SatelliteId, limit.OrderCreation)
@@ -832,7 +841,7 @@ func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx
 	}
 
 	done := false
-	return func(ctx context.Context, order *pb.Order) {
+	return func(ctx context.Context, order *pb.Order, amountFunc func() int64) {
 		if done {
 			return
 		}
@@ -851,8 +860,12 @@ func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx
 		if err != nil {
 			endpoint.log.Error("failed to add order", zap.Error(err))
 		} else {
+			amount := order.Amount
+			if amountFunc != nil {
+				amount = amountFunc()
+			}
 			// We always want to save order to the database to be able to settle.
-			err = endpoint.usage.Add(context2.WithoutCancellation(ctx), limit.SatelliteId, limit.Action, order.Amount, time.Now())
+			err = endpoint.usage.Add(context2.WithoutCancellation(ctx), limit.SatelliteId, limit.Action, amount, time.Now())
 			if err != nil {
 				endpoint.log.Error("failed to add bandwidth usage", zap.Error(err))
 			}
10 changes: 10 additions & 0 deletions storagenode/piecestore/endpoint_test.go
@@ -26,6 +26,7 @@ import (
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
+	"storj.io/storj/private/date"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/storagenode"
 	"storj.io/storj/storagenode/bandwidth"
@@ -116,6 +117,8 @@ func TestUpload(t *testing.T) {
 		require.NoError(t, err)
 		defer ctx.Check(client.Close)
 
+		var expectedIngressAmount int64
+
 		for _, tt := range []struct {
 			pieceID       storj.PieceID
 			contentLength memory.Size
@@ -183,8 +186,15 @@ func TestUpload(t *testing.T) {
 
 				signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)
 				require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash))
+
+				expectedIngressAmount += int64(len(data)) // assuming all data is uploaded
 			}
 		}
+
+		from, to := date.MonthBoundary(time.Now().UTC())
+		summary, err := planet.StorageNodes[0].DB.Bandwidth().SatelliteIngressSummary(ctx, planet.Satellites[0].ID(), from, to)
+		require.NoError(t, err)
+		require.Equal(t, expectedIngressAmount, summary.Put)
 	})
 }
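The new assertion relies on date.MonthBoundary to bound the summary
query to the current month. A rough, self-contained approximation of
what that helper is assumed to return (the real implementation lives in
storj.io/storj/private/date):

	package main

	import (
		"fmt"
		"time"
	)

	// monthBoundary approximates what date.MonthBoundary is assumed to return:
	// the first instant of t's month and the last instant before the next
	// month begins, so [from, to] covers exactly one calendar month.
	func monthBoundary(t time.Time) (from, to time.Time) {
		from = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location())
		to = from.AddDate(0, 1, 0).Add(-time.Nanosecond)
		return from, to
	}

	func main() {
		from, to := monthBoundary(time.Now().UTC())
		fmt.Println(from, to) // the bounds handed to SatelliteIngressSummary in the test
	}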


2 comments on commit b6026b9

@storjrobot
This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/new-node-v1-80-10-ingress-does-not-corralate-to-disk-usage/23062/2

@storjrobot
This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/nodes-in-the-same-machine-with-different-ingress/23294/6
