Skip to content

Commit

Permalink
satellite/orders: support overriding download long tail latency
Browse files Browse the repository at this point in the history
also moves logic from
common/storj's (RedundancyStrategy).DownloadNodes
to the Satellite.

See https://review.dev.storj.io/c/storj/common/+/12257

Change-Id: I441abb8d36d7daf2131ed31df63af195c9911dae
  • Loading branch information
jtolio authored and ihaid committed Jan 29, 2024
1 parent eaa3edc commit 59baf07
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 10 deletions.
2 changes: 1 addition & 1 deletion satellite/accounting/projectusage_test.go
Expand Up @@ -1164,7 +1164,7 @@ func TestProjectUsage_BandwidthDeadAllocation(t *testing.T) {
project.ID, now.Year(), now.Month(), now.Day(), 0)
require.NoError(t, err)

require.Equal(t, int64(segments[0].Redundancy.DownloadNodes())*pieceSize, bandwidthUsage)
require.Equal(t, int64(planet.Satellites[0].Orders.Service.DownloadNodes(segments[0].Redundancy))*pieceSize, bandwidthUsage)

initialBandwidthUsage := bandwidthUsage
var updatedBandwidthUsage int64
Expand Down
71 changes: 70 additions & 1 deletion satellite/orders/service.go
Expand Up @@ -5,7 +5,10 @@ package orders

import (
"context"
"fmt"
mathrand "math/rand"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -36,6 +39,8 @@ type Config struct {
FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m" testDefault:"$TESTINTERVAL"`
NodeStatusLogging bool `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false" testDefault:"true"`
OrdersSemaphoreSize int `help:"how many concurrent orders to process at once. zero is unlimited" default:"2"`

DownloadTailToleranceOverrides string `help:"how many nodes should be used for downloads for certain k. must be >= k. if not specified, this is calculated from long tail tolerance. format is comma separated like k-d,k-d,k-d e.g. 29-35,3-5." default:""`
}

// Overlay defines the overlay dependency of orders.Service.
Expand Down Expand Up @@ -63,6 +68,8 @@ type Service struct {

orderExpiration time.Duration

downloadOverrides map[int16]int32

rngMu sync.Mutex
rng *mathrand.Rand
}
Expand All @@ -76,6 +83,11 @@ func NewService(
return nil, Error.New("encryption keys must be specified to include encrypted metadata")
}

downloadOverrides, err := parseDownloadOverrides(config.DownloadTailToleranceOverrides)
if err != nil {
return nil, err
}

return &Service{
log: log,
satellite: satellite,
Expand All @@ -87,6 +99,8 @@ func NewService(

orderExpiration: config.Expiration,

downloadOverrides: downloadOverrides,

rng: mathrand.New(mathrand.NewSource(time.Now().UnixNano())),
}, nil
}
Expand Down Expand Up @@ -126,6 +140,32 @@ func (service *Service) updateBandwidth(ctx context.Context, bucket metabase.Buc
return nil
}

// DownloadNodes calculates the number of nodes needed to download in the
// presence of node failure based on t = k + (n-o)k/o.
func (service *Service) DownloadNodes(scheme storj.RedundancyScheme) int32 {
if needed, found := service.downloadOverrides[scheme.RequiredShares]; found {
return needed
}

extra := int32(1)

if scheme.OptimalShares > 0 {
extra = int32(((scheme.TotalShares - scheme.OptimalShares) * scheme.RequiredShares) / scheme.OptimalShares)
if extra == 0 {
// ensure there is at least one extra node, so we can have error detection/correction
// N.B.: we actually need two for this, but the uplink doesn't make appropriate use of it (yet)
extra = 1
}
}

needed := int32(scheme.RequiredShares) + extra

if needed > int32(scheme.TotalShares) {
needed = int32(scheme.TotalShares)
}
return needed
}

// CreateGetOrderLimits creates the order limits for downloading the pieces of a segment.
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, desiredNodes int32, overrideLimit int64) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)
Expand Down Expand Up @@ -158,7 +198,7 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}

neededLimits := segment.Redundancy.DownloadNodes()
neededLimits := service.DownloadNodes(segment.Redundancy)
if desiredNodes > neededLimits {
neededLimits = desiredNodes
}
Expand Down Expand Up @@ -598,3 +638,32 @@ func resolveStorageNode(node *pb.Node, lastIPPort string, resolveDNS bool) *pb.N
}
return node
}

func parseDownloadOverrides(val string) (map[int16]int32, error) {
rv := map[int16]int32{}
val = strings.TrimSpace(val)
if val != "" {
for _, entry := range strings.Split(val, ",") {
parts := strings.Split(entry, "-")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid download override value %q", val)
}
required, err := strconv.ParseInt(parts[0], 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid download override value %q: %w", val, err)
}
download, err := strconv.ParseInt(parts[1], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid download override value %q: %w", val, err)
}
if required > download {
return nil, fmt.Errorf("invalid download override value %q: required > download", val)
}
if _, found := rv[int16(required)]; found {
return nil, fmt.Errorf("invalid download override value %q: duplicate key", val)
}
rv[int16(required)] = int32(download)
}
}
return rv, nil
}
116 changes: 108 additions & 8 deletions satellite/orders/service_test.go
@@ -1,7 +1,7 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package orders_test
package orders

