Skip to content

Commit

Permalink
storagenode/piecestore: add bandwidth only when settling orders
Browse files Browse the repository at this point in the history
Change-Id: Id032f75189d9206ac79db8ef7018d9911b4727c8
  • Loading branch information
liori authored and jtolio committed May 7, 2024
1 parent 860bbdd commit 4d93a3c
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
2 changes: 2 additions & 0 deletions storagenode/inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func TestInspectorStats(t *testing.T) {

var downloaded int
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Orders.SendOrders(ctx, time.Now().Add(24*time.Hour))

response, err := storageNode.Storage2.Inspector.Stats(ctx, &internalpb.StatsRequest{})
require.NoError(t, err)

Expand Down
19 changes: 18 additions & 1 deletion storagenode/orders/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"storj.io/common/context2"
"storj.io/common/pb"
"storj.io/common/process"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/orders/ordersfile"
"storj.io/storj/storagenode/trust"
)
Expand Down Expand Up @@ -98,19 +100,21 @@ type Service struct {
dialer rpc.Dialer
ordersStore *FileStore
orders DB
bandwidth bandwidth.DB
trust *trust.Pool

Sender *sync2.Cycle
Cleanup *sync2.Cycle
}

// NewService creates an order service.
func NewService(log *zap.Logger, dialer rpc.Dialer, ordersStore *FileStore, orders DB, trust *trust.Pool, config Config) *Service {
func NewService(log *zap.Logger, dialer rpc.Dialer, ordersStore *FileStore, orders DB, bandwidth bandwidth.DB, trust *trust.Pool, config Config) *Service {
return &Service{
log: log,
dialer: dialer,
ordersStore: ordersStore,
orders: orders,
bandwidth: bandwidth,
config: config,
trust: trust,

Expand Down Expand Up @@ -235,6 +239,19 @@ func (service *Service) SendOrders(ctx context.Context, now time.Time) {
log.Warn("skipping order settlement for untrusted satellite. Order will be archived", zap.String("satellite ID", satelliteID.String()))
}

amountsByActions := make(map[pb.PieceAction]int64)
for _, order := range unsentInfo.InfoList {
amountsByActions[order.Limit.Action] += order.Order.Amount
}
for action, amount := range amountsByActions {
// TODO liori: better time.
// TODO liori: might be imprecise, see b6026b9ff3b41d0d80e1cd1bae13f567856385ca
err := service.bandwidth.Add(context2.WithoutCancellation(ctx), satelliteID, action, amount, time.Now())
if err != nil {
service.log.Error("failed to add bandwidth usage", zap.String("satellite ID", satelliteID.String()), zap.String("action", action.String()), zap.Int64("amount", amount), zap.Error(err))
}
}

err = service.ordersStore.Archive(satelliteID, unsentInfo, time.Now().UTC(), status)
if err != nil {
log.Error("failed to archive orders", zap.Error(err))
Expand Down
1 change: 1 addition & 0 deletions storagenode/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
dialer,
peer.OrdersStore,
peer.DB.Orders(),
peer.DB.Bandwidth(),
peer.Storage2.Trust,
config.Storage2.Orders,
)
Expand Down
11 changes: 0 additions & 11 deletions storagenode/piecestore/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"golang.org/x/sync/errgroup"

"storj.io/common/bloomfilter"
"storj.io/common/context2"
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/memory"
Expand Down Expand Up @@ -907,16 +906,6 @@ func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx
err = commit(&ordersfile.Info{Limit: limit, Order: order})
if err != nil {
endpoint.log.Error("failed to add order", zap.Error(err))
} else {
amount := order.Amount
if amountFunc != nil {
amount = amountFunc()
}
// We always want to save order to the database to be able to settle.
err = endpoint.usage.Add(context2.WithoutCancellation(ctx), limit.SatelliteId, limit.Action, amount, time.Now())
if err != nil {
endpoint.log.Error("failed to add bandwidth usage", zap.Error(err))
}
}
}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions storagenode/piecestore/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestUploadAndPartialDownload(t *testing.T) {

var totalBandwidthUsage bandwidth.Usage
for _, storagenode := range planet.StorageNodes {
storagenode.Storage2.Orders.SendOrders(ctx, time.Now().Add(24*time.Hour))
usage, err := storagenode.Bandwidth.Cache.Summary(ctx, time.Now().Add(-24*time.Hour), time.Now().Add(24*time.Hour))
require.NoError(t, err)
totalBandwidthUsage.Add(usage)
Expand Down Expand Up @@ -197,6 +198,7 @@ func TestUpload(t *testing.T) {
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))

from, to := date.MonthBoundary(time.Now().UTC())
planet.StorageNodes[0].Storage2.Orders.SendOrders(ctx, time.Now().Add(24*time.Hour))
summary, err := planet.StorageNodes[0].Bandwidth.Cache.SatelliteIngressSummary(ctx, planet.Satellites[0].ID(), from, to)
require.NoError(t, err)
require.Equal(t, expectedIngressAmount, summary.Put)
Expand Down

0 comments on commit 4d93a3c

Please sign in to comment.