From 17af0f0c745a755c1d0dded686511acc930716b8 Mon Sep 17 00:00:00 2001
From: Márton Elek
Date: Tue, 16 Jan 2024 17:07:34 +0100
Subject: [PATCH] satellite/nodeselection: YAML based placement configuration

The existing placement configuration has proven hard to read, and we have also
added more functionality to Placement (such as selectors and invariants).

It's time to use an external file, which is easier to read (and makes it easier
to maintain the same values across multiple microservice instances).

Legacy configuration loading will be removed after 1-2 releases.

Change-Id: I8ef4c001b60fb96f04a34eb3acc370349f620dd4
---
 satellite/nodeselection/config.go        | 215 +++++++++++++++++++++++
 satellite/nodeselection/config_test.go   |  89 ++++++++++
 satellite/nodeselection/config_test.yaml |  14 ++
 satellite/nodeselection/placement.go     |   6 +
 satellite/repair/repair_test.go          | 125 +++++++++++++
 5 files changed, 449 insertions(+)
 create mode 100644 satellite/nodeselection/config.go
 create mode 100644 satellite/nodeselection/config_test.go
 create mode 100644 satellite/nodeselection/config_test.yaml

diff --git a/satellite/nodeselection/config.go b/satellite/nodeselection/config.go
new file mode 100644
index 000000000000..463ebb05dfca
--- /dev/null
+++ b/satellite/nodeselection/config.go
@@ -0,0 +1,215 @@
+// Copyright (C) 2024 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package nodeselection
+
+import (
+	"bytes"
+	"os"
+	"strings"
+
+	"github.com/jtolio/mito"
+	"github.com/zeebo/errs"
+	"gopkg.in/yaml.v3"
+
+	"storj.io/common/storj"
+)
+
+// placementConfig is the representation of the YAML based placement configuration.
+type placementConfig struct {
+
+	// helpers which can be re-used later to simplify the config
+	Templates map[string]string
+
+	// the placement definitions
+	Placements []placementDefinition
+}
+
+type placementDefinition struct {
+	ID        storj.PlacementConstraint
+	Name      string
+	Filter    string
+	Invariant string
+	Selector  string
+}
+
+// LoadConfig loads the placement yaml file and creates the Placement definitions.
+func LoadConfig(configFile string) (PlacementDefinitions, error) {
+	placements := make(PlacementDefinitions)
+
+	cfg := &placementConfig{}
+	raw, err := os.ReadFile(configFile)
+	if err != nil {
+		return placements, errs.New("Couldn't load placement config from file %s: %v", configFile, err)
+	}
+	err = yaml.Unmarshal(raw, &cfg)
+	if err != nil {
+		return placements, errs.New("Couldn't parse placement config as YAML from file %s: %v", configFile, err)
+	}
+
+	templates := map[string]string{}
+	for k, v := range cfg.Templates {
+		value := v
+		for a, b := range cfg.Templates {
+			value = strings.ReplaceAll(value, "$"+a, b)
+		}
+		templates[k] = value
+	}
+
+	resolveTemplates := func(orig string) string {
+		val := orig
+		for k, v := range templates {
+			val = strings.ReplaceAll(val, "$"+k, v)
+		}
+		return val
+	}
+
+	for _, def := range cfg.Placements {
+		p := Placement{
+			ID:   def.ID,
+			Name: def.Name,
+		}
+
+		filter := resolveTemplates(def.Filter)
+		p.NodeFilter, err = filterFromString(filter)
+		if err != nil {
+			return placements, errs.New("Filter definition '%s' of placement %d is invalid: %v", filter, def.ID, err)
+		}
+
+		invariant := resolveTemplates(def.Invariant)
+		p.Invariant, err = invariantFromString(invariant)
+		if err != nil {
+			return placements, errs.New("Invariant definition '%s' of placement %d is invalid: %v", invariant, def.ID, err)
+		}
+
+		selector := resolveTemplates(def.Selector)
+		p.Selector, err = selectorFromString(selector)
+		if err != nil {
+			return placements, errs.New("Selector definition '%s' of placement %d is invalid: %v", selector, def.ID, err)
+		}
+
+		placements[def.ID] = p
+	}
+	return placements, nil
+}
+
+func filterFromString(expr string) (NodeFilter, error) {
+	if expr == "" {
+		expr = "all()"
+	}
+	env := map[any]any{
+		"country": func(countries ...string) (NodeFilter, error) {
+			return NewCountryFilterFromString(countries)
+		},
+		"all": func(filters ...NodeFilter) (NodeFilters, error) {
+			res := NodeFilters{}
+			for _, filter := range filters {
+				res = append(res, filter)
+			}
+			return res, nil
+		},
+		mito.OpAnd: func(env map[any]any, a, b any) (any, error) {
+			filter1, ok1 := a.(NodeFilter)
+			filter2, ok2 := b.(NodeFilter)
+			if !ok1 || !ok2 {
+				return nil, ErrPlacement.New("&& is supported only between NodeFilter instances")
+			}
+			res := NodeFilters{filter1, filter2}
+			return res, nil
+		},
+		mito.OpOr: func(env map[any]any, a, b any) (any, error) {
+			filter1, ok1 := a.(NodeFilter)
+			filter2, ok2 := b.(NodeFilter)
+			if !ok1 || !ok2 {
+				return nil, errs.New("OR is supported only between NodeFilter instances")
+			}
+			return OrFilter{filter1, filter2}, nil
+		},
+		"tag": func(nodeIDstr string, key string, value any) (NodeFilters, error) {
+			nodeID, err := storj.NodeIDFromString(nodeIDstr)
+			if err != nil {
+				return nil, err
+			}
+
+			var rawValue []byte
+			match := bytes.Equal
+			switch v := value.(type) {
+			case string:
+				rawValue = []byte(v)
+			case []byte:
+				rawValue = v
+			case stringNotMatch:
+				match = func(a, b []byte) bool {
+					return !bytes.Equal(a, b)
+				}
+				rawValue = []byte(v)
+			default:
+				return nil, ErrPlacement.New("3rd argument of tag() should be string or []byte")
+			}
+			res := NodeFilters{
+				NewTagFilter(nodeID, key, rawValue, match),
+			}
+			return res, nil
+		},
+		"exclude": func(filter NodeFilter) (NodeFilter, error) {
+			return NewExcludeFilter(filter), nil
+		},
+		"empty": func() string {
+			return ""
+		},
+		"notEmpty": func() any {
+			return stringNotMatch("")
+		},
+	}
+	filter, err := mito.Eval(expr, env)
+	if err != nil {
+		return nil, errs.New("Invalid filter definition '%s', %v", expr, err)
+	}
+	return filter.(NodeFilter), nil
+}
+
+func selectorFromString(expr string) (NodeSelectorInit, error) {
+	if expr == "" {
+		expr = "random()"
+	}
+	env := map[any]any{
+		"attribute": func(attribute string) (NodeSelectorInit, error) {
+			attr, err := CreateNodeAttribute(attribute)
+			if err != nil {
+				return nil, err
+			}
+			return AttributeGroupSelector(attr), nil
+		},
+		"random": func() (NodeSelectorInit, error) {
+			return RandomSelector(), nil
+		},
+		"unvetted": func(newNodeRatio float64, def NodeSelectorInit) (NodeSelectorInit, error) {
+			return UnvettedSelector(newNodeRatio, def), nil
+		},
+	}
+	selector, err := mito.Eval(expr, env)
+	if err != nil {
+		return nil, errs.New("Invalid selector definition '%s', %v", expr, err)
+	}
+	return selector.(NodeSelectorInit), nil
+}
+
+func invariantFromString(expr string) (Invariant, error) {
+	if expr == "" {
+		return AllGood(), nil
+	}
+	env := map[any]any{
+		"maxcontrol": func(attribute string, max int64) (Invariant, error) {
+			attr, err := CreateNodeAttribute(attribute)
+			if err != nil {
+				return nil, err
+			}
+			return ClumpingByAttribute(attr, int(max)), nil
+		},
+	}
+	filter, err := mito.Eval(expr, env)
+	if err != nil {
+		return nil, errs.New("Invalid invariant definition '%s', %v", expr, err)
+	}
+	return filter.(Invariant), nil
+}
diff --git a/satellite/nodeselection/config_test.go b/satellite/nodeselection/config_test.go
new file mode 100644
index 000000000000..2ab04a862fe5
--- /dev/null
+++ b/satellite/nodeselection/config_test.go
@@ -0,0 +1,89 @@
+// Copyright (C) 2024 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package nodeselection
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"storj.io/common/identity/testidentity"
+	"storj.io/common/storj"
+	"storj.io/common/storj/location"
+	"storj.io/storj/satellite/metabase"
+)
+
+func TestParsedConfig(t *testing.T) {
+	config, err := LoadConfig("config_test.yaml")
+	require.NoError(t, err)
+	require.Len(t, config, 2)
+
+	{
+		// checking filters
+		require.True(t, config[1].NodeFilter.Match(&SelectedNode{
+			CountryCode: location.Germany,
+		}))
+		require.False(t, config[1].NodeFilter.Match(&SelectedNode{
+			CountryCode: location.Russia,
+		}))
+		require.Equal(t, "eu-1", config[1].Name)
+	}
+
+	{
+		// checking one invariant
+		node := func(ix int, owner string) SelectedNode {
+			return SelectedNode{
+				ID: testidentity.MustPregeneratedSignedIdentity(ix, storj.LatestIDVersion()).ID,
+				Tags: NodeTags{
+					{
+						Name:  "owner",
+						Value: []byte(owner),
+					},
+				},
+			}
+		}
+
+		piece := func(ix int, nodeIx int) metabase.Piece {
+			return metabase.Piece{
+				Number: uint16(ix), StorageNode: testidentity.MustPregeneratedSignedIdentity(nodeIx, storj.LatestIDVersion()).ID,
+			}
+		}
+
+		result := config[0].Invariant(
+			metabase.Pieces{
+				piece(1, 1),
+				piece(3, 2),
+				piece(5, 3),
+				piece(9, 4),
+				piece(10, 5),
+				piece(11, 6),
+			},
+			[]SelectedNode{
+				node(1, "dery"),
+				node(2, "blathy"),
+				node(3, "blathy"),
+				node(4, "zipernowsky"),
+				node(5, "zipernowsky"),
+				node(6, "zipernowsky"),
+			})
+
+		// last zipernowsky is too much, as we allow only 2
+		require.Equal(t, 1, result.Count())
+	}
+
+	{
+		// checking a selector
+		selected, err := config[0].Selector([]*SelectedNode{
+			{
+				Vetted: false,
+			},
+		}, nil)(1, nil)
+
+		// having: new, requires: 0% unvetted = 100% vetted
+		require.Len(t, selected, 0)
+		require.NoError(t, err)
+
+	}
+
+}
diff --git a/satellite/nodeselection/config_test.yaml b/satellite/nodeselection/config_test.yaml
new file mode 100644
index 000000000000..ad2483dd64b8
--- /dev/null
+++ b/satellite/nodeselection/config_test.yaml
@@ -0,0 +1,14 @@
+templates:
+  SIGNER_ZERO: 1111111111111111111111111111111VyS547o
+  NORMAL: exclude(tag("$SIGNER_ZERO","soc2","true")) && exclude(tag("$SIGNER_ZERO","datacenter","true"))
+placements:
+  - id: 0
+    name: global
+    filter: $NORMAL
+    invariant: maxcontrol("tag:owner",2)
+    selector: unvetted(0.0,random())
+  - id: 1
+    name: eu-1
+    filter: country("EU") && $NORMAL
+    invariant: maxcontrol("last_net",1)
+    selector: attribute("last_net")
diff --git a/satellite/nodeselection/placement.go b/satellite/nodeselection/placement.go
index 0fd0f5153f3d..5bb69d62bc8b 100644
--- a/satellite/nodeselection/placement.go
+++ b/satellite/nodeselection/placement.go
@@ -104,6 +104,11 @@ func (c ConfigurablePlacementRule) Parse(defaultPlacement func() (Placement, err
 	}
 	rules := c.PlacementRules
 	if _, err := os.Stat(rules); err == nil {
+		if strings.HasSuffix(rules, ".yaml") {
+			// new style of config, all others are deprecated
+			return LoadConfig(rules)
+
+		}
 		ruleBytes, err := os.ReadFile(rules)
 		if err != nil {
 			return nil, ErrPlacement.New("Placement definition file couldn't be read: %s %v", rules, err)
@@ -193,6 +198,7 @@ func (d PlacementDefinitions) AddPlacementRule(id storj.PlacementConstraint, fil
 type stringNotMatch string
 
 // AddPlacementFromString parses placement definition form string representations from id:definition;id:definition;...
+// Deprecated: we will switch to the YAML based configuration.
 func (d PlacementDefinitions) AddPlacementFromString(definitions string) error {
 	env := map[any]any{
 		"country": func(countries ...string) (NodeFilter, error) {
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 86ab6380c6e8..00d7485844a9 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -10,6 +10,8 @@ import (
 	"io"
 	"math"
 	"net"
+	"os"
+	"path/filepath"
 	"testing"
 	"time"
 
@@ -18,6 +20,8 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
 
+	"storj.io/common/identity"
+	"storj.io/common/identity/testidentity"
 	"storj.io/common/memory"
 	"storj.io/common/pb"
 	"storj.io/common/rpc"
@@ -30,6 +34,7 @@ import (
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/accounting"
 	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/nodeselection"
 	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/satellite/repair/checker"
 	"storj.io/storj/satellite/repair/queue"
@@ -3335,3 +3340,123 @@ func TestRepairClumpedPieces(t *testing.T) {
 			"%v should only include one of %s or %s", segment.Pieces, node0.ID(), node1.ID())
 	})
 }
+
+func TestRepairClumpedPiecesBasedOnTags(t *testing.T) {
+	signer := testidentity.MustPregeneratedIdentity(50, storj.LatestIDVersion())
+	tempDir := t.TempDir()
+	pc := identity.PeerConfig{
+		CertPath: filepath.Join(tempDir, "identity.cert"),
+	}
+	require.NoError(t, pc.Save(signer.PeerIdentity()))
+
+	placementConfig := fmt.Sprintf(`
+placements:
+- id: 0
+  name: default
+  invariant: maxcontrol("tag:%s/datacenter", 2)`, signer.ID.String())
+
+	placementConfigPath := filepath.Join(tempDir, "placement.yaml")
+	require.NoError(t, os.WriteFile(placementConfigPath, []byte(placementConfig), 0755))
+
+	// Test that if nodes are tagged with the same datacenter value such that
+	// more pieces of a segment end up in one datacenter than the maxcontrol
+	// invariant allows, that segment will be considered unhealthy by the
+	// repair checker and it will be repaired by the repair worker.
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 6,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				testplanet.ReconfigureRS(2, 3, 4, 4),
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Checker.DoDeclumping = true
+					config.Repairer.DoDeclumping = true
+					config.Placement.PlacementRules = placementConfigPath
+					config.TagAuthorities = pc.CertPath
+				},
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions, i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		// first, upload some remote data
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+		remotePiecesBefore := segment.Pieces
+
+		// that segment should be ignored by the repair checker for now
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		injuredSegment, err := satellite.DB.RepairQueue().Select(ctx, nil, nil)
+		require.Error(t, err)
+		if !queue.ErrEmpty.Has(err) {
+			require.FailNow(t, "Should get ErrEmptyQueue, but got", err)
+		}
+		require.Nil(t, injuredSegment)
+
+		// pieces list has not changed
+		segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+		remotePiecesAfter := segment.Pieces
+		require.Equal(t, remotePiecesBefore, remotePiecesAfter)
+
+		// now tag three of the nodes holding pieces with the same datacenter,
+		// so that more pieces share a datacenter than the invariant allows
+		require.NoError(t, satellite.DB.OverlayCache().UpdateNodeTags(ctx, []nodeselection.NodeTag{
+			{
+				NodeID:   planet.FindNode(remotePiecesAfter[0].StorageNode).ID(),
+				SignedAt: time.Now(),
+				Name:     "datacenter",
+				Value:    []byte("dc1"),
+				Signer:   signer.ID,
+			},
+			{
+				NodeID:   planet.FindNode(remotePiecesAfter[1].StorageNode).ID(),
+				SignedAt: time.Now(),
+				Name:     "datacenter",
+				Value:    []byte("dc1"),
+				Signer:   signer.ID,
+			},
+			{
+				NodeID:   planet.FindNode(remotePiecesAfter[2].StorageNode).ID(),
+				SignedAt: time.Now(),
+				Name:     "datacenter",
+				Value:    []byte("dc1"),
+				Signer:   signer.ID,
+			},
+		}))
+
+		require.NoError(t, satellite.RangedLoop.Repair.Observer.RefreshReliabilityCache(ctx))
+
+		// running the repair checker again should put the segment into the repair queue
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		// and subsequently running the repair worker should pull that off the queue and repair it
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// confirm that at most two pieces remain on the dc1-tagged nodes
+		// and the segment still has the right number of pieces
+		segment, _ = getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, "testbucket")
+		require.Len(t, segment.Pieces, 4)
+		foundOnDC1 := 0
+		for _, piece := range segment.Pieces {
+			for i := 0; i < 3; i++ {
+				if piece.StorageNode.Compare(remotePiecesAfter[i].StorageNode) == 0 {
+					foundOnDC1++
+				}
+			}
+		}
+		require.Equalf(t, 2, foundOnDC1,
+			"%v should be moved out from at least one node", segment.Pieces)
+	})
+}
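
To illustrate how the new file is meant to be consumed, here is a minimal, hypothetical usage sketch (not part of the patch): it writes a small YAML placement file and loads it with the new LoadConfig. The file name and the concrete filter/selector expressions are made up for illustration; the keys mirror config_test.yaml above, and only symbols introduced or referenced by this change (LoadConfig, PlacementDefinitions, Placement) are used.

package nodeselection

import (
	"fmt"
	"os"
)

// ExampleLoadConfig writes a small placement file and loads it, the same way a
// satellite would when Placement.PlacementRules points at a *.yaml path.
func ExampleLoadConfig() {
	yamlConfig := `
templates:
  NORMAL: country("EU")
placements:
  - id: 0
    name: global
  - id: 1
    name: eu-1
    filter: $NORMAL
    selector: attribute("last_net")
`
	f, err := os.CreateTemp("", "placement-*.yaml")
	if err != nil {
		panic(err)
	}
	defer func() { _ = os.Remove(f.Name()) }()
	if _, err := f.WriteString(yamlConfig); err != nil {
		panic(err)
	}
	_ = f.Close()

	// LoadConfig resolves the $-templates and parses the filter/invariant/selector
	// expressions into a PlacementDefinitions map keyed by placement ID.
	placements, err := LoadConfig(f.Name())
	if err != nil {
		panic(err)
	}
	fmt.Println(len(placements), placements[1].Name)
	// Output: 2 eu-1
}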
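
A note on template expansion, shown as a small standalone sketch: the helper name expandTemplates is hypothetical and simply mirrors the two substitution loops inside LoadConfig. Because template values are expanded against each other in a single pass, one level of nesting (as with $SIGNER_ZERO inside $NORMAL in config_test.yaml) resolves reliably, while deeper template chains would depend on map iteration order.

package main

import (
	"fmt"
	"strings"
)

// expandTemplates mirrors the template handling in LoadConfig: every template
// value gets all raw template values substituted into it once, and the same
// substitution is then applied to the placement expression.
func expandTemplates(templates map[string]string, expr string) string {
	resolved := map[string]string{}
	for k, v := range templates {
		value := v
		for a, b := range templates {
			value = strings.ReplaceAll(value, "$"+a, b)
		}
		resolved[k] = value
	}
	for k, v := range resolved {
		expr = strings.ReplaceAll(expr, "$"+k, v)
	}
	return expr
}

func main() {
	templates := map[string]string{
		"SIGNER_ZERO": "1111111111111111111111111111111VyS547o",
		"NORMAL":      `exclude(tag("$SIGNER_ZERO","soc2","true"))`,
	}
	// $NORMAL expands, and the $SIGNER_ZERO reference inside it is resolved too,
	// because template values themselves were expanded in the first pass.
	fmt.Println(expandTemplates(templates, `country("EU") && $NORMAL`))
}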