import (
"fmt"
Expand All @@ -10,6 +10,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"storj.io/common/identity/testidentity"
Expand All @@ -20,7 +21,6 @@ import (
"storj.io/common/testrand"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/orders"
)

func TestGetOrderLimits(t *testing.T) {
Expand Down Expand Up @@ -49,20 +49,20 @@ func TestGetOrderLimits(t *testing.T) {
require.NoError(t, err)
k := signing.SignerFromFullIdentity(testIdentity)

overlayService := orders.NewMockOverlayForOrders(ctrl)
overlayService := NewMockOverlayForOrders(ctrl)
overlayService.
EXPECT().
CachedGetOnlineNodesForGet(gomock.Any(), gomock.Any()).
Return(nodes, nil).AnyTimes()

service, err := orders.NewService(zaptest.NewLogger(t), k, overlayService, orders.NewNoopDB(),
service, err := NewService(zaptest.NewLogger(t), k, overlayService, NewNoopDB(),
func(constraint storj.PlacementConstraint) (filter nodeselection.NodeFilter) {
return nodeselection.AnyFilter{}
},
orders.Config{
EncryptionKeys: orders.EncryptionKeys{
Default: orders.EncryptionKey{
ID: orders.EncryptionKeyID{1, 2, 3, 4, 5, 6, 7, 8},
Config{
EncryptionKeys: EncryptionKeys{
Default: EncryptionKey{
ID: EncryptionKeyID{1, 2, 3, 4, 5, 6, 7, 8},
Key: testrand.Key(),
},
},
Expand Down Expand Up @@ -112,5 +112,105 @@ func TestGetOrderLimits(t *testing.T) {
t.Run("Request more than the replication", func(t *testing.T) {
checkExpectedLimits(1000, 8)
})
}

func TestDownloadNodes(t *testing.T) {
key, err := storj.NewKey([]byte("test-key"))
require.NoError(t, err)
encryptionKeys := EncryptionKeys{
Default: EncryptionKey{
ID: EncryptionKeyID{0, 1, 2, 3, 4, 5, 6, 7},
Key: *key,
},
}

service, err := NewService(zap.L(), nil, nil, nil, nil, Config{EncryptionKeys: encryptionKeys})
require.NoError(t, err)

for i, tt := range []struct {
k, m, o, n int16
needed int32
}{
{k: 0, m: 0, o: 0, n: 0, needed: 0},
{k: 1, m: 1, o: 1, n: 1, needed: 1},
{k: 1, m: 1, o: 2, n: 2, needed: 2},
{k: 1, m: 2, o: 2, n: 2, needed: 2},
{k: 2, m: 3, o: 4, n: 4, needed: 3},
{k: 2, m: 4, o: 6, n: 8, needed: 3},
{k: 20, m: 30, o: 40, n: 50, needed: 25},
{k: 29, m: 35, o: 80, n: 95, needed: 34},
} {
tag := fmt.Sprintf("#%d. %+v", i, tt)

rs := storj.RedundancyScheme{
RequiredShares: tt.k,
RepairShares: tt.m,
OptimalShares: tt.o,
TotalShares: tt.n,
}

require.Equal(t, tt.needed, service.DownloadNodes(rs), tag)
}

service, err = NewService(zap.L(), nil, nil, nil, nil, Config{
EncryptionKeys: encryptionKeys,
DownloadTailToleranceOverrides: "1-4,20-21",
})
require.NoError(t, err)

for i, tt := range []struct {
k, m, o, n int16
needed int32
}{
{k: 0, m: 0, o: 0, n: 0, needed: 0},
{k: 1, m: 1, o: 1, n: 1, needed: 4},
{k: 1, m: 1, o: 2, n: 2, needed: 4},
{k: 1, m: 2, o: 2, n: 2, needed: 4},
{k: 2, m: 3, o: 4, n: 4, needed: 3},
{k: 2, m: 4, o: 6, n: 8, needed: 3},
{k: 20, m: 30, o: 40, n: 50, needed: 21},
{k: 29, m: 35, o: 80, n: 95, needed: 34},
} {
tag := fmt.Sprintf("#%d. %+v", i, tt)

rs := storj.RedundancyScheme{
RequiredShares: tt.k,
RepairShares: tt.m,
OptimalShares: tt.o,
TotalShares: tt.n,
}

require.Equal(t, tt.needed, service.DownloadNodes(rs), tag)
}
}

func TestParseDownloadOverrides(t *testing.T) {
for i, tt := range []struct {
unparsed string
parsed map[int16]int32
success bool
}{
{"", map[int16]int32{}, true},
{" \n", map[int16]int32{}, true},
{"29-28", nil, false},
{"29-29", map[int16]int32{29: 35}, true},
{"29-35", map[int16]int32{29: 35}, true},
{"29-35,29-36", nil, false},
{"29-35,2-4", map[int16]int32{2: 4, 29: 35}, true},
{"29-35,2-4,7-9", map[int16]int32{2: 4, 29: 35, 7: 9}, true},
{"29-35,", nil, false},
{",29-35", nil, false},
} {
tag := fmt.Sprintf("#%d. %+v", i, tt)

actual, err := parseDownloadOverrides(tt.unparsed)
if !tt.success {
require.Error(t, err, tag)
continue
}
require.Equal(t, len(actual), len(tt.parsed), tag)
for k, v := range tt.parsed {
require.Equal(t, v, tt.parsed[k], tag)
}
}
}
3 changes: 3 additions & 0 deletions satellite/satellite-config.yaml.lock
Expand Up @@ -817,6 +817,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# max number of offline emails to send a node operator until the node comes back online
# offline-nodes.max-emails: 3

# how many nodes should be used for downloads for certain k. must be >= k. if not specified, this is calculated from long tail tolerance. format is comma separated like k-d,k-d,k-d e.g. 29-35,3-5.
# orders.download-tail-tolerance-overrides: ""

# encryption keys to encrypt info in orders
# orders.encryption-keys: ""

Expand Down

0 comments on commit 59baf07

Please sign in to comment.