diff --git a/satellite/nodeselection/config.go b/satellite/nodeselection/config.go
new file mode 100644
index 000000000000..463ebb05dfca
--- /dev/null
+++ b/satellite/nodeselection/config.go
@@ -0,0 +1,215 @@
+// Copyright (C) 2024 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package nodeselection
+
+import (
+	"bytes"
+	"os"
+	"strings"
+
+	"github.com/jtolio/mito"
+	"github.com/zeebo/errs"
+	"gopkg.in/yaml.v3"
+
+	"storj.io/common/storj"
+)
+
+// placementConfig is the representation of the YAML-based placement configuration.
+type placementConfig struct {
+
+	// helpers which can be reused later to simplify the config
+	Templates map[string]string
+
+	// the placement definitions
+	Placements []placementDefinition
+}
+
+type placementDefinition struct {
+	ID        storj.PlacementConstraint
+	Name      string
+	Filter    string
+	Invariant string
+	Selector  string
+}
+
+// LoadConfig loads the placement YAML file and creates the Placement definitions.
+func LoadConfig(configFile string) (PlacementDefinitions, error) {
+	placements := make(PlacementDefinitions)
+
+	cfg := &placementConfig{}
+	raw, err := os.ReadFile(configFile)
+	if err != nil {
+		return placements, errs.New("Couldn't load placement config from file %s: %v", configFile, err)
+	}
+	err = yaml.Unmarshal(raw, &cfg)
+	if err != nil {
+		return placements, errs.New("Couldn't parse placement config as YAML from file %s: %v", configFile, err)
+	}
+
+	templates := map[string]string{}
+	for k, v := range cfg.Templates {
+		value := v
+		for a, b := range cfg.Templates {
+			value = strings.ReplaceAll(value, "$"+a, b)
+		}
+		templates[k] = value
+	}
+
+	resolveTemplates := func(orig string) string {
+		val := orig
+		for k, v := range templates {
+			val = strings.ReplaceAll(val, "$"+k, v)
+		}
+		return val
+	}
+
+	for _, def := range cfg.Placements {
+		p := Placement{
+			ID:   def.ID,
+			Name: def.Name,
+		}
+
+		filter := resolveTemplates(def.Filter)
+		p.NodeFilter, err = filterFromString(filter)
+		if err != nil {
+			return placements, errs.New("Filter definition '%s' of placement %d is invalid: %v", filter, def.ID, err)
+		}
+
+		invariant := resolveTemplates(def.Invariant)
+		p.Invariant, err = invariantFromString(invariant)
+		if err != nil {
+			return placements, errs.New("Invariant definition '%s' of placement %d is invalid: %v", invariant, def.ID, err)
+		}
+
+		selector := resolveTemplates(def.Selector)
+		p.Selector, err = selectorFromString(selector)
+		if err != nil {
+			return placements, errs.New("Selector definition '%s' of placement %d is invalid: %v", selector, def.ID, err)
+		}
+
+		placements[def.ID] = p
+	}
+	return placements, nil
+}
+
+func filterFromString(expr string) (NodeFilter, error) {
+	if expr == "" {
+		expr = "all()"
+	}
+	env := map[any]any{
+		"country": func(countries ...string) (NodeFilter, error) {
+			return NewCountryFilterFromString(countries)
+		},
+		"all": func(filters ...NodeFilter) (NodeFilters, error) {
+			res := NodeFilters{}
+			for _, filter := range filters {
+				res = append(res, filter)
+			}
+			return res, nil
+		},
+		mito.OpAnd: func(env map[any]any, a, b any) (any, error) {
+			filter1, ok1 := a.(NodeFilter)
+			filter2, ok2 := b.(NodeFilter)
+			if !ok1 || !ok2 {
+				return nil, ErrPlacement.New("&& is supported only between NodeFilter instances")
+			}
+			res := NodeFilters{filter1, filter2}
+			return res, nil
+		},
+		mito.OpOr: func(env map[any]any, a, b any) (any, error) {
+			filter1, ok1 := a.(NodeFilter)
+			filter2, ok2 := b.(NodeFilter)
+			if !ok1 || !ok2 {
+				return nil, errs.New("OR is supported only between NodeFilter instances")
+			}
+			return OrFilter{filter1, filter2}, nil
+		},
+		"tag": func(nodeIDstr string, key string, value any) (NodeFilters, error) {
+			nodeID, err := storj.NodeIDFromString(nodeIDstr)
+			if err != nil {
+				return nil, err
+			}
+
+			var rawValue []byte
+			match := bytes.Equal
+			switch v := value.(type) {
+			case string:
+				rawValue = []byte(v)
+			case []byte:
+				rawValue = v
+			case stringNotMatch:
+				match = func(a, b []byte) bool {
+					return !bytes.Equal(a, b)
+				}
+				rawValue = []byte(v)
+			default:
+				return nil, ErrPlacement.New("3rd argument of tag() should be string or []byte")
+			}
+			res := NodeFilters{
+				NewTagFilter(nodeID, key, rawValue, match),
+			}
+			return res, nil
+		},
+		"exclude": func(filter NodeFilter) (NodeFilter, error) {
+			return NewExcludeFilter(filter), nil
+		},
+		"empty": func() string {
+			return ""
+		},
+		"notEmpty": func() any {
+			return stringNotMatch("")
+		},
+	}
+	filter, err := mito.Eval(expr, env)
+	if err != nil {
+		return nil, errs.New("Invalid filter definition '%s', %v", expr, err)
+	}
+	return filter.(NodeFilter), nil
+}
+
+func selectorFromString(expr string) (NodeSelectorInit, error) {
+	if expr == "" {
+		expr = "random()"
+	}
+	env := map[any]any{
+		"attribute": func(attribute string) (NodeSelectorInit, error) {
+			attr, err := CreateNodeAttribute(attribute)
+			if err != nil {
+				return nil, err
+			}
+			return AttributeGroupSelector(attr), nil
+		},
+		"random": func() (NodeSelectorInit, error) {
+			return RandomSelector(), nil
+		},
+		"unvetted": func(newNodeRatio float64, def NodeSelectorInit) (NodeSelectorInit, error) {
+			return UnvettedSelector(newNodeRatio, def), nil
+		},
+	}
+	selector, err := mito.Eval(expr, env)
+	if err != nil {
+		return nil, errs.New("Invalid selector definition '%s', %v", expr, err)
+	}
+	return selector.(NodeSelectorInit), nil
+}
+
+func invariantFromString(expr string) (Invariant, error) {
+	if expr == "" {
+		return AllGood(), nil
+	}
+	env := map[any]any{
+		"maxcontrol": func(attribute string, max int64) (Invariant, error) {
+			attr, err := CreateNodeAttribute(attribute)
+			if err != nil {
+				return nil, err
+			}
+			return ClumpingByAttribute(attr, int(max)), nil
+		},
+	}
+	filter, err := mito.Eval(expr, env)
+	if err != nil {
+		return nil, errs.New("Invalid invariant definition '%s', %v", expr, err)
+	}
+	return filter.(Invariant), nil
+}
diff --git a/satellite/nodeselection/config_test.go b/satellite/nodeselection/config_test.go
new file mode 100644
index 000000000000..2ab04a862fe5
--- /dev/null
+++ b/satellite/nodeselection/config_test.go
@@ -0,0 +1,89 @@
+// Copyright (C) 2024 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package nodeselection
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"storj.io/common/identity/testidentity"
+	"storj.io/common/storj"
+	"storj.io/common/storj/location"
+	"storj.io/storj/satellite/metabase"
+)
+
+func TestParsedConfig(t *testing.T) {
+	config, err := LoadConfig("config_test.yaml")
+	require.NoError(t, err)
+	require.Len(t, config, 2)
+
+	{
+		// checking filters
+		require.True(t, config[1].NodeFilter.Match(&SelectedNode{
+			CountryCode: location.Germany,
+		}))
+		require.False(t, config[1].NodeFilter.Match(&SelectedNode{
+			CountryCode: location.Russia,
+		}))
+		require.Equal(t, "eu-1", config[1].Name)
+	}
+
+	{
+		// checking one invariant
+		node := func(ix int, owner string) SelectedNode {
+			return SelectedNode{
+				ID: testidentity.MustPregeneratedSignedIdentity(ix, storj.LatestIDVersion()).ID,
+				Tags: NodeTags{
+					{
+						Name:  "owner",
+						Value: []byte(owner),
+					},
+				},
+			}
+		}
+
+		piece := func(ix int, nodeIx int) metabase.Piece {
+			return metabase.Piece{
+				Number: uint16(ix), StorageNode: testidentity.MustPregeneratedSignedIdentity(nodeIx, storj.LatestIDVersion()).ID,
+			}
+		}
+
+		result := config[0].Invariant(
+			metabase.Pieces{
+				piece(1, 1),
+				piece(3, 2),
+				piece(5, 3),
+				piece(9, 4),
+				piece(10, 5),
+				piece(11, 6),
+			},
+			[]SelectedNode{
+				node(1, "dery"),
+				node(2, "blathy"),
+				node(3, "blathy"),
+				node(4, "zipernowsky"),
+				node(5, "zipernowsky"),
+				node(6, "zipernowsky"),
+			})
+
+		// the third zipernowsky piece is one too many, as we allow at most 2 per owner
+		require.Equal(t, 1, result.Count())
+	}
+
+	{
+		// checking a selector
+		selected, err := config[0].Selector([]*SelectedNode{
+			{
+				Vetted: false,
+			},
+		}, nil)(1, nil)
+
+		// the only candidate is unvetted, and the selector allows 0% unvetted (100% vetted) nodes
+		require.Len(t, selected, 0)
+		require.NoError(t, err)
+
+	}
+
+}
diff --git a/satellite/nodeselection/config_test.yaml b/satellite/nodeselection/config_test.yaml
new file mode 100644
index 000000000000..ad2483dd64b8
--- /dev/null
+++ b/satellite/nodeselection/config_test.yaml
@@ -0,0 +1,14 @@
+templates:
+  SIGNER_ZERO: 1111111111111111111111111111111VyS547o
+  NORMAL: exclude(tag("$SIGNER_ZERO","soc2","true")) && exclude(tag("$SIGNER_ZERO","datacenter","true"))
+placements:
+  - id: 0
+    name: global
+    filter: $NORMAL
+    invariant: maxcontrol("tag:owner",2)
+    selector: unvetted(0.0,random())
+  - id: 1
+    name: eu-1
+    filter: country("EU") && $NORMAL
+    invariant: maxcontrol("last_net",1)
+    selector: attribute("last_net")
diff --git a/satellite/nodeselection/placement.go b/satellite/nodeselection/placement.go
index 0fd0f5153f3d..5bb69d62bc8b 100644
--- a/satellite/nodeselection/placement.go
+++ b/satellite/nodeselection/placement.go
@@ -104,6 +104,11 @@ func (c ConfigurablePlacementRule) Parse(defaultPlacement func() (Placement, err
 	}
 	rules := c.PlacementRules
 	if _, err := os.Stat(rules); err == nil {
+		if strings.HasSuffix(rules, ".yaml") {
+			// new style of config; all other formats are deprecated
+			return LoadConfig(rules)
+
+		}
 		ruleBytes, err := os.ReadFile(rules)
 		if err != nil {
 			return nil, ErrPlacement.New("Placement definition file couldn't be read: %s %v", rules, err)
@@ -193,6 +198,7 @@ func (d PlacementDefinitions) AddPlacementRule(id storj.PlacementConstraint, fil
 type stringNotMatch string
 
 // AddPlacementFromString parses placement definition form string representations from id:definition;id:definition;...
+// Deprecated: we will switch to the YAML-based configuration.
 func (d PlacementDefinitions) AddPlacementFromString(definitions string) error {
 	env := map[any]any{
 		"country": func(countries ...string) (NodeFilter, error) {
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 86ab6380c6e8..00d7485844a9 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -10,6 +10,8 @@ import (
 	"io"
 	"math"
 	"net"
+	"os"
+	"path/filepath"
 	"testing"
 	"time"
 
@@ -18,6 +20,8 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
 
+	"storj.io/common/identity"
+	"storj.io/common/identity/testidentity"
 	"storj.io/common/memory"
 	"storj.io/common/pb"
 	"storj.io/common/rpc"
@@ -30,6 +34,7 @@ import (
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/accounting"
 	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/nodeselection"
 	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/repair/checker"
 	"storj.io/storj/satellite/repair/queue"
@@ -3335,3 +3340,123 @@ func TestRepairClumpedPieces(t *testing.T) {
 		"%v should only include one of %s or %s", segment.Pieces, node0.ID(), node1.ID())
 	})
 }
+
+func TestRepairClumpedPiecesBasedOnTags(t *testing.T) {
+	signer := testidentity.MustPregeneratedIdentity(50, storj.LatestIDVersion())
+	tempDir := t.TempDir()
+	pc := identity.PeerConfig{
+		CertPath: filepath.Join(tempDir, "identity.cert"),
+	}
+	require.NoError(t, pc.Save(signer.PeerIdentity()))
+
+	placementConfig := fmt.Sprintf(`
+placements:
+- id: 0
+  name: default
+  invariant: maxcontrol("tag:%s/datacenter", 2)`, signer.ID.String())
+
+	placementConfigPath := filepath.Join(tempDir, "placement.yaml")
+	require.NoError(t, os.WriteFile(placementConfigPath, []byte(placementConfig), 0755))
+
+	// Test that if nodes are tagged such that multiple pieces of a segment
+	// reside in the same datacenter, that segment will be considered unhealthy
+	// by the repair checker and it will be repaired by the repair worker.
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 6,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				testplanet.ReconfigureRS(2, 3, 4, 4),
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Checker.DoDeclumping = true
+					config.Repairer.DoDeclumping = true
+					config.Placement.PlacementRules = placementConfigPath
+					config.TagAuthorities = pc.CertPath
+				},
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions, e.g. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		// first, upload some remote data
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+		remotePiecesBefore := segment.Pieces
+
+		// that segment should be ignored by the repair checker for now
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		injuredSegment, err := satellite.DB.RepairQueue().Select(ctx, nil, nil)
+		require.Error(t, err)
+		if !queue.ErrEmpty.Has(err) {
+			require.FailNow(t, "Should get ErrEmptyQueue, but got", err)
+		}
+		require.Nil(t, injuredSegment)
+
+		// pieces list has not changed
+		segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+		remotePiecesAfter := segment.Pieces
+		require.Equal(t, remotePiecesBefore, remotePiecesAfter)
+
+		// now tag three of the nodes holding pieces as members of the same datacenter (dc1)
+		require.NoError(t, satellite.DB.OverlayCache().UpdateNodeTags(ctx, []nodeselection.NodeTag{
+			{
+				NodeID:   planet.FindNode(remotePiecesAfter[0].StorageNode).ID(),
+				SignedAt: time.Now(),
+				Name:     "datacenter",
+				Value:    []byte("dc1"),
+				Signer:   signer.ID,
+			},
+			{
+				NodeID:   planet.FindNode(remotePiecesAfter[1].StorageNode).ID(),
+				SignedAt: time.Now(),
+				Name:     "datacenter",
+				Value:    []byte("dc1"),
+				Signer:   signer.ID,
+			},
+			{
+				NodeID:   planet.FindNode(remotePiecesAfter[2].StorageNode).ID(),
+				SignedAt: time.Now(),
+				Name:     "datacenter",
+				Value:    []byte("dc1"),
+				Signer:   signer.ID,
+			},
+		}))
+
+		require.NoError(t, satellite.RangedLoop.Repair.Observer.RefreshReliabilityCache(ctx))
+
+		// running the repair checker again should put the segment into the repair queue
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		// and subsequently running the repair worker should pull that off the queue and repair it
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// confirm that at most two pieces remain on the dc1-tagged nodes
+		// and that the segment still has the right number of pieces.
+		segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+		require.Len(t, segment.Pieces, 4)
+		foundOnDC1 := 0
+		for _, piece := range segment.Pieces {
+			for i := 0; i < 3; i++ {
+				if piece.StorageNode.Compare(remotePiecesAfter[i].StorageNode) == 0 {
+					foundOnDC1++
+				}
+			}
+		}
+		require.Equalf(t, 2, foundOnDC1,
+			"%v should have only two pieces left on the dc1-tagged nodes", segment.Pieces)
+	})
+}
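
A minimal usage sketch of the new YAML-based loader added in config.go. The file path and error handling below are illustrative assumptions; LoadConfig, PlacementDefinitions, the map key, and the Placement fields come from the change above, and in the satellite itself ConfigurablePlacementRule.Parse dispatches to LoadConfig for .yaml files as shown in the placement.go hunk:

	package main

	import (
		"fmt"
		"log"

		"storj.io/storj/satellite/nodeselection"
	)

	func main() {
		// Hypothetical path; in the satellite this comes from the Placement.PlacementRules setting.
		placements, err := nodeselection.LoadConfig("/etc/storj/placement.yaml")
		if err != nil {
			log.Fatalf("loading placement config: %v", err)
		}

		// Look up placement 1 (id: 1 in the YAML; "eu-1" in the test fixture).
		p, ok := placements[1]
		if !ok {
			log.Fatal("placement 1 is not defined")
		}

		// The parsed NodeFilter, Selector and Invariant are now ready to be used
		// by the overlay and repair subsystems.
		fmt.Println(p.Name)
	}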